diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java index 7b3d09518a7a..cb1f7f5dc64e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java @@ -103,7 +103,9 @@ public Stream resolveSelectStar( final Optional sourceName ) { return getSources().stream() - .filter(s -> !sourceName.isPresent() || sourceName.equals(s.getSourceName())) + .filter(s -> !sourceName.isPresent() + || !s.getSourceName().isPresent() + || sourceName.equals(s.getSourceName())) .flatMap(s -> s.resolveSelectStar(sourceName)); } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/PlanNodeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/PlanNodeTest.java index a307f9511ac2..2b0f92642e30 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/PlanNodeTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/PlanNodeTest.java @@ -99,8 +99,7 @@ public void shouldResolveUnaliasedSelectStarByCallingAllSources() { @Test public void shouldResolveAliasedSelectStarByCallingOnlyCorrectParent() { // When: - final Stream result = planNode.resolveSelectStar(Optional.of(SOURCE_2_NAME) - ); + final Stream result = planNode.resolveSelectStar(Optional.of(SOURCE_2_NAME)); // Then: final List columns = result.collect(Collectors.toList()); @@ -110,6 +109,22 @@ public void shouldResolveAliasedSelectStarByCallingOnlyCorrectParent() { verify(source2).resolveSelectStar(Optional.of(SOURCE_2_NAME)); } + @Test + public void shouldResolveAliasedSelectStarByCallingParentIfParentHasNoSourceName() { + // Given: + when(source1.getSourceName()).thenReturn(Optional.empty()); + + // When: + final Stream result = planNode.resolveSelectStar(Optional.of(SOURCE_2_NAME)); + + // Then: + final List columns = result.collect(Collectors.toList()); + assertThat(columns, contains(COL0, COL1, COL2, COL3)); + + verify(source1).resolveSelectStar(Optional.of(SOURCE_2_NAME)); + verify(source2).resolveSelectStar(Optional.of(SOURCE_2_NAME)); + } + private final class TestPlanNode extends PlanNode { protected TestPlanNode( diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_matching_columns_in_both_sides_=_select_left.__and_right.__with_WHERE/6.0.0_1593039433111/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_matching_columns_in_both_sides_=_select_left.__and_right.__with_WHERE/6.0.0_1593039433111/plan.json new file mode 100644 index 000000000000..fd995d87edf7 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_matching_columns_in_both_sides_=_select_left.__and_right.__with_WHERE/6.0.0_1593039433111/plan.json @@ -0,0 +1,208 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM L (A INTEGER KEY, B INTEGER, C INTEGER) WITH (KAFKA_TOPIC='LEFT', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "L", + "schema" : "`A` INTEGER KEY, `B` INTEGER, `C` INTEGER", + "topicName" : "LEFT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM R (A INTEGER KEY, B INTEGER, C INTEGER) WITH (KAFKA_TOPIC='RIGHT', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "R", + "schema" : "`A` INTEGER KEY, `B` INTEGER, `C` INTEGER", + "topicName" : "RIGHT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n L.*,\n R.*\nFROM L L\nINNER JOIN R R WITHIN 10 SECONDS ON ((L.A = R.A))\nWHERE (R.B < 5)\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`L_A` INTEGER KEY, `L_B` INTEGER, `L_C` INTEGER, `R_A` INTEGER, `R_B` INTEGER, `R_C` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "L", "R" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamFilterV1", + "properties" : { + "queryContext" : "WhereFilter" + }, + "source" : { + "@type" : "streamStreamJoinV1", + "properties" : { + "queryContext" : "Join" + }, + "joinType" : "INNER", + "leftInternalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "rightInternalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "leftSource" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "PrependAliasLeft" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KafkaTopic_Left/Source" + }, + "topicName" : "LEFT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` INTEGER, `C` INTEGER" + }, + "keyColumnNames" : [ "L_A" ], + "selectExpressions" : [ "B AS L_B", "C AS L_C", "ROWTIME AS L_ROWTIME", "A AS L_A" ] + }, + "rightSource" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "PrependAliasRight" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KafkaTopic_Right/Source" + }, + "topicName" : "RIGHT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` INTEGER, `C` INTEGER" + }, + "keyColumnNames" : [ "R_A" ], + "selectExpressions" : [ "B AS R_B", "C AS R_C", "ROWTIME AS R_ROWTIME", "A AS R_A" ] + }, + "beforeMillis" : 10.000000000, + "afterMillis" : 10.000000000, + "keyColName" : "L_A" + }, + "filterExpression" : "(R_B < 5)" + }, + "keyColumnNames" : [ "L_A" ], + "selectExpressions" : [ "L_B AS L_B", "L_C AS L_C", "R_A AS R_A", "R_B AS R_B", "R_C AS R_C" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_matching_columns_in_both_sides_=_select_left.__and_right.__with_WHERE/6.0.0_1593039433111/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_matching_columns_in_both_sides_=_select_left.__and_right.__with_WHERE/6.0.0_1593039433111/spec.json new file mode 100644 index 000000000000..7457b39e9b31 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_matching_columns_in_both_sides_=_select_left.__and_right.__with_WHERE/6.0.0_1593039433111/spec.json @@ -0,0 +1,130 @@ +{ + "version" : "6.0.0", + "timestamp" : 1593039433111, + "path" : "query-validation-tests/joins.json", + "schemas" : { + "CSAS_OUTPUT_0.KafkaTopic_Left.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.KafkaTopic_Right.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.Join.Left" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.Join.Right" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "testCase" : { + "name" : "matching columns in both sides = select left.* and right.* with WHERE", + "inputs" : [ { + "topic" : "LEFT", + "key" : 0, + "value" : { + "B" : 1, + "C" : 2 + }, + "timestamp" : 10 + }, { + "topic" : "RIGHT", + "key" : 0, + "value" : { + "B" : -1, + "C" : -2 + }, + "timestamp" : 11 + }, { + "topic" : "RIGHT", + "key" : 0, + "value" : { + "B" : 9, + "C" : 10 + }, + "timestamp" : 12 + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "R_A" : 0, + "L_B" : 1, + "R_B" : -1, + "L_C" : 2, + "R_C" : -2 + }, + "timestamp" : 11 + } ], + "topics" : [ { + "name" : "LEFT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "RIGHT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');", "CREATE STREAM R (A INT KEY, B INT, C INT) WITH (kafka_topic='RIGHT', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT l.*, r.* FROM L INNER JOIN R WITHIN 10 SECONDS ON L.A = R.A WHERE R.B < 5;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "stream", + "schema" : "L_A INT KEY, L_B INT, L_C INT, R_A INT, R_B INT, R_C INT" + } ], + "topics" : { + "topics" : [ { + "name" : "LEFT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "RIGHT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINOTHER-0000000009-store-changelog", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + } + }, { + "name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINTHIS-0000000008-store-changelog", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + } + } ], + "blackList" : ".*-repartition" + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_matching_columns_in_both_sides_=_select_left.__and_right.__with_WHERE/6.0.0_1593039433111/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_matching_columns_in_both_sides_=_select_left.__and_right.__with_WHERE/6.0.0_1593039433111/topology new file mode 100644 index 000000000000..3cf7105d35ea --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/joins_-_matching_columns_in_both_sides_=_select_left.__and_right.__with_WHERE/6.0.0_1593039433111/topology @@ -0,0 +1,42 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [LEFT]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Source: KSTREAM-SOURCE-0000000003 (topics: [RIGHT]) + --> KSTREAM-TRANSFORMVALUES-0000000004 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PrependAliasLeft + <-- KSTREAM-SOURCE-0000000000 + Processor: KSTREAM-TRANSFORMVALUES-0000000004 (stores: []) + --> PrependAliasRight + <-- KSTREAM-SOURCE-0000000003 + Processor: PrependAliasLeft (stores: []) + --> Join-this-windowed + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: PrependAliasRight (stores: []) + --> Join-other-windowed + <-- KSTREAM-TRANSFORMVALUES-0000000004 + Processor: Join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000009-store]) + --> Join-other-join + <-- PrependAliasRight + Processor: Join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000008-store]) + --> Join-this-join + <-- PrependAliasLeft + Processor: Join-other-join (stores: [KSTREAM-JOINTHIS-0000000008-store]) + --> Join-merge + <-- Join-other-windowed + Processor: Join-this-join (stores: [KSTREAM-JOINOTHER-0000000009-store]) + --> Join-merge + <-- Join-this-windowed + Processor: Join-merge (stores: []) + --> WhereFilter + <-- Join-this-join, Join-other-join + Processor: WhereFilter (stores: []) + --> Project + <-- Join-merge + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000013 + <-- WhereFilter + Sink: KSTREAM-SINK-0000000013 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/multi-joins_-_scoped_include_all_columns/6.0.0_1593098410884/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/multi-joins_-_scoped_include_all_columns/6.0.0_1593098410884/plan.json new file mode 100644 index 000000000000..ed62639807a3 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/multi-joins_-_scoped_include_all_columns/6.0.0_1593098410884/plan.json @@ -0,0 +1,275 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM S1 (A INTEGER KEY, B INTEGER) WITH (KAFKA_TOPIC='S1', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "S1", + "schema" : "`A` INTEGER KEY, `B` INTEGER", + "topicName" : "S1", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM S2 (A INTEGER KEY, B INTEGER) WITH (KAFKA_TOPIC='S2', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "S2", + "schema" : "`A` INTEGER KEY, `B` INTEGER", + "topicName" : "S2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM S3 (A INTEGER KEY, B INTEGER) WITH (KAFKA_TOPIC='S3', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "S3", + "schema" : "`A` INTEGER KEY, `B` INTEGER", + "topicName" : "S3", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n S1.*,\n S2.*,\n S3.*\nFROM S1 S1\nINNER JOIN S2 S2 WITHIN 10 SECONDS ON ((S1.A = S2.A))\nINNER JOIN S3 S3 WITHIN 10 SECONDS ON ((S1.A = S3.A))\nWHERE (S1.B < 5)\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`S1_A` INTEGER KEY, `S1_B` INTEGER, `S2_A` INTEGER, `S2_B` INTEGER, `S3_A` INTEGER, `S3_B` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "S1", "S2", "S3" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamFilterV1", + "properties" : { + "queryContext" : "WhereFilter" + }, + "source" : { + "@type" : "streamStreamJoinV1", + "properties" : { + "queryContext" : "Join" + }, + "joinType" : "INNER", + "leftInternalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "rightInternalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "leftSource" : { + "@type" : "streamStreamJoinV1", + "properties" : { + "queryContext" : "L_Join" + }, + "joinType" : "INNER", + "leftInternalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "rightInternalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "leftSource" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "PrependAliasL_Left" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KafkaTopic_L_Left/Source" + }, + "topicName" : "S1", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` INTEGER" + }, + "keyColumnNames" : [ "S1_A" ], + "selectExpressions" : [ "B AS S1_B", "ROWTIME AS S1_ROWTIME", "A AS S1_A" ] + }, + "rightSource" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "PrependAliasL_Right" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KafkaTopic_L_Right/Source" + }, + "topicName" : "S2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` INTEGER" + }, + "keyColumnNames" : [ "S2_A" ], + "selectExpressions" : [ "B AS S2_B", "ROWTIME AS S2_ROWTIME", "A AS S2_A" ] + }, + "beforeMillis" : 10.000000000, + "afterMillis" : 10.000000000, + "keyColName" : "S1_A" + }, + "rightSource" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "PrependAliasRight" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KafkaTopic_Right/Source" + }, + "topicName" : "S3", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`A` INTEGER KEY, `B` INTEGER" + }, + "keyColumnNames" : [ "S3_A" ], + "selectExpressions" : [ "B AS S3_B", "ROWTIME AS S3_ROWTIME", "A AS S3_A" ] + }, + "beforeMillis" : 10.000000000, + "afterMillis" : 10.000000000, + "keyColName" : "S1_A" + }, + "filterExpression" : "(S1_B < 5)" + }, + "keyColumnNames" : [ "S1_A" ], + "selectExpressions" : [ "S1_B AS S1_B", "S2_A AS S2_A", "S2_B AS S2_B", "S3_A AS S3_A", "S3_B AS S3_B" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/multi-joins_-_scoped_include_all_columns/6.0.0_1593098410884/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/multi-joins_-_scoped_include_all_columns/6.0.0_1593098410884/spec.json new file mode 100644 index 000000000000..3e265a4fb20a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/multi-joins_-_scoped_include_all_columns/6.0.0_1593098410884/spec.json @@ -0,0 +1,166 @@ +{ + "version" : "6.0.0", + "timestamp" : 1593098410884, + "path" : "query-validation-tests/multi-joins.json", + "schemas" : { + "CSAS_OUTPUT_0.KafkaTopic_L_Left.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.KafkaTopic_L_Right.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.L_Join.Left" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.L_Join.Right" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.KafkaTopic_Right.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.Join.Left" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.Join.Right" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "testCase" : { + "name" : "scoped include all columns", + "inputs" : [ { + "topic" : "S1", + "key" : 0, + "value" : { + "B" : 1 + }, + "timestamp" : 10 + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "B" : -1 + }, + "timestamp" : 11 + }, { + "topic" : "S3", + "key" : 0, + "value" : { + "B" : 9 + }, + "timestamp" : 12 + }, { + "topic" : "S1", + "key" : 0, + "value" : { + "B" : 9 + }, + "timestamp" : 13 + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "S1_B" : 1, + "S2_A" : 0, + "S2_B" : -1, + "S3_A" : 0, + "S3_B" : 9 + }, + "timestamp" : 12 + } ], + "topics" : [ { + "name" : "S3", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "S1", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "S2", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM S1 (A INT KEY, B INT) WITH (kafka_topic='S1', value_format='JSON');", "CREATE STREAM S2 (A INT KEY, B INT) WITH (kafka_topic='S2', value_format='JSON');", "CREATE STREAM S3 (A INT KEY, B INT) WITH (kafka_topic='S3', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT s1.*, s2.*, s3.* FROM S1 INNER JOIN S2 WITHIN 10 SECONDS ON S1.A = S2.A INNER JOIN S3 WITHIN 10 SECONDS ON S1.A = S3.A WHERE S1.B < 5;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "S1", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "S2", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "S3", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINOTHER-0000000009-store-changelog", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + } + }, { + "name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINOTHER-0000000017-store-changelog", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + } + }, { + "name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINTHIS-0000000008-store-changelog", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + } + }, { + "name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINTHIS-0000000016-store-changelog", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/multi-joins_-_scoped_include_all_columns/6.0.0_1593098410884/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/multi-joins_-_scoped_include_all_columns/6.0.0_1593098410884/topology new file mode 100644 index 000000000000..c18c8979bbe1 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/multi-joins_-_scoped_include_all_columns/6.0.0_1593098410884/topology @@ -0,0 +1,65 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [S1]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Source: KSTREAM-SOURCE-0000000003 (topics: [S2]) + --> KSTREAM-TRANSFORMVALUES-0000000004 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PrependAliasL_Left + <-- KSTREAM-SOURCE-0000000000 + Processor: KSTREAM-TRANSFORMVALUES-0000000004 (stores: []) + --> PrependAliasL_Right + <-- KSTREAM-SOURCE-0000000003 + Processor: PrependAliasL_Left (stores: []) + --> L_Join-this-windowed + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: PrependAliasL_Right (stores: []) + --> L_Join-other-windowed + <-- KSTREAM-TRANSFORMVALUES-0000000004 + Source: KSTREAM-SOURCE-0000000011 (topics: [S3]) + --> KSTREAM-TRANSFORMVALUES-0000000012 + Processor: L_Join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000009-store]) + --> L_Join-other-join + <-- PrependAliasL_Right + Processor: L_Join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000008-store]) + --> L_Join-this-join + <-- PrependAliasL_Left + Processor: KSTREAM-TRANSFORMVALUES-0000000012 (stores: []) + --> PrependAliasRight + <-- KSTREAM-SOURCE-0000000011 + Processor: L_Join-other-join (stores: [KSTREAM-JOINTHIS-0000000008-store]) + --> L_Join-merge + <-- L_Join-other-windowed + Processor: L_Join-this-join (stores: [KSTREAM-JOINOTHER-0000000009-store]) + --> L_Join-merge + <-- L_Join-this-windowed + Processor: L_Join-merge (stores: []) + --> Join-this-windowed + <-- L_Join-this-join, L_Join-other-join + Processor: PrependAliasRight (stores: []) + --> Join-other-windowed + <-- KSTREAM-TRANSFORMVALUES-0000000012 + Processor: Join-other-windowed (stores: [KSTREAM-JOINOTHER-0000000017-store]) + --> Join-other-join + <-- PrependAliasRight + Processor: Join-this-windowed (stores: [KSTREAM-JOINTHIS-0000000016-store]) + --> Join-this-join + <-- L_Join-merge + Processor: Join-other-join (stores: [KSTREAM-JOINTHIS-0000000016-store]) + --> Join-merge + <-- Join-other-windowed + Processor: Join-this-join (stores: [KSTREAM-JOINOTHER-0000000017-store]) + --> Join-merge + <-- Join-this-windowed + Processor: Join-merge (stores: []) + --> WhereFilter + <-- Join-this-join, Join-other-join + Processor: WhereFilter (stores: []) + --> Project + <-- Join-merge + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000021 + <-- WhereFilter + Sink: KSTREAM-SINK-0000000021 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json index 1a6e323342ab..0d0546f72bb5 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/joins.json @@ -46,6 +46,30 @@ } } }, + { + "name": "matching columns in both sides = select left.* and right.* with WHERE", + "statements": [ + "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');", + "CREATE STREAM R (A INT KEY, B INT, C INT) WITH (kafka_topic='RIGHT', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT l.*, r.* FROM L INNER JOIN R WITHIN 10 SECONDS ON L.A = R.A WHERE R.B < 5;" + ], + "inputs": [ + {"topic": "LEFT", "key": 0, "value": {"B": 1, "C": 2}, "timestamp": 10}, + {"topic": "RIGHT", "key": 0, "value": {"B": -1, "C": -2}, "timestamp": 11}, + {"topic": "RIGHT", "key": 0, "value": {"B": 9, "C": 10}, "timestamp": 12} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0, "value": {"R_A": 0, "L_B": 1, "R_B": -1, "L_C": 2, "R_C": -2}, "timestamp": 11} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "L_A INT KEY, L_B INT, L_C INT, R_A INT, R_B INT, R_C INT"} + ], + "topics": { + "blacklist": ".*-repartition" + } + } + }, { "name": "aliased join key", "statements": [ diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/multi-joins.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/multi-joins.json index c05e8ef47bc0..8ece1d0cc71e 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/multi-joins.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/multi-joins.json @@ -1029,6 +1029,24 @@ "type": "io.confluent.ksql.util.KsqlStatementException", "message": "Can't join `S1` with `T3` since the number of partitions don't match. `S1` partitions = 1; `T3` partitions = 2. Please repartition either one so that the number of partitions match." } + }, + { + "name": "scoped include all columns", + "statements": [ + "CREATE STREAM S1 (A INT KEY, B INT) WITH (kafka_topic='S1', value_format='JSON');", + "CREATE STREAM S2 (A INT KEY, B INT) WITH (kafka_topic='S2', value_format='JSON');", + "CREATE STREAM S3 (A INT KEY, B INT) WITH (kafka_topic='S3', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT s1.*, s2.*, s3.* FROM S1 INNER JOIN S2 WITHIN 10 SECONDS ON S1.A = S2.A INNER JOIN S3 WITHIN 10 SECONDS ON S1.A = S3.A WHERE S1.B < 5;" + ], + "inputs": [ + {"topic": "S1", "key": 0, "value": {"B": 1}, "timestamp": 10}, + {"topic": "S2", "key": 0, "value": {"B": -1}, "timestamp": 11}, + {"topic": "S3", "key": 0, "value": {"B": 9}, "timestamp": 12}, + {"topic": "S1", "key": 0, "value": {"B": 9}, "timestamp": 13} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0, "value": {"S1_B": 1, "S2_A": 0, "S2_B": -1, "S3_A": 0, "S3_B": 9}, "timestamp": 12} + ] } ] } \ No newline at end of file