Skip to content

Commit

Permalink
feat: add CREATE SOURCE TABLE syntax and metadata info (#7945)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Aug 11, 2021
1 parent 2e1d635 commit 70565f2
Show file tree
Hide file tree
Showing 29 changed files with 330 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public CreateStreamCommand createStreamCommand(
);
}

// This method is called by CREATE_AS statements
public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode outputNode) {
return new CreateTableCommand(
outputNode.getSinkName().get(),
Expand All @@ -142,10 +143,12 @@ public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode
outputNode.getKsqlTopic().getKafkaTopicName(),
Formats.from(outputNode.getKsqlTopic()),
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(),
Optional.of(outputNode.getOrReplace())
Optional.of(outputNode.getOrReplace()),
Optional.of(false)
);
}

// This method is called by simple CREATE statements
public CreateTableCommand createTableCommand(
final CreateTable statement,
final KsqlConfig ksqlConfig
Expand Down Expand Up @@ -188,7 +191,8 @@ public CreateTableCommand createTableCommand(
topicName,
buildFormats(statement.getName(), schema, props, ksqlConfig),
getWindowInfo(props),
Optional.of(statement.isOrReplace())
Optional.of(statement.isOrReplace()),
Optional.of(statement.isSource())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable)
+ "already exists.",
sourceName, sourceType.toLowerCase()));
}

final KsqlTable<?> ksqlTable = new KsqlTable<>(
sql,
createTable.getSourceName(),
createTable.getSchema(),
createTable.getTimestampColumn(),
withQuery,
getKsqlTopic(createTable)
getKsqlTopic(createTable),
createTable.isSource()
);
metaStore.putSource(ksqlTable, createTable.isOrReplace());
metaStore.addSourceReferences(ksqlTable.getName(), withQuerySources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,25 @@ public void shouldCreateCommandForCreateTable() {
TableElements.of(
tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)),
tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties);
false, true, withProperties, false);

// When:
final DdlCommand result = commandFactories
.create(sqlExpression, statement, SessionConfig.of(ksqlConfig, emptyMap()));

// Then:
assertThat(result, is(createTableCommand));
verify(createSourceFactory).createTableCommand(statement, ksqlConfig);
}

@Test
public void shouldCreateCommandForCreateSourceTable() {
// Given:
final CreateTable statement = new CreateTable(SOME_NAME,
TableElements.of(
tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)),
tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties, true);

// When:
final DdlCommand result = commandFactories
Expand All @@ -216,7 +234,7 @@ public void shouldCreateCommandForCreateTableWithOverriddenProperties() {
TableElements.of(
tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)),
tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties);
false, true, withProperties, false);

// When:
commandFactories.create(sqlExpression, statement, SessionConfig.of(ksqlConfig, OVERRIDES));
Expand Down Expand Up @@ -354,7 +372,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromOverridesNotConfi
);

final DdlStatement statement =
new CreateTable(SOME_NAME, ELEMENTS_WITH_PK, false, true, withProperties);
new CreateTable(SOME_NAME, ELEMENTS_WITH_PK,
false, true, withProperties, false);

// When:
final DdlCommand cmd = commandFactories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void shouldCreateCommandForCreateTable() {
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties);
false, true, withProperties, false);

// When:
final CreateTableCommand result = createSourceFactory
Expand All @@ -302,6 +302,26 @@ public void shouldCreateCommandForCreateTable() {
// Then:
assertThat(result.getSourceName(), is(SOME_NAME));
assertThat(result.getTopicName(), is(TOPIC_NAME));
assertThat(result.isSource(), is(false));
}

@Test
public void shouldCreateCommandForCreateSourceTable() {
// Given:
final CreateTable ddlStatement = new CreateTable(SOME_NAME,
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties, true);

// When:
final CreateTableCommand result = createSourceFactory
.createTableCommand(ddlStatement, ksqlConfig);

// Then:
assertThat(result.getSourceName(), is(SOME_NAME));
assertThat(result.getTopicName(), is(TOPIC_NAME));
assertThat(result.isSource(), is(true));
}

@Test
Expand Down Expand Up @@ -379,7 +399,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromPropertiesNotConf
givenProperty(CommonCreateConfigs.WRAP_SINGLE_VALUE, new BooleanLiteral("false"));

final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties);
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory
Expand All @@ -400,7 +421,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromConfig() {
));

final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties);
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory
Expand All @@ -415,7 +437,8 @@ public void shouldCreateTableCommandWithSingleValueWrappingFromConfig() {
public void shouldCreateTableCommandWithSingleValueWrappingFromDefaultConfig() {
// Given:
final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties);
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory
Expand Down Expand Up @@ -446,7 +469,8 @@ public void shouldThrowOnNoElementsInCreateStream() {
public void shouldThrowOnNoElementsInCreateTable() {
// Given:
final CreateTable statement
= new CreateTable(SOME_NAME, TableElements.of(), false, true, withProperties);
= new CreateTable(SOME_NAME, TableElements.of(),
false, true, withProperties, false);

// When:
final Exception e = assertThrows(
Expand Down Expand Up @@ -475,7 +499,8 @@ public void shouldNotThrowWhenThereAreElementsInCreateStream() {
public void shouldNotThrowWhenThereAreElementsInCreateTable() {
// Given:
final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE, false, true, withProperties);
new CreateTable(SOME_NAME, TABLE_ELEMENTS_1_VALUE,
false, true, withProperties, false);

// When:
createSourceFactory.createTableCommand(statement, ksqlConfig);
Expand Down Expand Up @@ -609,7 +634,8 @@ public void shouldBuildTimestampColumnForTable() {
new StringLiteral(quote(ELEMENT2.getName().text()))
);
final CreateTable statement =
new CreateTable(SOME_NAME, TABLE_ELEMENTS, false, true, withProperties);
new CreateTable(SOME_NAME, TABLE_ELEMENTS,
false, true, withProperties, false);

// When:
final CreateTableCommand cmd = createSourceFactory.createTableCommand(
Expand Down Expand Up @@ -731,7 +757,7 @@ public void shouldCreateValueSerdeToValidateValueFormatCanHandleValueSchema() {
// Given:
givenCommandFactoriesWithMocks();
final CreateTable statement = new CreateTable(SOME_NAME, TABLE_ELEMENTS, false, true,
withProperties);
withProperties, false);

when(valueSerdeFactory.create(
FormatInfo.of(JSON.name()),
Expand Down Expand Up @@ -1010,7 +1036,8 @@ public void shouldThrowIfTableIsMissingPrimaryKey() {
final TableElements noKey = TableElements.of(ELEMENT1);

final CreateTable statement =
new CreateTable(SOME_NAME, noKey, false, true, withProperties);
new CreateTable(SOME_NAME, noKey,
false, true, withProperties, false);

// When:
final Exception e = assertThrows(
Expand Down Expand Up @@ -1060,7 +1087,7 @@ public void shouldNotThrowOnCreateTableIfNotExistsIsSet() {
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, true, withProperties);
false, true, withProperties, false);

// When:
final CreateTableCommand result = createSourceFactory
Expand All @@ -1077,7 +1104,7 @@ public void shouldThrowIfTableExists() {
TableElements.of(
tableElement(PRIMARY_KEY, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
false, false, withProperties);
false, false, withProperties, false);

// When:
final Exception e = assertThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.Column;
Expand Down Expand Up @@ -257,12 +258,49 @@ public void shouldAddSinkTable() {
assertThat(metaStore.getSource(TABLE_NAME).isCasTarget(), is(true));
}

@Test
public void shouldAddNormalTableWhenNoTypeIsSpecified() {
// Given:
final CreateTableCommand cmd = buildCreateTable(
SourceName.of("t1"),
false,
null
);

// When:
cmdExec.execute(SQL_TEXT, cmd, true, NO_QUERY_SOURCES);

// Then:
final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1"));
assertThat(ksqlTable.isSource(), is(false));
}

@Test
public void shouldAddSourceTable() {
// Given:
final CreateTableCommand cmd = buildCreateTable(
SourceName.of("t1"),
false,
true
);

// When:
cmdExec.execute(SQL_TEXT, cmd, true, NO_QUERY_SOURCES);

// Then:
final KsqlTable ksqlTable = (KsqlTable) metaStore.getSource(SourceName.of("t1"));
assertThat(ksqlTable.isSource(), is(true));
}

@Test
public void shouldThrowOnDropTableWhenConstraintExist() {
// Given:
final CreateTableCommand table1 = buildCreateTable(SourceName.of("t1"), false);
final CreateTableCommand table2 = buildCreateTable(SourceName.of("t2"), false);
final CreateTableCommand table3 = buildCreateTable(SourceName.of("t3"), false);
final CreateTableCommand table1 = buildCreateTable(SourceName.of("t1"),
false, false);
final CreateTableCommand table2 = buildCreateTable(SourceName.of("t2"),
false, false);
final CreateTableCommand table3 = buildCreateTable(SourceName.of("t3"),
false, false);
cmdExec.execute(SQL_TEXT, table1, true, Collections.emptySet());
cmdExec.execute(SQL_TEXT, table2, true, Collections.singleton(SourceName.of("t1")));
cmdExec.execute(SQL_TEXT, table3, true, Collections.singleton(SourceName.of("t1")));
Expand Down Expand Up @@ -470,7 +508,7 @@ public void shouldWarnAddDuplicateTableWithoutReplace() {
cmdExec.execute(SQL_TEXT, createTable, false, NO_QUERY_SOURCES);

// When:
givenCreateTable(false);
givenCreateTable();
final DdlCommandResult result =cmdExec.execute(SQL_TEXT, createTable,
false, NO_QUERY_SOURCES);

Expand Down Expand Up @@ -546,21 +584,19 @@ private void givenCreateWindowedTable() {
SerdeFeatures.of()
),
Optional.of(windowInfo),
Optional.of(false),
Optional.of(false)
);
}

private void givenCreateTable() {
createTable = buildCreateTable(TABLE_NAME, false);
}

private void givenCreateTable(final boolean allowReplace) {
createTable = buildCreateTable(TABLE_NAME, allowReplace);
createTable = buildCreateTable(TABLE_NAME, false, false);
}

private CreateTableCommand buildCreateTable(
final SourceName sourceName,
final boolean allowReplace
final boolean allowReplace,
final Boolean isSource
) {
return new CreateTableCommand(
sourceName,
Expand All @@ -574,7 +610,8 @@ private CreateTableCommand buildCreateTable(
SerdeFeatures.of()
),
Optional.empty(),
Optional.of(allowReplace)
Optional.of(allowReplace),
Optional.ofNullable(isSource)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,8 @@ public void shouldRewriteCreateTable() {
TableElements.of(tableElement1, tableElement2),
false,
false,
sourceProperties
sourceProperties,
false
);
when(mockRewriter.apply(tableElement1, context)).thenReturn(rewrittenTableElement1);
when(mockRewriter.apply(tableElement2, context)).thenReturn(rewrittenTableElement2);
Expand All @@ -677,7 +678,8 @@ public void shouldRewriteCreateTable() {
TableElements.of(rewrittenTableElement1, rewrittenTableElement2),
false,
false,
sourceProperties
sourceProperties,
false
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ public void shouldBuildSchemaKTableWhenKTableSource() {
"topic2",
KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()),
ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()), SerdeFeatures.of())
)
),
false
);

node = new DataSourceNode(
Expand Down

0 comments on commit 70565f2

Please sign in to comment.