diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/connect/SqlSchemaFormatter.java b/ksql-common/src/main/java/io/confluent/ksql/schema/connect/SqlSchemaFormatter.java
index 515ce23d000c..bac8a2fb4d32 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/schema/connect/SqlSchemaFormatter.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/schema/connect/SqlSchemaFormatter.java
@@ -60,7 +60,7 @@ public enum Option {
/**
* Construct instance.
*
- *
The {@code reservedWordPredicate} allows this formatter, which lives in the common module,
+ *
The {@code addQuotesPredicate} allows this formatter, which lives in the common module,
* to be wired up to the set of reserved words defined in the parser module. Wire up to
* {@code ParserUtil::isReservedWord}.
*
@@ -72,11 +72,11 @@ public enum Option {
* quotes. NB: this also makes the field name case-sensitive. So care must be taken to ensure
* field names have the correct case.
*
- * @param reservedWordPredicate predicate to determine if a word is reserved in the SQL syntax.
+ * @param addQuotesPredicate predicate to determine if a word should be quoted.
* @param options the options to use when formatting the SQL.
*/
public SqlSchemaFormatter(
- final Predicate reservedWordPredicate,
+ final Predicate addQuotesPredicate,
final Option... options
) {
this.options = options.length == 0
@@ -84,7 +84,7 @@ public SqlSchemaFormatter(
: EnumSet.of(options[0], options);
this.formatOptions = FormatOptions.of(
- requireNonNull(reservedWordPredicate, "reservedWordPredicate")
+ requireNonNull(addQuotesPredicate, "addQuotesPredicate")
);
}
diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/FormatOptions.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/FormatOptions.java
index 705369d4a2b3..d500881c688c 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/FormatOptions.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/FormatOptions.java
@@ -22,7 +22,7 @@
public final class FormatOptions {
- private final Predicate reservedWordPredicate;
+ private final Predicate addQuotesPredicate;
public static FormatOptions none() {
return new FormatOptions(word -> true);
@@ -41,7 +41,7 @@ public static FormatOptions noEscape() {
/**
* Construct instance.
*
- * The {@code reservedWordPredicate} allows code that lives in the common module
+ *
The {@code addQuotesPredicate} allows code that lives in the common module
* to be wired up to the set of reserved words defined in the parser module. Wire this up to
* {@code ParserUtil::isReservedIdentifier}.
*
@@ -53,29 +53,29 @@ public static FormatOptions noEscape() {
* quotes. NB: this also makes the field name case-sensitive. So care must be taken to ensure
* field names have the correct case.
*
- * @param reservedWordPredicate predicate to test if a word is a reserved in SQL syntax.
+ * @param addQuotesPredicate predicate to test if a word should be quoted.
* @return instance of {@code FormatOptions}.
*/
- public static FormatOptions of(final Predicate reservedWordPredicate) {
- return new FormatOptions(reservedWordPredicate);
+ public static FormatOptions of(final Predicate addQuotesPredicate) {
+ return new FormatOptions(addQuotesPredicate);
}
private FormatOptions(final Predicate fieldNameEscaper) {
- this.reservedWordPredicate = requireNonNull(fieldNameEscaper, "reservedWordPredicate");
+ this.addQuotesPredicate = requireNonNull(fieldNameEscaper, "addQuotesPredicate");
}
- public boolean isReservedWord(final String word) {
- return reservedWordPredicate.test(word);
+ private boolean shouldQuote(final String word) {
+ return addQuotesPredicate.test(word);
}
/**
- * Escapes {@code word} if it is a reserved word, determined by {@link #isReservedWord(String)}.
+ * Escapes {@code word} if it is a reserved word, determined by {@link #shouldQuote(String)}.
*
* @param word the word to escape
* @return {@code word}, if it is not a reserved word, otherwise {@code word} wrapped in
* back quotes
*/
public String escape(final String word) {
- return isReservedWord(word) ? KsqlConstants.ESCAPE + word + KsqlConstants.ESCAPE : word;
+ return shouldQuote(word) ? KsqlConstants.ESCAPE + word + KsqlConstants.ESCAPE : word;
}
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java
index 2261c585be76..90ccb39a339b 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java
@@ -53,7 +53,7 @@
public class DefaultSchemaInjector implements Injector {
private static final SqlSchemaFormatter FORMATTER = new SqlSchemaFormatter(
- IdentifierUtil::needsQuotes, Option.AS_COLUMN_LIST);
+ w -> !IdentifierUtil.isValid(w), Option.AS_COLUMN_LIST);
private final TopicSchemaSupplier schemaSupplier;
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java
index 8d3d413a343c..3bc6a747883f 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java
@@ -278,7 +278,7 @@ public void shouldBuildNewCsStatementText() {
// Then:
assertThat(result.getStatementText(), is(
- "CREATE STREAM cs ("
+ "CREATE STREAM `cs` ("
+ "INTFIELD INTEGER, "
+ "BIGINTFIELD BIGINT, "
+ "DOUBLEFIELD DOUBLE, "
@@ -303,7 +303,7 @@ public void shouldBuildNewCtStatementText() {
// Then:
assertThat(result.getStatementText(), is(
- "CREATE TABLE ct ("
+ "CREATE TABLE `ct` ("
+ "INTFIELD INTEGER, "
+ "BIGINTFIELD BIGINT, "
+ "DOUBLEFIELD DOUBLE, "
@@ -330,7 +330,7 @@ public void shouldBuildNewCsStatementTextFromId() {
// Then:
assertThat(result.getStatementText(), is(
- "CREATE STREAM cs ("
+ "CREATE STREAM `cs` ("
+ "INTFIELD INTEGER, "
+ "BIGINTFIELD BIGINT, "
+ "DOUBLEFIELD DOUBLE, "
@@ -357,7 +357,7 @@ public void shouldBuildNewCtStatementTextFromId() {
// Then:
assertThat(result.getStatementText(), is(
- "CREATE TABLE ct ("
+ "CREATE TABLE `ct` ("
+ "INTFIELD INTEGER, "
+ "BIGINTFIELD BIGINT, "
+ "DOUBLEFIELD DOUBLE, "
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java
index b8c0a4a28f11..e03c44298a3e 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java
@@ -784,7 +784,7 @@ public void shouldSummarizeExecutionPlanCorrectly() {
// When/Then:
assertThat(schemaKtream.getExecutionPlan(new QueryId("query"), ""), equalTo(
- " > [ SOURCE ] | Schema: [ROWKEY STRING KEY, key STRING, val BIGINT] | "
+ " > [ SOURCE ] | Schema: [ROWKEY STRING KEY, `key` STRING, `val` BIGINT] | "
+ "Logger: query.node.source\n"
+ "\tparent plan"));
}
@@ -809,7 +809,7 @@ public void shouldSummarizeExecutionPlanCorrectlyForRoot() {
// When/Then:
assertThat(schemaKtream.getExecutionPlan(new QueryId("query"), ""), equalTo(
- " > [ SOURCE ] | Schema: [ROWKEY STRING KEY, key STRING, val BIGINT] | "
+ " > [ SOURCE ] | Schema: [ROWKEY STRING KEY, `key` STRING, `val` BIGINT] | "
+ "Logger: query.node.source\n"));
}
@@ -839,7 +839,7 @@ public void shouldSummarizeExecutionPlanCorrectlyWhenMultipleParents() {
// When/Then:
assertThat(schemaKtream.getExecutionPlan(new QueryId("query"), ""), equalTo(
- " > [ SOURCE ] | Schema: [ROWKEY STRING KEY, key STRING, val BIGINT] | "
+ " > [ SOURCE ] | Schema: [ROWKEY STRING KEY, `key` STRING, `val` BIGINT] | "
+ "Logger: query.node.source\n"
+ "\tparent 1 plan"
+ "\tparent 2 plan"));
diff --git a/ksql-parser/src/main/java/io/confluent/ksql/util/IdentifierUtil.java b/ksql-parser/src/main/java/io/confluent/ksql/util/IdentifierUtil.java
index fcde39d28780..e1f99572fd93 100644
--- a/ksql-parser/src/main/java/io/confluent/ksql/util/IdentifierUtil.java
+++ b/ksql-parser/src/main/java/io/confluent/ksql/util/IdentifierUtil.java
@@ -29,7 +29,7 @@ private IdentifierUtil() { }
* @param identifier the identifier
* @return whether or not {@code identifier} is a valid identifier without quotes
*/
- public static boolean needsQuotes(final String identifier) {
+ public static boolean isValid(final String identifier) {
final SqlBaseLexer sqlBaseLexer = new SqlBaseLexer(
new CaseInsensitiveStream(CharStreams.fromString(identifier)));
final CommonTokenStream tokenStream = new CommonTokenStream(sqlBaseLexer);
@@ -43,8 +43,20 @@ public static boolean needsQuotes(final String identifier) {
sqlBaseParser.identifier();
// needs quotes if the `identifier` was not able to read the entire line
- return sqlBaseParser.getNumberOfSyntaxErrors() != 0
- || sqlBaseParser.getCurrentToken().getCharPositionInLine() != identifier.length();
+ return sqlBaseParser.getNumberOfSyntaxErrors() == 0
+ && sqlBaseParser.getCurrentToken().getCharPositionInLine() == identifier.length();
+ }
+
+ /**
+ * @param identifier the identifier
+ * @return whether or not {@code identifier} needs quotes to be parsed as the same identifier
+ */
+ public static boolean needsQuotes(final String identifier) {
+ return !(isValid(identifier) && upperCase(identifier));
+ }
+
+ private static boolean upperCase(final String identifier) {
+ return identifier.toUpperCase().equals(identifier);
}
}
diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java
index 982b962e4aa5..41f6144ea67b 100644
--- a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java
+++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java
@@ -230,7 +230,7 @@ public void shouldFormatCreateStreamStatementWithExplicitKey() {
final String sql = SqlFormatter.formatSql(createStream);
// Then:
- assertThat(sql, is("CREATE STREAM TEST (ROWKEY STRING KEY, Foo STRING) "
+ assertThat(sql, is("CREATE STREAM TEST (ROWKEY STRING KEY, `Foo` STRING) "
+ "WITH (KAFKA_TOPIC='topic_test', KEY='ORDERID', VALUE_FORMAT='JSON');"));
}
@@ -247,7 +247,7 @@ public void shouldFormatCreateStreamStatementWithImplicitKey() {
final String sql = SqlFormatter.formatSql(createStream);
// Then:
- assertThat(sql, is("CREATE STREAM TEST (Foo STRING, Bar STRING) "
+ assertThat(sql, is("CREATE STREAM TEST (`Foo` STRING, `Bar` STRING) "
+ "WITH (KAFKA_TOPIC='topic_test', KEY='ORDERID', VALUE_FORMAT='JSON');"));
}
@@ -264,7 +264,7 @@ public void shouldFormatCreateTableStatementWithExplicitKey() {
final String sql = SqlFormatter.formatSql(createTable);
// Then:
- assertThat(sql, is("CREATE TABLE TEST (ROWKEY STRING KEY, Foo STRING) "
+ assertThat(sql, is("CREATE TABLE TEST (ROWKEY STRING KEY, `Foo` STRING) "
+ "WITH (KAFKA_TOPIC='topic_test', KEY='ORDERID', VALUE_FORMAT='JSON');"));
}
@@ -281,7 +281,7 @@ public void shouldFormatCreateTableStatementWithImplicitKey() {
final String sql = SqlFormatter.formatSql(createTable);
// Then:
- assertThat(sql, is("CREATE TABLE TEST (Foo STRING, Bar STRING) "
+ assertThat(sql, is("CREATE TABLE TEST (`Foo` STRING, `Bar` STRING) "
+ "WITH (KAFKA_TOPIC='topic_test', KEY='ORDERID', VALUE_FORMAT='JSON');"));
}
diff --git a/ksql-parser/src/test/java/io/confluent/ksql/util/IdentifierUtilTest.java b/ksql-parser/src/test/java/io/confluent/ksql/util/IdentifierUtilTest.java
index e241f052ec6c..496888079370 100644
--- a/ksql-parser/src/test/java/io/confluent/ksql/util/IdentifierUtilTest.java
+++ b/ksql-parser/src/test/java/io/confluent/ksql/util/IdentifierUtilTest.java
@@ -29,6 +29,7 @@ public void shouldNeedBackQuotes() {
"SELECT", // reserved word
"@ID", // invalid character
"FOO.BAR", // with a dot
+ "foo" // lower case
};
// Then:
@@ -52,4 +53,39 @@ public void shouldNotNeedBackQuotes() {
}
}
+ @Test
+ public void shouldBeValid() {
+ // Given:
+ final String[] identifiers = new String[]{
+ "FOO", // nothing special
+ "foo", // lower-case
+ };
+
+ // Then:
+ for (final String identifier : identifiers) {
+ assertThat(
+ "Expected " + identifier + " to be valid.",
+ IdentifierUtil.isValid(identifier)
+ );
+ }
+ }
+
+ @Test
+ public void shouldNotBeValid() {
+ // Given:
+ final String[] identifiers = new String[]{
+ "@FOO", // invalid character
+ "FOO.BAR", // Dot
+ "SELECT" // reserved word
+ };
+
+ // Then:
+ for (final String identifier : identifiers) {
+ assertThat(
+ "Expected " + identifier + " to be invalid",
+ !IdentifierUtil.isValid(identifier)
+ );
+ }
+ }
+
}
\ No newline at end of file
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java
index 743357fba5cf..1d1aeeb7aefa 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java
@@ -39,7 +39,7 @@ public final class ProcessingLogServerUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessingLogServerUtils.class);
private static final SqlSchemaFormatter FORMATTER =
- new SqlSchemaFormatter(IdentifierUtil::needsQuotes, Option.AS_COLUMN_LIST);
+ new SqlSchemaFormatter(w -> !IdentifierUtil.isValid(w), Option.AS_COLUMN_LIST);
private ProcessingLogServerUtils() {
}