Skip to content

Commit

Permalink
fix: throw error message on create source with no value columns (#6680)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Nov 30, 2020
1 parent 57b0c91 commit 14465a2
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,10 @@ public void shouldNotThrowOnRowKeyKeyColumn() {
// Given:
final CreateStream statement = new CreateStream(
SOME_NAME,
TableElements.of(tableElement(KEY, "k", new Type(SqlTypes.STRING))),
TableElements.of(
tableElement(KEY, "k", new Type(SqlTypes.STRING)),
tableElement(VALUE, "v", new Type(SqlTypes.INTEGER))
),
false,
true,
withProperties
Expand All @@ -945,7 +948,10 @@ public void shouldAllowNonStringKeyColumn() {
// Given:
final CreateStream statement = new CreateStream(
SOME_NAME,
TableElements.of(tableElement(KEY, "k", new Type(SqlTypes.INTEGER))),
TableElements.of(
tableElement(KEY, "k", new Type(SqlTypes.INTEGER)),
tableElement(VALUE, "v", new Type(SqlTypes.INTEGER))
),
false,
true,
withProperties
Expand All @@ -966,7 +972,10 @@ public void shouldNotThrowOnKeyColumnThatIsNotCalledRowKey() {
// Given:
final CreateStream statement = new CreateStream(
SOME_NAME,
TableElements.of(tableElement(KEY, "someKey", new Type(SqlTypes.STRING))),
TableElements.of(
tableElement(KEY, "someKey", new Type(SqlTypes.STRING)),
tableElement(VALUE, "someVal", new Type(SqlTypes.INTEGER))
),
false,
true,
withProperties
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ID STRING KEY) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"statementText" : "CREATE STREAM INPUT (ID STRING KEY, FOO STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ID` STRING KEY",
"schema" : "`ID` STRING KEY, `FOO` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
Expand Down Expand Up @@ -60,7 +60,7 @@
"format" : "JSON"
}
},
"sourceSchema" : "`ID` STRING KEY"
"sourceSchema" : "`ID` STRING KEY, `FOO` STRING"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "LPAD('foo', 7, 'Bar') AS RESULT" ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"path" : "query-validation-tests/string-lpad-rpad.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
"schema" : "`ID` STRING KEY",
"schema" : "`ID` STRING KEY, `FOO` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
Expand Down Expand Up @@ -55,12 +55,12 @@
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM INPUT (id STRING KEY) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT id, lpad('foo', 7, 'Bar') as result FROM INPUT;" ],
"statements" : [ "CREATE STREAM INPUT (id STRING KEY, foo STRING) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT id, lpad('foo', 7, 'Bar') as result FROM INPUT;" ],
"post" : {
"sources" : [ {
"name" : "INPUT",
"type" : "STREAM",
"schema" : "`ID` STRING KEY",
"schema" : "`ID` STRING KEY, `FOO` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ID STRING KEY) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"statementText" : "CREATE STREAM INPUT (ID STRING KEY, FOO STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ID` STRING KEY",
"schema" : "`ID` STRING KEY, `FOO` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
Expand Down Expand Up @@ -60,7 +60,7 @@
"format" : "JSON"
}
},
"sourceSchema" : "`ID` STRING KEY"
"sourceSchema" : "`ID` STRING KEY, `FOO` STRING"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "RPAD('foo', 7, 'Bar') AS RESULT" ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"path" : "query-validation-tests/string-lpad-rpad.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
"schema" : "`ID` STRING KEY",
"schema" : "`ID` STRING KEY, `FOO` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
Expand Down Expand Up @@ -55,12 +55,12 @@
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM INPUT (id STRING KEY) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT id, rpad('foo', 7, 'Bar') as result FROM INPUT;" ],
"statements" : [ "CREATE STREAM INPUT (id STRING KEY, foo STRING) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT id, rpad('foo', 7, 'Bar') as result FROM INPUT;" ],
"post" : {
"sources" : [ {
"name" : "INPUT",
"type" : "STREAM",
"schema" : "`ID` STRING KEY",
"schema" : "`ID` STRING KEY, `FOO` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@
"message": "The statement does not define any columns."
}
},
{
"name": "validate without value elements FAILS",
"statements": [
"CREATE STREAM INPUT (ID INT KEY) WITH (kafka_topic='input', value_format='JSON');"
],
"topics": [
{
"name": "input",
"valueSchema": {"name": "blah", "type": "record", "fields": [{"name": "c1", "type": "int"}]},
"valueFormat": "JSON"
}
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlException",
"message": "ksqlDB does not support sources with no value columns: https://github.com/confluentinc/ksql/issues/5564"
}
},
{
"name": "validate without elements OK - AVRO",
"statements": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"name": "LPad with literal args",
"format": ["JSON"],
"statements": [
"CREATE STREAM INPUT (id STRING KEY) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM INPUT (id STRING KEY, foo STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT id, lpad('foo', 7, 'Bar') as result FROM INPUT;"
],
"inputs": [
Expand Down Expand Up @@ -83,7 +83,7 @@
"name": "RPad with literal args",
"format": ["JSON"],
"statements": [
"CREATE STREAM INPUT (id STRING KEY) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM INPUT (id STRING KEY, foo STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT id, rpad('foo', 7, 'Bar') as result FROM INPUT;"
],
"inputs": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ public LogicalSchema toLogicalSchema() {
}
}

return builder.build();
final LogicalSchema schema = builder.build();
if (schema.value().isEmpty()) {
throw new KsqlException("ksqlDB does not support sources with no value columns: "
+ "https://github.com/confluentinc/ksql/issues/5564");
}

return schema;
}

private TableElements(final ImmutableList<TableElement> elements) {
Expand Down

0 comments on commit 14465a2

Please sign in to comment.