Skip to content

Commit

Permalink
fix: query w/ scoped all columns, join and where clause (#5684)
Browse files Browse the repository at this point in the history
#5503

This fixes a regression introduced in 0.10.

The implications of this are not that it crashes the command runner thread, as the original ticket states. Instead, the implications are that queries a users could previously run will now fail. Existing running queries will not be affected.

Affected queries will be any using combining:

* A join
* with a projection using a scoped 'all columns', e.g. a A.*
* with a where clause

e.g.

```
SELECT A.*, B.Id
   FROM A
     JOIN B ON A.Id = B.userId
   WHERE A.x < 10;
```

Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
  • Loading branch information
big-andy-coates and big-andy-coates committed Jun 30, 2020
1 parent cc17927 commit 304eb0c
Show file tree
Hide file tree
Showing 10 changed files with 948 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ public Stream<ColumnName> resolveSelectStar(
final Optional<SourceName> sourceName
) {
return getSources().stream()
.filter(s -> !sourceName.isPresent() || sourceName.equals(s.getSourceName()))
.filter(s -> !sourceName.isPresent()
|| !s.getSourceName().isPresent()
|| sourceName.equals(s.getSourceName()))
.flatMap(s -> s.resolveSelectStar(sourceName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ public void shouldResolveUnaliasedSelectStarByCallingAllSources() {
@Test
public void shouldResolveAliasedSelectStarByCallingOnlyCorrectParent() {
// When:
final Stream<ColumnName> result = planNode.resolveSelectStar(Optional.of(SOURCE_2_NAME)
);
final Stream<ColumnName> result = planNode.resolveSelectStar(Optional.of(SOURCE_2_NAME));

// Then:
final List<ColumnName> columns = result.collect(Collectors.toList());
Expand All @@ -110,6 +109,22 @@ public void shouldResolveAliasedSelectStarByCallingOnlyCorrectParent() {
verify(source2).resolveSelectStar(Optional.of(SOURCE_2_NAME));
}

@Test
public void shouldResolveAliasedSelectStarByCallingParentIfParentHasNoSourceName() {
// Given:
when(source1.getSourceName()).thenReturn(Optional.empty());

// When:
final Stream<ColumnName> result = planNode.resolveSelectStar(Optional.of(SOURCE_2_NAME));

// Then:
final List<ColumnName> columns = result.collect(Collectors.toList());
assertThat(columns, contains(COL0, COL1, COL2, COL3));

verify(source1).resolveSelectStar(Optional.of(SOURCE_2_NAME));
verify(source2).resolveSelectStar(Optional.of(SOURCE_2_NAME));
}

private final class TestPlanNode extends PlanNode {

protected TestPlanNode(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM L (A INTEGER KEY, B INTEGER, C INTEGER) WITH (KAFKA_TOPIC='LEFT', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "L",
"schema" : "`A` INTEGER KEY, `B` INTEGER, `C` INTEGER",
"topicName" : "LEFT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM R (A INTEGER KEY, B INTEGER, C INTEGER) WITH (KAFKA_TOPIC='RIGHT', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "R",
"schema" : "`A` INTEGER KEY, `B` INTEGER, `C` INTEGER",
"topicName" : "RIGHT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n L.*,\n R.*\nFROM L L\nINNER JOIN R R WITHIN 10 SECONDS ON ((L.A = R.A))\nWHERE (R.B < 5)\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`L_A` INTEGER KEY, `L_B` INTEGER, `L_C` INTEGER, `R_A` INTEGER, `R_B` INTEGER, `R_C` INTEGER",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"queryPlan" : {
"sources" : [ "L", "R" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamFilterV1",
"properties" : {
"queryContext" : "WhereFilter"
},
"source" : {
"@type" : "streamStreamJoinV1",
"properties" : {
"queryContext" : "Join"
},
"joinType" : "INNER",
"leftInternalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"rightInternalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"leftSource" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "PrependAliasLeft"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Left/Source"
},
"topicName" : "LEFT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`A` INTEGER KEY, `B` INTEGER, `C` INTEGER"
},
"keyColumnNames" : [ "L_A" ],
"selectExpressions" : [ "B AS L_B", "C AS L_C", "ROWTIME AS L_ROWTIME", "A AS L_A" ]
},
"rightSource" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "PrependAliasRight"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Right/Source"
},
"topicName" : "RIGHT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`A` INTEGER KEY, `B` INTEGER, `C` INTEGER"
},
"keyColumnNames" : [ "R_A" ],
"selectExpressions" : [ "B AS R_B", "C AS R_C", "ROWTIME AS R_ROWTIME", "A AS R_A" ]
},
"beforeMillis" : 10.000000000,
"afterMillis" : 10.000000000,
"keyColName" : "L_A"
},
"filterExpression" : "(R_B < 5)"
},
"keyColumnNames" : [ "L_A" ],
"selectExpressions" : [ "L_B AS L_B", "L_C AS L_C", "R_A AS R_A", "R_B AS R_B", "R_C AS R_C" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
{
"version" : "6.0.0",
"timestamp" : 1593039433111,
"path" : "query-validation-tests/joins.json",
"schemas" : {
"CSAS_OUTPUT_0.KafkaTopic_Left.Source" : "STRUCT<B INT, C INT> NOT NULL",
"CSAS_OUTPUT_0.KafkaTopic_Right.Source" : "STRUCT<B INT, C INT> NOT NULL",
"CSAS_OUTPUT_0.Join.Left" : "STRUCT<L_B INT, L_C INT, L_ROWTIME BIGINT, L_A INT> NOT NULL",
"CSAS_OUTPUT_0.Join.Right" : "STRUCT<R_B INT, R_C INT, R_ROWTIME BIGINT, R_A INT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<L_B INT, L_C INT, R_A INT, R_B INT, R_C INT> NOT NULL"
},
"testCase" : {
"name" : "matching columns in both sides = select left.* and right.* with WHERE",
"inputs" : [ {
"topic" : "LEFT",
"key" : 0,
"value" : {
"B" : 1,
"C" : 2
},
"timestamp" : 10
}, {
"topic" : "RIGHT",
"key" : 0,
"value" : {
"B" : -1,
"C" : -2
},
"timestamp" : 11
}, {
"topic" : "RIGHT",
"key" : 0,
"value" : {
"B" : 9,
"C" : 10
},
"timestamp" : 12
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : 0,
"value" : {
"R_A" : 0,
"L_B" : 1,
"R_B" : -1,
"L_C" : 2,
"R_C" : -2
},
"timestamp" : 11
} ],
"topics" : [ {
"name" : "LEFT",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "RIGHT",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM L (A INT KEY, B INT, C INT) WITH (kafka_topic='LEFT', value_format='JSON');", "CREATE STREAM R (A INT KEY, B INT, C INT) WITH (kafka_topic='RIGHT', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT l.*, r.* FROM L INNER JOIN R WITHIN 10 SECONDS ON L.A = R.A WHERE R.B < 5;" ],
"post" : {
"sources" : [ {
"name" : "OUTPUT",
"type" : "stream",
"schema" : "L_A INT KEY, L_B INT, L_C INT, R_A INT, R_B INT, R_C INT"
} ],
"topics" : {
"topics" : [ {
"name" : "LEFT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "OUTPUT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "RIGHT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINOTHER-0000000009-store-changelog",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
}
}, {
"name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KSTREAM-JOINTHIS-0000000008-store-changelog",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
}
} ],
"blackList" : ".*-repartition"
}
}
}
}

0 comments on commit 304eb0c

Please sign in to comment.