From 70565f2969df109c6fd4ae6bb7c48c5365c45f9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Wed, 11 Aug 2021 16:10:36 -0500 Subject: [PATCH] feat: add CREATE SOURCE TABLE syntax and metadata info (#7945) --- .../ddl/commands/CreateSourceFactory.java | 8 +- .../ksql/ddl/commands/DdlCommandExec.java | 4 +- .../ddl/commands/CommandFactoriesTest.java | 25 +++++- .../ddl/commands/CreateSourceFactoryTest.java | 49 ++++++++--- .../ksql/ddl/commands/DdlCommandExecTest.java | 59 ++++++++++--- .../engine/rewrite/StatementRewriterTest.java | 6 +- .../ksql/planner/plan/DataSourceNodeTest.java | 3 +- .../ddl/commands/CreateTableCommand.java | 47 +++++++++- .../ksql/metastore/model/DataSource.java | 5 ++ .../ksql/metastore/model/KsqlStream.java | 3 +- .../ksql/metastore/model/KsqlTable.java | 10 ++- .../metastore/model/StructuredDataSource.java | 10 ++- .../model/StructuredDataSourceTest.java | 6 +- .../confluent/ksql/util/MetaStoreFixture.java | 9 +- .../io/confluent/ksql/parser/SqlBase.g4 | 2 +- .../io/confluent/ksql/parser/AstBuilder.java | 6 +- .../confluent/ksql/parser/SqlFormatter.java | 4 + .../ksql/parser/tree/CreateSource.java | 14 ++- .../ksql/parser/tree/CreateStream.java | 2 +- .../ksql/parser/tree/CreateTable.java | 15 ++-- .../confluent/ksql/parser/AstBuilderTest.java | 27 ++++++ .../confluent/ksql/parser/KsqlParserTest.java | 86 +++---------------- .../ksql/parser/SqlFormatterTest.java | 42 +++++++-- .../ksql/parser/tree/CreateTableTest.java | 23 ++--- .../rest/server/StandaloneExecutorTest.java | 3 +- .../ksql/rest/server/TemporaryEngine.java | 3 +- .../execution/InsertValuesExecutorTest.java | 3 +- .../server/resources/KsqlResourceTest.java | 3 +- .../resources/ksql-plan-schema/schema.json | 4 + 29 files changed, 330 insertions(+), 151 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java index 0a7c32335586..72728456b29e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java @@ -134,6 +134,7 @@ public CreateStreamCommand createStreamCommand( ); } + // This method is called by CREATE_AS statements public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode outputNode) { return new CreateTableCommand( outputNode.getSinkName().get(), @@ -142,10 +143,12 @@ public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode outputNode.getKsqlTopic().getKafkaTopicName(), Formats.from(outputNode.getKsqlTopic()), outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(), - Optional.of(outputNode.getOrReplace()) + Optional.of(outputNode.getOrReplace()), + Optional.of(false) ); } + // This method is called by simple CREATE statements public CreateTableCommand createTableCommand( final CreateTable statement, final KsqlConfig ksqlConfig @@ -188,7 +191,8 @@ public CreateTableCommand createTableCommand( topicName, buildFormats(statement.getName(), schema, props, ksqlConfig), getWindowInfo(props), - Optional.of(statement.isOrReplace()) + Optional.of(statement.isOrReplace()), + Optional.of(statement.isSource()) ); } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java index 94a587fdc2f5..5b863f6addaa 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandExec.java @@ -118,13 +118,15 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable) + "already exists.", sourceName, sourceType.toLowerCase())); } + final KsqlTable ksqlTable = new KsqlTable<>( sql, createTable.getSourceName(), createTable.getSchema(), createTable.getTimestampColumn(), withQuery, - getKsqlTopic(createTable) + getKsqlTopic(createTable), + createTable.isSource() ); metaStore.putSource(ksqlTable, createTable.isOrReplace()); metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java index a3e94c4e9c2a..8ffa5b9aaaff 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java @@ -198,7 +198,25 @@ public void shouldCreateCommandForCreateTable() { TableElements.of( tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)), tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))), - false, true, withProperties); + false, true, withProperties, false); + + // When: + final DdlCommand result = commandFactories + .create(sqlExpression, statement, SessionConfig.of(ksqlConfig, emptyMap())); + + // Then: + assertThat(result, is(createTableCommand)); + verify(createSourceFactory).createTableCommand(statement, ksqlConfig); + } + + @Test + public void shouldCreateCommandForCreateSourceTable() { + // Given: + final CreateTable statement = new CreateTable(SOME_NAME, + TableElements.of( + tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)), + tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))), + false, true, withProperties, true); // When: final DdlCommand result = commandFactories @@ -216,7 +234,7 @@ public void shouldCreateCommandForCreateTableWithOverriddenProperties() { TableElements.of( tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)), tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))), - false, true, withProperties); + false, true, withProperties, false); // When: commandFactories.create(sqlExpression, statement, SessionConfig.of(ksqlConfig, OVERRIDES)); @@ -354,7 +372,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromOverridesNotConfi ); final DdlStatement statement = - new CreateTable(SOME_NAME, ELEMENTS_WITH_PK, false, true, withProperties); + new CreateTable(SOME_NAME, ELEMENTS_WITH_PK, + false, true, withProperties, false); // When: final DdlCommand cmd = commandFactories diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java index ccbdaa347b7f..182521e386b7 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java @@ -293,7 +293,7 @@ public void shouldCreateCommandForCreateTable() { TableElements.of( tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)), tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))), - false, true, withProperties); + false, true, withProperties, false); // When: final CreateTableCommand result = createSourceFactory @@ -302,6 +302,26 @@ public void shouldCreateCommandForCreateTable() { // Then: assertThat(result.getSourceName(), is(SOME_NAME)); assertThat(result.getTopicName(), is(TOPIC_NAME)); + assertThat(result.isSource(), is(false)); + } + + @Test + public void shouldCreateCommandForCreateSourceTable() { + // Given: + final CreateTable ddlStatement = new CreateTable(SOME_NAME, + TableElements.of( + tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)), + tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))), + false, true, withProperties, true); + + // When: + final CreateTableCommand result = createSourceFactory + .createTableCommand(ddlStatement, ksqlConfig); + + // Then: + assertThat(result.getSourceName(), is(SOME_NAME)); + assertThat(result.getTopicName(), is(TOPIC_NAME)); + assertThat(result.isSource(), is(true)); } @Test @@ -379,7 +399,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromPropertiesNotConf givenProperty(CommonCreateConfigs.WRAP_SINGLE_VALUE, new BooleanLiteral("false")); final CreateTable statement = - new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties); + new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, + false, true, withProperties, false); // When: final CreateTableCommand cmd = createSourceFactory @@ -400,7 +421,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromConfig() { )); final CreateTable statement = - new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties); + new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, + false, true, withProperties, false); // When: final CreateTableCommand cmd = createSourceFactory @@ -415,7 +437,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromConfig() { public void shouldCreateTableCommandWithSingleValueWrappingFromDefaultConfig() { // Given: final CreateTable statement = - new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties); + new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, + false, true, withProperties, false); // When: final CreateTableCommand cmd = createSourceFactory @@ -446,7 +469,8 @@ public void shouldThrowOnNoElementsInCreateStream() { public void shouldThrowOnNoElementsInCreateTable() { // Given: final CreateTable statement - = new CreateTable(SOME_NAME, TableElements.of(), false, true, withProperties); + = new CreateTable(SOME_NAME, TableElements.of(), + false, true, withProperties, false); // When: final Exception e = assertThrows( @@ -475,7 +499,8 @@ public void shouldNotThrowWhenThereAreElementsInCreateStream() { public void shouldNotThrowWhenThereAreElementsInCreateTable() { // Given: final CreateTable statement = - new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties); + new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, + false, true, withProperties, false); // When: createSourceFactory.createTableCommand(statement, ksqlConfig); @@ -609,7 +634,8 @@ public void shouldBuildTimestampColumnForTable() { new StringLiteral(quote(ELEMENT2.getName().text())) ); final CreateTable statement = - new CreateTable(SOME_NAME, TABLE_ELEMENTS, false, true, withProperties); + new CreateTable(SOME_NAME, TABLE_ELEMENTS, + false, true, withProperties, false); // When: final CreateTableCommand cmd = createSourceFactory.createTableCommand( @@ -731,7 +757,7 @@ public void shouldCreateValueSerdeToValidateValueFormatCanHandleValueSchema() { // Given: givenCommandFactoriesWithMocks(); final CreateTable statement = new CreateTable(SOME_NAME, TABLE_ELEMENTS, false, true, - withProperties); + withProperties, false); when(valueSerdeFactory.create( FormatInfo.of(JSON.name()), @@ -1010,7 +1036,8 @@ public void shouldThrowIfTableIsMissingPrimaryKey() { final TableElements noKey = TableElements.of(ELEMENT1); final CreateTable statement = - new CreateTable(SOME_NAME, noKey, false, true, withProperties); + new CreateTable(SOME_NAME, noKey, + false, true, withProperties, false); // When: final Exception e = assertThrows( @@ -1060,7 +1087,7 @@ public void shouldNotThrowOnCreateTableIfNotExistsIsSet() { TableElements.of( tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)), tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))), - false, true, withProperties); + false, true, withProperties, false); // When: final CreateTableCommand result = createSourceFactory @@ -1077,7 +1104,7 @@ public void shouldThrowIfTableExists() { TableElements.of( tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)), tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))), - false, false, withProperties); + false, false, withProperties, false); // When: final Exception e = assertThrows( diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java index 9e65a0722dce..883512fd8431 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/ddl/commands/DdlCommandExecTest.java @@ -21,6 +21,7 @@ import io.confluent.ksql.metastore.MutableMetaStore; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.metastore.model.KsqlStream; +import io.confluent.ksql.metastore.model.KsqlTable; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.schema.ksql.Column; @@ -257,12 +258,49 @@ public void shouldAddSinkTable() { assertThat(metaStore.getSource(TABLE_NAME).isCasTarget(), is(true)); } + @Test + public void shouldAddNormalTableWhenNoTypeIsSpecified() { + // Given: + final CreateTableCommand cmd = buildCreateTable( + SourceName.of("t1"), + false, + null + ); + + // When: + cmdExec.execute(SQL_TEXT, cmd, true, NO_QUERY_SOURCES); + + // Then: + final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1")); + assertThat(ksqlTable.isSource(), is(false)); + } + + @Test + public void shouldAddSourceTable() { + // Given: + final CreateTableCommand cmd = buildCreateTable( + SourceName.of("t1"), + false, + true + ); + + // When: + cmdExec.execute(SQL_TEXT, cmd, true, NO_QUERY_SOURCES); + + // Then: + final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1")); + assertThat(ksqlTable.isSource(), is(true)); + } + @Test public void shouldThrowOnDropTableWhenConstraintExist() { // Given: - final CreateTableCommand table1 = buildCreateTable(SourceName.of("t1"), false); - final CreateTableCommand table2 = buildCreateTable(SourceName.of("t2"), false); - final CreateTableCommand table3 = buildCreateTable(SourceName.of("t3"), false); + final CreateTableCommand table1 = buildCreateTable(SourceName.of("t1"), + false, false); + final CreateTableCommand table2 = buildCreateTable(SourceName.of("t2"), + false, false); + final CreateTableCommand table3 = buildCreateTable(SourceName.of("t3"), + false, false); cmdExec.execute(SQL_TEXT, table1, true, Collections.emptySet()); cmdExec.execute(SQL_TEXT, table2, true, Collections.singleton(SourceName.of("t1"))); cmdExec.execute(SQL_TEXT, table3, true, Collections.singleton(SourceName.of("t1"))); @@ -470,7 +508,7 @@ public void shouldWarnAddDuplicateTableWithoutReplace() { cmdExec.execute(SQL_TEXT, createTable, false, NO_QUERY_SOURCES); // When: - givenCreateTable(false); + givenCreateTable(); final DdlCommandResult result =cmdExec.execute(SQL_TEXT, createTable, false, NO_QUERY_SOURCES); @@ -546,21 +584,19 @@ private void givenCreateWindowedTable() { SerdeFeatures.of() ), Optional.of(windowInfo), + Optional.of(false), Optional.of(false) ); } private void givenCreateTable() { - createTable = buildCreateTable(TABLE_NAME, false); - } - - private void givenCreateTable(final boolean allowReplace) { - createTable = buildCreateTable(TABLE_NAME, allowReplace); + createTable = buildCreateTable(TABLE_NAME, false, false); } private CreateTableCommand buildCreateTable( final SourceName sourceName, - final boolean allowReplace + final boolean allowReplace, + final Boolean isSource ) { return new CreateTableCommand( sourceName, @@ -574,7 +610,8 @@ private CreateTableCommand buildCreateTable( SerdeFeatures.of() ), Optional.empty(), - Optional.of(allowReplace) + Optional.of(allowReplace), + Optional.ofNullable(isSource) ); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java index 9e66d6bde535..c2f02605484a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java @@ -659,7 +659,8 @@ public void shouldRewriteCreateTable() { TableElements.of(tableElement1, tableElement2), false, false, - sourceProperties + sourceProperties, + false ); when(mockRewriter.apply(tableElement1, context)).thenReturn(rewrittenTableElement1); when(mockRewriter.apply(tableElement2, context)).thenReturn(rewrittenTableElement2); @@ -677,7 +678,8 @@ public void shouldRewriteCreateTable() { TableElements.of(rewrittenTableElement1, rewrittenTableElement2), false, false, - sourceProperties + sourceProperties, + false ) ) ); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java index 9799cbfdf562..44a430294b4a 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java @@ -244,7 +244,8 @@ public void shouldBuildSchemaKTableWhenKTableSource() { "topic2", KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()), ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of()) - ) + ), + false ); node = new DataSourceNode( diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java index bd0620867d63..f0d71922f361 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateTableCommand.java @@ -15,6 +15,7 @@ package io.confluent.ksql.execution.ddl.commands; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.errorprone.annotations.Immutable; @@ -23,11 +24,13 @@ import io.confluent.ksql.name.SourceName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.serde.WindowInfo; +import java.util.Objects; import java.util.Optional; @JsonIgnoreProperties({"keyField"}) // Removed at version 0.10 @Immutable public class CreateTableCommand extends CreateSourceCommand { + private final Optional isSource; public CreateTableCommand( @JsonProperty(value = "sourceName", required = true) final SourceName sourceName, @@ -36,7 +39,8 @@ public CreateTableCommand( @JsonProperty(value = "topicName", required = true) final String topicName, @JsonProperty(value = "formats", required = true) final Formats formats, @JsonProperty(value = "windowInfo") final Optional windowInfo, - @JsonProperty(value = "orReplace", defaultValue = "false") final Optional orReplace + @JsonProperty(value = "orReplace", defaultValue = "false") final Optional orReplace, + @JsonProperty(value = "isSource", defaultValue = "false") final Optional isSource ) { super( sourceName, @@ -51,6 +55,47 @@ public CreateTableCommand( if (schema.key().isEmpty()) { throw new UnsupportedOperationException("Tables require key columns"); } + + this.isSource = isSource; + } + + // This can be in CreateSourceCommand, but it fails deserializing the JSON property when + // loading a CreateStreamCommand because it is not supported there yet. We should move this + // source variable and method to the CreateSourceCommand after supporting source streams. + // Also, this getIsSource() is required so that serialized QTT historic plans do not fail + // that the 'isSource' field is not present. + public Optional getIsSource() { + return isSource; + } + + @JsonIgnore + public boolean isSource() { + return isSource.orElse(false); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final CreateTableCommand that = (CreateTableCommand) o; + return super.equals(that) + && isSource() == that.isSource(); + } + + @Override + public int hashCode() { + return Objects.hash( + getSourceName(), + getSchema(), + getTimestampColumn(), + getTopicName(), + getFormats(), + getWindowInfo(), + getIsSource()); } @Override diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/DataSource.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/DataSource.java index 6311ccdad60c..71f010bdc5d2 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/DataSource.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/DataSource.java @@ -103,4 +103,9 @@ public String getKsqlType() { * @return a new DataSource object with all attributes the same as this, but with a new schema */ DataSource with(String sql, LogicalSchema schema); + + /** + * @return returns true if this source is read-only + */ + boolean isSource(); } diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlStream.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlStream.java index 6e66a4c8bc11..34c8a427409d 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlStream.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlStream.java @@ -40,7 +40,8 @@ public KsqlStream( timestampExtractionPolicy, DataSourceType.KSTREAM, isKsqlSink, - ksqlTopic + ksqlTopic, + false ); } diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlTable.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlTable.java index cb136d72a3d7..1915ab199c64 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlTable.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/KsqlTable.java @@ -24,14 +24,14 @@ @Immutable public class KsqlTable extends StructuredDataSource { - public KsqlTable( final String sqlExpression, final SourceName datasourceName, final LogicalSchema schema, final Optional timestampExtractionPolicy, final boolean isKsqlSink, - final KsqlTopic ksqlTopic + final KsqlTopic ksqlTopic, + final boolean isSourceTable ) { super( sqlExpression, @@ -40,7 +40,8 @@ public KsqlTable( timestampExtractionPolicy, DataSourceType.KTABLE, isKsqlSink, - ksqlTopic + ksqlTopic, + isSourceTable ); } @@ -52,7 +53,8 @@ public DataSource with(final String sql, final LogicalSchema schema) { schema, getTimestampColumn(), isCasTarget(), - getKsqlTopic() + getKsqlTopic(), + isSource() ); } } diff --git a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java index 03e572c25734..e11f2afcaa08 100644 --- a/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java +++ b/ksqldb-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java @@ -47,6 +47,7 @@ abstract class StructuredDataSource implements DataSource { private final KsqlTopic ksqlTopic; private final String sqlExpression; private final boolean casTarget; + private final boolean isSource; private static final ImmutableList> PROPERTIES = ImmutableList.of( new Property<>("name", DataSource::getName), @@ -64,7 +65,8 @@ abstract class StructuredDataSource implements DataSource { final Optional tsExtractionPolicy, final DataSourceType dataSourceType, final boolean casTarget, - final KsqlTopic ksqlTopic + final KsqlTopic ksqlTopic, + final boolean isSource ) { this.sqlExpression = requireNonNull(sqlExpression, "sqlExpression"); this.dataSourceName = requireNonNull(dataSourceName, "dataSourceName"); @@ -73,6 +75,7 @@ abstract class StructuredDataSource implements DataSource { this.dataSourceType = requireNonNull(dataSourceType, "dataSourceType"); this.ksqlTopic = requireNonNull(ksqlTopic, "ksqlTopic"); this.casTarget = casTarget; + this.isSource = isSource; if (schema.valueContainsAny(SystemColumns.systemColumnNames())) { throw new IllegalArgumentException("Schema contains system columns in value schema"); @@ -126,6 +129,11 @@ public String getSqlExpression() { return sqlExpression; } + @Override + public boolean isSource() { + return isSource; + } + @Override public String toString() { return getClass().getSimpleName() + " name:" + getName(); diff --git a/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java b/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java index 45a592643fe5..3912d23d92a1 100644 --- a/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java +++ b/ksqldb-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java @@ -211,7 +211,8 @@ public void shouldEnforceSameType() { SOME_SCHEMA, Optional.empty(), true, - topic + topic, + false ); // When: @@ -369,7 +370,8 @@ private TestStructuredDataSource( Optional.empty(), DataSourceType.KSTREAM, false, - topic + topic, + false ); } diff --git a/ksqldb-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java b/ksqldb-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java index fcc74e567e73..75df78465f97 100644 --- a/ksqldb-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java +++ b/ksqldb-metastore/src/test/java/io/confluent/ksql/util/MetaStoreFixture.java @@ -116,7 +116,8 @@ public static MutableMetaStore getNewMetaStore( test2Schema, Optional.empty(), false, - ksqlTopic2 + ksqlTopic2, + false ); metaStore.putSource(ksqlTable, false); @@ -191,7 +192,8 @@ public static MutableMetaStore getNewMetaStore( testTable3, Optional.empty(), false, - ksqlTopic3 + ksqlTopic3, + false ); metaStore.putSource(ksqlTable3, false); @@ -294,7 +296,8 @@ public static MutableMetaStore getNewMetaStore( testTable5, Optional.empty(), false, - ksqlTopic5 + ksqlTopic5, + false ); metaStore.putSource(ksqlTable5, false); diff --git a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 7f0d71971cd1..4211af1cb77f 100644 --- a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -72,7 +72,7 @@ statement (WITH tableProperties)? #createStream | CREATE (OR REPLACE)? STREAM (IF NOT EXISTS)? sourceName (WITH tableProperties)? AS query #createStreamAs - | CREATE (OR REPLACE)? TABLE (IF NOT EXISTS)? sourceName + | CREATE (OR REPLACE)? (SOURCE)? TABLE (IF NOT EXISTS)? sourceName (tableElements)? (WITH tableProperties)? #createTable | CREATE (OR REPLACE)? TABLE (IF NOT EXISTS)? sourceName diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index fcda758e5691..d8c7108267a0 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -287,7 +287,8 @@ public Node visitCreateTable(final SqlBaseParser.CreateTableContext context) { TableElements.of(elements), context.REPLACE() != null, context.EXISTS() != null, - CreateSourceProperties.from(properties) + CreateSourceProperties.from(properties), + context.SOURCE() != null ); } @@ -1438,7 +1439,8 @@ public Node visitAssertTable(final AssertTableContext context) { TableElements.of(elements), false, false, - CreateSourceProperties.from(properties) + CreateSourceProperties.from(properties), + false ); return new AssertTable(getLocation(context), createTable); diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index 16c9f20017b4..a66ab7bbf964 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -607,6 +607,10 @@ private void formatCreate(final CreateSource node, final String type) { builder.append("OR REPLACE "); } + if (node.isSource()) { + builder.append("SOURCE "); + } + builder.append(type); builder.append(" "); diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateSource.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateSource.java index 2a5d74a11c3a..f14c7aeb028c 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateSource.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateSource.java @@ -32,6 +32,7 @@ public abstract class CreateSource extends Statement { private final boolean notExists; private final CreateSourceProperties properties; private final boolean orReplace; + private final boolean isSource; CreateSource( final Optional location, @@ -39,7 +40,8 @@ public abstract class CreateSource extends Statement { final TableElements elements, final boolean orReplace, final boolean notExists, - final CreateSourceProperties properties + final CreateSourceProperties properties, + final boolean isSource ) { super(location); this.name = requireNonNull(name, "name"); @@ -47,6 +49,7 @@ public abstract class CreateSource extends Statement { this.orReplace = orReplace; this.notExists = notExists; this.properties = requireNonNull(properties, "properties"); + this.isSource = isSource; } public CreateSourceProperties getProperties() { @@ -69,11 +72,15 @@ public boolean isNotExists() { return notExists; } + public boolean isSource() { + return isSource; + } + public abstract CreateSource copyWith(TableElements elements, CreateSourceProperties properties); @Override public int hashCode() { - return Objects.hash(name, elements, orReplace, notExists, properties); + return Objects.hash(name, elements, orReplace, notExists, properties, isSource); } @Override @@ -89,6 +96,7 @@ public boolean equals(final Object o) { && orReplace == that.orReplace && Objects.equals(name, that.name) && Objects.equals(elements, that.elements) - && Objects.equals(properties, that.properties); + && Objects.equals(properties, that.properties) + && isSource == that.isSource; } } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStream.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStream.java index e9d7df9466a4..5f7c4a14dc58 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStream.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStream.java @@ -46,7 +46,7 @@ public CreateStream( final boolean notExists, final CreateSourceProperties properties ) { - super(location, name, elements, orReplace, notExists, properties); + super(location, name, elements, orReplace, notExists, properties, false); throwOnPrimaryKeys(elements); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTable.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTable.java index c8e97b434fe7..9f81e5139f85 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTable.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTable.java @@ -27,15 +27,15 @@ @Immutable public class CreateTable extends CreateSource implements ExecutableDdlStatement { - public CreateTable( final SourceName name, final TableElements elements, final boolean orReplace, final boolean notExists, - final CreateSourceProperties properties + final CreateSourceProperties properties, + final boolean isSource ) { - this(Optional.empty(), name, elements, orReplace, notExists, properties); + this(Optional.empty(), name, elements, orReplace, notExists, properties, isSource); } public CreateTable( @@ -44,9 +44,10 @@ public CreateTable( final TableElements elements, final boolean orReplace, final boolean notExists, - final CreateSourceProperties properties + final CreateSourceProperties properties, + final boolean isSource ) { - super(location, name, elements, orReplace, notExists, properties); + super(location, name, elements, orReplace, notExists, properties, isSource); throwOnNonPrimaryKeys(elements); } @@ -62,7 +63,8 @@ public CreateSource copyWith( elements, isOrReplace(), isNotExists(), - properties); + properties, + isSource()); } @Override @@ -89,6 +91,7 @@ public String toString() { .add("orReplace", isOrReplace()) .add("notExists", isNotExists()) .add("properties", getProperties()) + .add("isSource", isSource()) .toString(); } diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java index 04010af9dcff..d33ee92bb71e 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/AstBuilderTest.java @@ -44,6 +44,7 @@ import io.confluent.ksql.parser.exception.ParseFailedException; import io.confluent.ksql.parser.tree.AliasedRelation; import io.confluent.ksql.parser.tree.AllColumns; +import io.confluent.ksql.parser.tree.CreateTable; import io.confluent.ksql.parser.tree.Explain; import io.confluent.ksql.parser.tree.Join; import io.confluent.ksql.parser.tree.Query; @@ -603,6 +604,32 @@ public void shouldSupportExplicitEmitFinalOnBareQuery() { assertThat(result.getRefinement().get().getOutputRefinement(), is(OutputRefinement.FINAL)); } + @Test + public void shouldCreateSourceTable() { + // Given: + final SingleStatementContext stmt = + givenQuery("CREATE SOURCE TABLE X WITH (kafka_topic='X');"); + + // When: + final CreateTable result = (CreateTable) builder.buildStatement(stmt); + + // Then: + assertThat(result.isSource(), is(true)); + } + + @Test + public void shouldCreateNormalTable() { + // Given: + final SingleStatementContext stmt = + givenQuery("CREATE TABLE X WITH (kafka_topic='X');"); + + // When: + final CreateTable result = (CreateTable) builder.buildStatement(stmt); + + // Then: + assertThat(result.isSource(), is(false)); + } + @Test public void shouldDefaultToEmitChangesForCsas() { // Given: diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 832b9abe80e9..963d774bd298 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -32,7 +32,6 @@ import static org.mockito.Mockito.mock; import com.google.common.collect.Iterables; -import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression.Sign; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; @@ -48,8 +47,6 @@ import io.confluent.ksql.execution.windows.WindowTimeClause; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.MutableMetaStore; -import io.confluent.ksql.metastore.model.KsqlStream; -import io.confluent.ksql.metastore.model.KsqlTable; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; @@ -78,7 +75,6 @@ import io.confluent.ksql.parser.tree.RegisterType; import io.confluent.ksql.parser.tree.SelectItem; import io.confluent.ksql.parser.tree.SetProperty; -import io.confluent.ksql.parser.tree.ShowColumns; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.TableElement; @@ -86,17 +82,11 @@ import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.parser.tree.WithinExpression; import io.confluent.ksql.query.QueryId; -import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlArray; import io.confluent.ksql.schema.ksql.types.SqlBaseType; import io.confluent.ksql.schema.ksql.types.SqlStruct; -import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; -import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.serde.FormatInfo; -import io.confluent.ksql.serde.KeyFormat; -import io.confluent.ksql.serde.SerdeFeatures; -import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.util.MetaStoreFixture; import java.math.BigDecimal; import java.util.List; @@ -116,74 +106,9 @@ public class KsqlParserTest { private MutableMetaStore metaStore; - private static final SqlType addressSchema = SqlTypes.struct() - .field("NUMBER", SqlTypes.BIGINT) - .field("STREET", SqlTypes.STRING) - .field("CITY", SqlTypes.STRING) - .field("STATE", SqlTypes.STRING) - .field("ZIPCODE", SqlTypes.BIGINT) - .build(); - - private static final SqlType categorySchema = SqlTypes.struct() - .field("ID", SqlTypes.BIGINT) - .field("NAME", SqlTypes.STRING) - .build(); - - private static final SqlType itemInfoSchema = SqlStruct.builder() - .field("ITEMID", SqlTypes.BIGINT) - .field("NAME", SqlTypes.STRING) - .field("CATEGORY", categorySchema) - .build(); - - private static final LogicalSchema ORDERS_SCHEMA = LogicalSchema.builder() - .keyColumn(ColumnName.of("k0"), SqlTypes.STRING) - .valueColumn(ColumnName.of("ORDERTIME"), SqlTypes.BIGINT) - .valueColumn(ColumnName.of("ORDERID"), SqlTypes.BIGINT) - .valueColumn(ColumnName.of("ITEMID"), SqlTypes.STRING) - .valueColumn(ColumnName.of("ITEMINFO"), itemInfoSchema) - .valueColumn(ColumnName.of("ORDERUNITS"), SqlTypes.INTEGER) - .valueColumn(ColumnName.of("ARRAYCOL"), SqlTypes.array(SqlTypes.DOUBLE)) - .valueColumn(ColumnName.of("MAPCOL"), SqlTypes.map(SqlTypes.STRING, SqlTypes.DOUBLE)) - .valueColumn(ColumnName.of("ADDRESS"), addressSchema) - .build(); - @Before public void init() { metaStore = MetaStoreFixture.getNewMetaStore(mock(FunctionRegistry.class)); - - final KsqlTopic ksqlTopicOrders = new KsqlTopic( - "orders_topic", - KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()), - ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of()) - ); - - final KsqlStream ksqlStreamOrders = new KsqlStream<>( - "sqlexpression", - SourceName.of("ADDRESS"), - ORDERS_SCHEMA, - Optional.empty(), - false, - ksqlTopicOrders - ); - - metaStore.putSource(ksqlStreamOrders, false); - - final KsqlTopic ksqlTopicItems = new KsqlTopic( - "item_topic", - KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()), - ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of()) - ); - - final KsqlTable ksqlTableOrders = new KsqlTable<>( - "sqlexpression", - SourceName.of("ITEMID"), - ORDERS_SCHEMA, - Optional.empty(), - false, - ksqlTopicItems - ); - - metaStore.putSource(ksqlTableOrders, false); } @Test @@ -444,6 +369,17 @@ public void testReservedColumnIdentifers() { assertQuerySucceeds("SELECT ROWKEY as ROWKEY FROM test1 t1;"); } + @Test + public void testCreateSourceTable() { + // When: + final CreateTable stmt = (CreateTable) KsqlParserTestUtil.buildSingleAst( + "CREATE SOURCE TABLE foozball (id VARCHAR PRIMARY KEY) WITH (kafka_topic='foozball', " + + "value_format='json', partitions=1, replicas=-1);", metaStore).getStatement(); + + // Then: + assertThat(stmt.isSource(), is(true)); + } + @Test public void testNegativeInWith() { // When: diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java index ea3d78d6e8d6..62e5a5fc756a 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java @@ -202,7 +202,8 @@ public void setUp() { ITEM_INFO_SCHEMA, Optional.empty(), false, - ksqlTopicItems + ksqlTopicItems, + false ); metaStore.putSource(ksqlTableOrders, false); @@ -213,7 +214,8 @@ public void setUp() { TABLE_SCHEMA, Optional.empty(), false, - ksqlTopicItems + ksqlTopicItems, + false ); metaStore.putSource(ksqlTableTable, false); @@ -278,6 +280,30 @@ public void shouldFormatCreateOrReplaceStreamStatement() { + "WITH (KAFKA_TOPIC='topic_test', VALUE_FORMAT='JSON');")); } + @Test + public void shouldFormatCreateSourceTableStatement() { + // Given: + final CreateSourceProperties props = CreateSourceProperties.from( + new ImmutableMap.Builder() + .putAll(SOME_WITH_PROPS.copyOfOriginalLiterals()) + .build() + ); + final CreateTable createTable = new CreateTable( + TEST, + ELEMENTS_WITH_PRIMARY_KEY, + false, + false, + props, + true); + + // When: + final String sql = SqlFormatter.formatSql(createTable); + + // Then: + assertThat(sql, is("CREATE SOURCE TABLE TEST (`k3` STRING PRIMARY KEY, `Foo` STRING) " + + "WITH (KAFKA_TOPIC='topic_test', VALUE_FORMAT='JSON');")); + } + @Test public void shouldFormatCreateOrReplaceTableStatement() { // Given: @@ -291,7 +317,8 @@ public void shouldFormatCreateOrReplaceTableStatement() { ELEMENTS_WITH_PRIMARY_KEY, true, false, - props); + props, + false); // When: final String sql = SqlFormatter.formatSql(createTable); @@ -316,7 +343,8 @@ public void shouldFormatCreateTableStatementWithExplicitTimestamp() { ELEMENTS_WITH_PRIMARY_KEY, false, false, - props); + props, + false); // When: final String sql = SqlFormatter.formatSql(createTable); @@ -335,7 +363,8 @@ public void shouldFormatCreateTableStatementWithExplicitKey() { ELEMENTS_WITH_PRIMARY_KEY, false, false, - SOME_WITH_PROPS); + SOME_WITH_PROPS, + false); // When: final String sql = SqlFormatter.formatSql(createTable); @@ -353,7 +382,8 @@ public void shouldFormatCreateTableStatementWithImplicitKey() { ELEMENTS_WITHOUT_KEY, false, false, - SOME_WITH_PROPS); + SOME_WITH_PROPS, + false); // When: final String sql = SqlFormatter.formatSql(createTable); diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java index d1b70b0651c8..04564366d4d3 100644 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java @@ -60,25 +60,28 @@ public void shouldImplementHashCodeAndEqualsProperty() { new EqualsTester() .addEqualityGroup( // Note: At the moment location does not take part in equality testing - new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS), - new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS), - new CreateTable(Optional.of(SOME_LOCATION), SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS), - new CreateTable(Optional.of(OTHER_LOCATION), SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS) + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, false), + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, false), + new CreateTable(Optional.of(SOME_LOCATION), SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, false), + new CreateTable(Optional.of(OTHER_LOCATION), SOME_NAME, SOME_ELEMENTS, false, true, SOME_PROPS, false) ) .addEqualityGroup( - new CreateTable(SourceName.of("jim"), SOME_ELEMENTS, false, true, SOME_PROPS) + new CreateTable(SourceName.of("jim"), SOME_ELEMENTS, false, true, SOME_PROPS, false) ) .addEqualityGroup( - new CreateTable(SOME_NAME, TableElements.of(), false, true, SOME_PROPS) + new CreateTable(SOME_NAME, TableElements.of(), false, true, SOME_PROPS, false) ) .addEqualityGroup( - new CreateTable(SOME_NAME, SOME_ELEMENTS, false, false, SOME_PROPS) + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, false, SOME_PROPS, false) ) .addEqualityGroup( - new CreateTable(SOME_NAME, SOME_ELEMENTS, true, true, SOME_PROPS) + new CreateTable(SOME_NAME, SOME_ELEMENTS, true, true, SOME_PROPS, true) ) .addEqualityGroup( - new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, OTHER_PROPS) + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, true, OTHER_PROPS, false) + ) + .addEqualityGroup( + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, false, OTHER_PROPS, true) ) .testEquals(); } @@ -107,7 +110,7 @@ public void shouldThrowOnNonePrimaryKey() { // When: final ParseFailedException e = assertThrows( ParseFailedException.class, - () -> new CreateTable(SOME_NAME, invalidElements, false, false, SOME_PROPS) + () -> new CreateTable(SOME_NAME, invalidElements, false, false, SOME_PROPS, false) ); // Then: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index 0ce5fcced0e4..28bf8775fd9a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -527,7 +527,8 @@ public void shouldRunCsStatement() { public void shouldRunCtStatement() { // Given: final PreparedStatement ct = PreparedStatement.of("CT", - new CreateTable(SOME_NAME, SOME_ELEMENTS, false, false, JSON_PROPS)); + new CreateTable(SOME_NAME, SOME_ELEMENTS, false, false, JSON_PROPS, + false)); givenQueryFileParsesTo(ct); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java index bdc06919cbe2..0fe01782bfe3 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java @@ -161,7 +161,8 @@ public T givenSource( SCHEMA, Optional.empty(), false, - topic + topic, + false ); break; default: diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java index 5829681d129a..6ab99b0a66e3 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/InsertValuesExecutorTest.java @@ -1121,7 +1121,8 @@ private void givenDataSourceWithSchema( schema, Optional.empty(), false, - topic + topic, + false ); } else { dataSource = new KsqlStream<>( diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index d54029aac3e1..4625afa534a0 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -2640,7 +2640,8 @@ private void givenSource( schema, Optional.empty(), false, - ksqlTopic + ksqlTopic, + false ); break; default: diff --git a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json index 486bd0ec2257..3b28c70e9c43 100644 --- a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json +++ b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json @@ -181,6 +181,10 @@ "orReplace" : { "type" : "boolean", "default" : false + }, + "isSource" : { + "type" : "boolean", + "default" : false } }, "title" : "createTableV1",