From 744fa36e7aaf526f3a7cd83b45d31d60353a0fc0 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 14 May 2021 13:42:41 -0700 Subject: [PATCH] feat: build physical plan for FK table-table joins (#7517) --- .../io/confluent/ksql/util/PlanSummary.java | 4 +- .../ksql/execution/plan/ExecutionStep.java | 5 +- .../plan/ForeignKeyTableTableJoin.java | 118 +++++++ .../ksql/execution/plan/PlanBuilder.java | 7 +- .../execution/plan/PlanInfoExtractor.java | 8 +- .../plan/ForeignKeyTableTableJoinTest.java | 71 ++++ .../resources/ksql-plan-schema/schema.json | 31 ++ .../streams/ExecutionStepFactory.java | 20 +- .../streams/ForeignKeyJoinParams.java | 45 +++ .../streams/ForeignKeyJoinParamsFactory.java | 67 ++++ .../ForeignKeyTableTableJoinBuilder.java | 66 ++++ .../ksql/execution/streams/KSPlanBuilder.java | 15 +- .../execution/streams/KsqlKeyExtractor.java | 66 ++++ .../execution/streams/KsqlValueJoiner.java | 2 +- .../execution/streams/StepSchemaResolver.java | 15 +- .../streams/TableTableJoinBuilder.java | 4 +- .../streams/AggregateParamsFactoryTest.java | 18 +- .../ForeignKeyJoinParamsFactoryTest.java | 82 +++++ .../ForeignKeyTableTableJoinBuilderTest.java | 321 ++++++++++++++++++ .../streams/JoinParamsFactoryTest.java | 23 +- .../streams/KsqlValueJoinerTest.java | 2 +- .../streams/MaterializedFactoryTest.java | 4 +- .../streams/PartitionByParamsFactoryTest.java | 4 +- .../streams/PlanInfoExtractorTest.java | 4 +- .../execution/streams/SourceBuilderTest.java | 12 +- .../streams/StepSchemaResolverTest.java | 4 +- .../streams/StreamAggregateBuilderTest.java | 10 +- .../streams/StreamFilterBuilderTest.java | 18 +- .../streams/StreamGroupByBuilderTest.java | 15 + .../streams/StreamGroupByBuilderV1Test.java | 15 + .../streams/StreamSelectKeyBuilderV1Test.java | 4 +- .../streams/StreamTableJoinBuilderTest.java | 15 + .../streams/TableAggregateBuilderTest.java | 4 +- .../streams/TableFilterBuilderTest.java | 106 +++--- .../streams/TableSuppressBuilderTest.java | 4 +- .../streams/TableTableJoinBuilderTest.java | 40 ++- 36 files changed, 1150 insertions(+), 99 deletions(-) create mode 100644 ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ForeignKeyTableTableJoin.java create mode 100644 ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/ForeignKeyTableTableJoinTest.java create mode 100644 ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParams.java create mode 100644 ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParamsFactory.java create mode 100644 ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyTableTableJoinBuilder.java create mode 100644 ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlKeyExtractor.java create mode 100644 ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParamsFactoryTest.java create mode 100644 ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/ForeignKeyTableTableJoinBuilderTest.java diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java index e90e7974c986..e98edc0f3792 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/util/PlanSummary.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.ForeignKeyTableTableJoin; import io.confluent.ksql.execution.plan.SourceStep; import io.confluent.ksql.execution.plan.StreamAggregate; import io.confluent.ksql.execution.plan.StreamFilter; @@ -93,6 +94,7 @@ public class PlanSummary { .put(TableSelectKey.class, "REKEY") .put(TableSink.class, "SINK") .put(TableTableJoin.class, "JOIN") + .put(ForeignKeyTableTableJoin.class, "JOIN") .put(TableSource.class, "SOURCE") .put(TableSuppress.class, "SUPPRESS") .put(WindowedTableSource.class, "SOURCE") diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java index 54517afd5a5c..93e6e48ca0b4 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ExecutionStep.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License; you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -53,7 +53,8 @@ @Type(value = TableSelectKey.class, name = "tableSelectKeyV1"), @Type(value = TableSink.class, name = "tableSinkV1"), @Type(value = TableSuppress.class, name = "tableSuppressV1"), - @Type(value = TableTableJoin.class, name = "tableTableJoinV1") + @Type(value = TableTableJoin.class, name = "tableTableJoinV1"), + @Type(value = ForeignKeyTableTableJoin.class, name = "fkTableTableJoinV1") }) @Immutable public interface ExecutionStep { diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ForeignKeyTableTableJoin.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ForeignKeyTableTableJoin.java new file mode 100644 index 000000000000..35d9fc52046d --- /dev/null +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/ForeignKeyTableTableJoin.java @@ -0,0 +1,118 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import static java.util.Objects.requireNonNull; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.name.ColumnName; +import java.util.List; +import java.util.Objects; + +@Immutable +public class ForeignKeyTableTableJoin + implements ExecutionStep> { + + private final ExecutionStepPropertiesV1 properties; + private final JoinType joinType; + private final ColumnName leftJoinColumnName; + private final ExecutionStep> leftSource; + private final ExecutionStep> rightSource; + + @JsonCreator + public ForeignKeyTableTableJoin( + @JsonProperty(value = "properties", required = true) + final ExecutionStepPropertiesV1 props, + @JsonProperty(value = "joinType", required = true) + final JoinType joinType, + @JsonProperty(value = "leftJoinColumnName", required = true) + final ColumnName leftJoinColumnName, + @JsonProperty(value = "leftSource", required = true) + final ExecutionStep> leftSource, + @JsonProperty(value = "rightSource", required = true) + final ExecutionStep> rightSource + ) { + this.properties = requireNonNull(props, "props"); + this.joinType = requireNonNull(joinType, "joinType"); + if (joinType == JoinType.OUTER) { + throw new IllegalArgumentException("OUTER join not supported."); + } + this.leftJoinColumnName = requireNonNull(leftJoinColumnName, "leftJoinColumnName"); + this.leftSource = requireNonNull(leftSource, "leftSource"); + this.rightSource = requireNonNull(rightSource, "rightSource"); + } + + @Override + public ExecutionStepPropertiesV1 getProperties() { + return properties; + } + + @Override + @JsonIgnore + public List> getSources() { + return ImmutableList.of(leftSource, rightSource); + } + + public ExecutionStep> getLeftSource() { + return leftSource; + } + + public ExecutionStep> getRightSource() { + return rightSource; + } + + public JoinType getJoinType() { + return joinType; + } + + public ColumnName getLeftJoinColumnName() { + return leftJoinColumnName; + } + + @Override + public KTableHolder build(final PlanBuilder builder, final PlanInfo info) { + return builder.visitForeignKeyTableTableJoin(this, info); + } + + @Override + public PlanInfo extractPlanInfo(final PlanInfoExtractor extractor) { + return extractor.visitForeignKeyTableTableJoin(this); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ForeignKeyTableTableJoin that = (ForeignKeyTableTableJoin) o; + return Objects.equals(properties, that.properties) + && joinType == that.joinType + && Objects.equals(leftJoinColumnName, that.leftJoinColumnName) + && Objects.equals(leftSource, that.leftSource) + && Objects.equals(rightSource, that.rightSource); + } + + @Override + public int hashCode() { + return Objects.hash(properties, joinType, leftJoinColumnName, leftSource, rightSource); + } +} diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java index 7a7c7d7def3c..fb21269b4a83 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -82,4 +82,9 @@ KTableHolder> visitStreamWindowedAggregate( KTableHolder visitTableSuppress(TableSuppress tableSuppress, PlanInfo planInfo); KTableHolder visitTableTableJoin(TableTableJoin tableTableJoin, PlanInfo planInfo); + + KTableHolder visitForeignKeyTableTableJoin( + ForeignKeyTableTableJoin foreignKeyTableTableJoin, + PlanInfo planInfo + ); } diff --git a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanInfoExtractor.java b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanInfoExtractor.java index 75821865365a..65398b3acdb9 100644 --- a/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanInfoExtractor.java +++ b/ksqldb-execution/src/main/java/io/confluent/ksql/execution/plan/PlanInfoExtractor.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (final the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -130,6 +130,12 @@ public PlanInfo visitTableTableJoin(final TableTableJoin tableTableJoin) return visitJoinStep(tableTableJoin); } + public PlanInfo visitForeignKeyTableTableJoin( + final ForeignKeyTableTableJoin foreignKeyTableTableJoin) { + + return visitJoinStep(foreignKeyTableTableJoin); + } + private PlanInfo visitSourceStep(final ExecutionStep step) { return new PlanInfo(step); } diff --git a/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/ForeignKeyTableTableJoinTest.java b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/ForeignKeyTableTableJoinTest.java new file mode 100644 index 000000000000..f8949aabff45 --- /dev/null +++ b/ksqldb-execution/src/test/java/io/confluent/ksql/execution/plan/ForeignKeyTableTableJoinTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.plan; + +import static io.confluent.ksql.execution.plan.JoinType.INNER; +import static io.confluent.ksql.execution.plan.JoinType.LEFT; + +import com.google.common.testing.EqualsTester; +import io.confluent.ksql.name.ColumnName; +import org.apache.kafka.connect.data.Struct; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ForeignKeyTableTableJoinTest { + + private static final ColumnName JOIN_COLUMN_NAME = ColumnName.of("Bob"); + private static final ColumnName JOIN_COLUMN_NAME_2 = ColumnName.of("Vic"); + + @Mock + private ExecutionStepPropertiesV1 props1; + @Mock + private ExecutionStepPropertiesV1 props2; + @Mock + private ExecutionStep> left1; + @Mock + private ExecutionStep> right1; + @Mock + private ExecutionStep> left2; + @Mock + private ExecutionStep> right2; + + @SuppressWarnings("UnstableApiUsage") + @Test + public void shouldImplementEquals() { + new EqualsTester() + .addEqualityGroup( + new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, left1, right1), + new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, left1, right1) + ) + .addEqualityGroup( + new ForeignKeyTableTableJoin<>(props2, INNER, JOIN_COLUMN_NAME, left1, right1) + ) + .addEqualityGroup( + new ForeignKeyTableTableJoin<>(props1, LEFT, JOIN_COLUMN_NAME, left1, right1) + ) + .addEqualityGroup( + new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME_2, left1, right1) + ) + .addEqualityGroup( + new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, left2, right1) + ) + .addEqualityGroup( + new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, left1, right2) + ).testEquals(); + } +} \ No newline at end of file diff --git a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json index b0099fc1a20b..c807b9107c44 100644 --- a/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json +++ b/ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json @@ -1044,6 +1044,35 @@ "title" : "tableTableJoinV1", "required" : [ "@type", "properties", "joinType", "leftSource", "rightSource" ] }, + "ForeignKeyTableTableJoin" : { + "type" : "object", + "additionalProperties" : false, + "properties" : { + "@type" : { + "type" : "string", + "enum" : [ "fkTableTableJoinV1" ], + "default" : "fkTableTableJoinV1" + }, + "properties" : { + "$ref" : "#/definitions/ExecutionStepPropertiesV1" + }, + "joinType" : { + "type" : "string", + "enum" : [ "INNER", "LEFT", "OUTER" ] + }, + "leftJoinColumnName" : { + "type" : "string" + }, + "leftSource" : { + "$ref" : "#/definitions/ExecutionStep" + }, + "rightSource" : { + "$ref" : "#/definitions/ExecutionStep" + } + }, + "title" : "fkTableTableJoinV1", + "required" : [ "@type", "properties", "joinType", "leftJoinColumnName", "leftSource", "rightSource" ] + }, "RefinementInfo" : { "type" : "object", "additionalProperties" : false, @@ -1108,6 +1137,8 @@ "$ref" : "#/definitions/TableSuppress" }, { "$ref" : "#/definitions/TableTableJoin" + }, { + "$ref" : "#/definitions/ForeignKeyTableTableJoin" } ] } } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java index 5898bd63f90f..e35227050728 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License; you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -21,6 +21,7 @@ import io.confluent.ksql.execution.expression.tree.FunctionCall; import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1; +import io.confluent.ksql.execution.plan.ForeignKeyTableTableJoin; import io.confluent.ksql.execution.plan.Formats; import io.confluent.ksql.execution.plan.JoinType; import io.confluent.ksql.execution.plan.KGroupedStreamHolder; @@ -320,6 +321,23 @@ public static TableTableJoin tableTableJoin( ); } + public static ForeignKeyTableTableJoin + foreignKeyTableTableJoin(final QueryContext.Stacker stacker, + final JoinType joinType, + final ColumnName leftJoinColumnName, + final ExecutionStep> left, + final ExecutionStep> right + ) { + final QueryContext queryContext = stacker.getQueryContext(); + return new ForeignKeyTableTableJoin<>( + new ExecutionStepPropertiesV1(queryContext), + joinType, + leftJoinColumnName, + left, + right + ); + } + public static StreamAggregate streamAggregate( final QueryContext.Stacker stacker, final ExecutionStep sourceStep, diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParams.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParams.java new file mode 100644 index 000000000000..30cdcd83c905 --- /dev/null +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParams.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.schema.ksql.LogicalSchema; +import java.util.Objects; + +public class ForeignKeyJoinParams { + private final KsqlKeyExtractor keyExtractor; + private final KsqlValueJoiner joiner; + private final LogicalSchema schema; + + ForeignKeyJoinParams(final KsqlKeyExtractor keyExtractor, + final KsqlValueJoiner joiner, + final LogicalSchema schema) { + this.keyExtractor = Objects.requireNonNull(keyExtractor, "keyExtractor"); + this.joiner = Objects.requireNonNull(joiner, "joiner"); + this.schema = Objects.requireNonNull(schema, "schema"); + } + + public LogicalSchema getSchema() { + return schema; + } + + public KsqlKeyExtractor getKeyExtractor() { + return keyExtractor; + } + + public KsqlValueJoiner getJoiner() { + return joiner; + } +} diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParamsFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParamsFactory.java new file mode 100644 index 000000000000..b01488ba5dfe --- /dev/null +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParamsFactory.java @@ -0,0 +1,67 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.schema.ksql.Column; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.LogicalSchema.Builder; +import java.util.Optional; + +public final class ForeignKeyJoinParamsFactory { + + private ForeignKeyJoinParamsFactory() { + } + + public static ForeignKeyJoinParams create( + final ColumnName leftJoinColumnName, + final LogicalSchema leftSchema, + final LogicalSchema rightSchema + ) { + if (rightSchema.key().size() != 1) { + throw new IllegalStateException("rightSchema must have single column key"); + } + return new ForeignKeyJoinParams<>( + createKeyExtractor(leftSchema, leftJoinColumnName), + new KsqlValueJoiner(leftSchema.value().size(), rightSchema.value().size(), 0), + createSchema(leftSchema, rightSchema) + ); + } + + public static LogicalSchema createSchema( + final LogicalSchema leftSchema, + final LogicalSchema rightSchema + ) { + final Builder builder = LogicalSchema.builder() + .keyColumns(leftSchema.key()) + .valueColumns(leftSchema.value()) + .valueColumns(rightSchema.value()); + + return builder.build(); + } + + private static KsqlKeyExtractor createKeyExtractor( + final LogicalSchema leftSchema, + final ColumnName leftJoinColumnName) { + + final Optional leftJoinColumn = leftSchema.findValueColumn(leftJoinColumnName); + if (!leftJoinColumn.isPresent()) { + throw new IllegalStateException("Could not find join column in left input table."); + } + + return new KsqlKeyExtractor<>(leftJoinColumn.get().index()); + } +} diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyTableTableJoinBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyTableTableJoinBuilder.java new file mode 100644 index 000000000000..da2c14f66a05 --- /dev/null +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/ForeignKeyTableTableJoinBuilder.java @@ -0,0 +1,66 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.plan.ForeignKeyTableTableJoin; +import io.confluent.ksql.execution.plan.KTableHolder; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import org.apache.kafka.streams.kstream.KTable; + +public final class ForeignKeyTableTableJoinBuilder { + + private ForeignKeyTableTableJoinBuilder() { + } + + public static KTableHolder build( + final KTableHolder left, + final KTableHolder right, + final ForeignKeyTableTableJoin join + ) { + final LogicalSchema leftSchema = left.getSchema(); + final LogicalSchema rightSchema = right.getSchema(); + + final ForeignKeyJoinParams joinParams = ForeignKeyJoinParamsFactory + .create(join.getLeftJoinColumnName(), leftSchema, rightSchema); + + final KTable result; + switch (join.getJoinType()) { + case LEFT: + result = left.getTable().leftJoin( + right.getTable(), + joinParams.getKeyExtractor(), + joinParams.getJoiner() + ); + break; + case INNER: + result = left.getTable().join( + right.getTable(), + joinParams.getKeyExtractor(), + joinParams.getJoiner() + ); + break; + default: + throw new IllegalStateException("invalid join type: " + join.getJoinType()); + } + + return KTableHolder.unmaterialized( + result, + joinParams.getSchema(), + left.getExecutionKeyFactory() + ); + } +} \ No newline at end of file diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java index f71805e4d961..02a145e6adca 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -16,6 +16,7 @@ package io.confluent.ksql.execution.streams; import io.confluent.ksql.GenericKey; +import io.confluent.ksql.execution.plan.ForeignKeyTableTableJoin; import io.confluent.ksql.execution.plan.KGroupedStreamHolder; import io.confluent.ksql.execution.plan.KGroupedTableHolder; import io.confluent.ksql.execution.plan.KStreamHolder; @@ -398,4 +399,16 @@ public KTableHolder visitTableTableJoin( final KTableHolder right = tableTableJoin.getRightSource().build(this, planInfo); return TableTableJoinBuilder.build(left, right, tableTableJoin); } + + @Override + public KTableHolder visitForeignKeyTableTableJoin( + final ForeignKeyTableTableJoin foreignKeyTableTableJoin, + final PlanInfo planInfo) { + + final KTableHolder left = + foreignKeyTableTableJoin.getLeftSource().build(this, planInfo); + final KTableHolder right = + foreignKeyTableTableJoin.getRightSource().build(this, planInfo); + return ForeignKeyTableTableJoinBuilder.build(left, right, foreignKeyTableTableJoin); + } } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlKeyExtractor.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlKeyExtractor.java new file mode 100644 index 000000000000..bbb44a31170b --- /dev/null +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlKeyExtractor.java @@ -0,0 +1,66 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import static com.google.common.base.Preconditions.checkArgument; + +import io.confluent.ksql.GenericRow; +import java.util.Objects; +import java.util.function.Function; + +public final class KsqlKeyExtractor implements Function { + + private final int leftJoinColumnIndex; + + KsqlKeyExtractor(final int leftJoinColumnIndex) { + checkArgument( + leftJoinColumnIndex >= 0, + "leftJoinColumnIndex negative: " + leftJoinColumnIndex + ); + + this.leftJoinColumnIndex = leftJoinColumnIndex; + } + + @SuppressWarnings("unchecked") + @Override + public KRightT apply(final GenericRow left) { + return (KRightT) left.get(leftJoinColumnIndex); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KsqlKeyExtractor that = (KsqlKeyExtractor) o; + return leftJoinColumnIndex == that.leftJoinColumnIndex; + } + + @Override + public int hashCode() { + return Objects.hash(leftJoinColumnIndex); + } + + @Override + public String toString() { + return "KsqlKeyExtractor{" + + "leftJoinColumnIndex=" + leftJoinColumnIndex + + '}'; + } +} diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlValueJoiner.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlValueJoiner.java index 87f779a98e74..e2fd252ea702 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlValueJoiner.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlValueJoiner.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2018 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java index 61b95444c2e9..99ea17926923 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -19,6 +19,7 @@ import io.confluent.ksql.execution.codegen.CompiledExpression; import io.confluent.ksql.execution.expression.tree.FunctionCall; import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.ForeignKeyTableTableJoin; import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.plan.SourceStep; import io.confluent.ksql.execution.plan.StreamAggregate; @@ -68,7 +69,7 @@ public final class StepSchemaResolver { @SuppressWarnings("rawtypes") private static final HandlerMaps.ClassHandlerMapR2 HANDLERS - = HandlerMaps.forClass(ExecutionStep.class) + = HandlerMaps.forClass(ExecutionStep.class) .withArgTypes(StepSchemaResolver.class, LogicalSchema.class) .withReturnType(LogicalSchema.class) .put(StreamAggregate.class, StepSchemaResolver::handleStreamAggregate) @@ -99,12 +100,13 @@ public final class StepSchemaResolver { @SuppressWarnings("rawtypes") private static final HandlerMaps.ClassHandlerMapR2 JOIN_HANDLERS - = HandlerMaps.forClass(ExecutionStep.class) + = HandlerMaps.forClass(ExecutionStep.class) .withArgTypes(StepSchemaResolver.class, JoinSchemas.class) .withReturnType(LogicalSchema.class) .put(StreamTableJoin.class, StepSchemaResolver::handleStreamTableJoin) .put(StreamStreamJoin.class, StepSchemaResolver::handleStreamStreamJoin) .put(TableTableJoin.class, StepSchemaResolver::handleTableTableJoin) + .put(ForeignKeyTableTableJoin.class, StepSchemaResolver::handleForeignKeyTableTableJoin) .build(); private final KsqlConfig ksqlConfig; @@ -303,6 +305,13 @@ private LogicalSchema handleTableTableJoin( return handleJoin(schemas, step.getKeyColName()); } + private LogicalSchema handleForeignKeyTableTableJoin( + final JoinSchemas schemas, + final ForeignKeyTableTableJoin step + ) { + return ForeignKeyJoinParamsFactory.createSchema(schemas.left, schemas.right); + } + private LogicalSchema handleJoin( final JoinSchemas schemas, final ColumnName keyColName diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableTableJoinBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableTableJoinBuilder.java index 0b8a11c7a837..7ab507d24147 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableTableJoinBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableTableJoinBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -49,7 +49,7 @@ public static KTableHolder build( result = left.getTable().outerJoin(right.getTable(), joinParams.getJoiner()); break; default: - throw new IllegalStateException("invalid join type"); + throw new IllegalStateException("invalid join type: " + join.getJoinType()); } return KTableHolder.unmaterialized( diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java index ffd2b002b341..12ca00270148 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package io.confluent.ksql.execution.streams; import static io.confluent.ksql.GenericRow.genericRow; @@ -125,6 +140,7 @@ public void shouldCreateAggregatorWithCorrectParams() { verify(udafFactory).create(2, ImmutableList.of(agg0, agg1)); } + @SuppressWarnings("unchecked") @Test public void shouldCreateUndoAggregatorWithCorrectParams() { // When: @@ -143,7 +159,7 @@ public void shouldCreateUndoAggregatorWithCorrectParams() { @Test public void shouldReturnCorrectAggregator() { // When: - final KudafAggregator aggregator = aggregateParams.getAggregator(); + final KudafAggregator aggregator = aggregateParams.getAggregator(); // Then: assertThat(aggregator, is(aggregator)); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParamsFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParamsFactoryTest.java new file mode 100644 index 000000000000..d4b4b1c1ab9b --- /dev/null +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/ForeignKeyJoinParamsFactoryTest.java @@ -0,0 +1,82 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThrows; + +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import org.junit.Test; + +public class ForeignKeyJoinParamsFactoryTest { + + private static final LogicalSchema LEFT_SCHEMA = LogicalSchema.builder() + .keyColumn(ColumnName.of("L_K"), SqlTypes.STRING) + .valueColumn(ColumnName.of("L_BLUE"), SqlTypes.STRING) + .valueColumn(ColumnName.of("L_FOREIGN_KEY"), SqlTypes.INTEGER) + .valueColumn(ColumnName.of("L_K"), SqlTypes.STRING) + .build(); + + private static final LogicalSchema RIGHT_SCHEMA = LogicalSchema.builder() + .keyColumn(ColumnName.of("R_K"), SqlTypes.INTEGER) + .valueColumn(ColumnName.of("R_RED"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("R_ORANGE"), SqlTypes.DOUBLE) + .valueColumn(ColumnName.of("R_K"), SqlTypes.INTEGER) + .build(); + + @Test + public void shouldBuildCorrectKeyedSchema() { + // Given: + final ColumnName leftJoinColumnName = ColumnName.of("L_FOREIGN_KEY"); + + // When: + final ForeignKeyJoinParams joinParams = + ForeignKeyJoinParamsFactory.create(leftJoinColumnName, LEFT_SCHEMA, RIGHT_SCHEMA); + + // Then: + assertThat(joinParams.getSchema(), is(LogicalSchema.builder() + .keyColumn(ColumnName.of("L_K"), SqlTypes.STRING) + .valueColumn(ColumnName.of("L_BLUE"), SqlTypes.STRING) + .valueColumn(ColumnName.of("L_FOREIGN_KEY"), SqlTypes.INTEGER) + .valueColumn(ColumnName.of("L_K"), SqlTypes.STRING) + .valueColumn(ColumnName.of("R_RED"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("R_ORANGE"), SqlTypes.DOUBLE) + .valueColumn(ColumnName.of("R_K"), SqlTypes.INTEGER) + .build()) + ); + } + + @Test + public void shouldThrowIfJoinColumnNotFound() { + // Given: + final ColumnName leftJoinColumnName = ColumnName.of("L_UNKNOWN"); + + final Exception e = assertThrows( + IllegalStateException.class, + () -> ForeignKeyJoinParamsFactory.create(leftJoinColumnName, LEFT_SCHEMA, RIGHT_SCHEMA) + ); + + // Then: + assertThat( + e.getMessage(), + containsString("Could not find join column in left input table.") + ); + } +} \ No newline at end of file diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/ForeignKeyTableTableJoinBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/ForeignKeyTableTableJoinBuilderTest.java new file mode 100644 index 000000000000..ce40a14eec62 --- /dev/null +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/ForeignKeyTableTableJoinBuilderTest.java @@ -0,0 +1,321 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.materialization.MaterializationInfo; +import io.confluent.ksql.execution.plan.ExecutionKeyFactory; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1; +import io.confluent.ksql.execution.plan.ForeignKeyTableTableJoin; +import io.confluent.ksql.execution.plan.JoinType; +import io.confluent.ksql.execution.plan.KTableHolder; +import io.confluent.ksql.execution.plan.PlanBuilder; +import io.confluent.ksql.execution.plan.PlanInfo; +import io.confluent.ksql.execution.runtime.RuntimeBuildContext; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.KTable; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ForeignKeyTableTableJoinBuilderTest { + + private static final ColumnName L_KEY = ColumnName.of("L_KEY"); + private static final ColumnName L_KEY_2 = ColumnName.of("L_KEY_2"); + private static final ColumnName R_KEY = ColumnName.of("R_KEY"); + private static final ColumnName JOIN_COLUMN = ColumnName.of("L_FOREIGN_KEY"); + + private static final LogicalSchema LEFT_SCHEMA = LogicalSchema.builder() + .keyColumn(L_KEY, SqlTypes.STRING) + .valueColumn(ColumnName.of("L_GREEN"), SqlTypes.INTEGER) + .valueColumn(JOIN_COLUMN, SqlTypes.STRING) + .valueColumn(L_KEY, SqlTypes.STRING) // Copy of key in value + .build(); + + private static final LogicalSchema LEFT_SCHEMA_MULTI_KEY = LogicalSchema.builder() + .keyColumn(L_KEY, SqlTypes.STRING) + .keyColumn(L_KEY_2, SqlTypes.STRING) + .valueColumn(JOIN_COLUMN, SqlTypes.STRING) + .valueColumn(ColumnName.of("L_GREEN"), SqlTypes.INTEGER) + .valueColumn(L_KEY, SqlTypes.STRING) // Copy of key in value + .valueColumn(L_KEY_2, SqlTypes.STRING) // Copy of key in value + .build(); + + private static final LogicalSchema RIGHT_SCHEMA = LogicalSchema.builder() + .keyColumn(R_KEY, SqlTypes.STRING) + .valueColumn(ColumnName.of("R_RED"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("R_ORANGE"), SqlTypes.DOUBLE) + .valueColumn(R_KEY, SqlTypes.STRING) // Copy of key in value + .build(); + + @Mock + private QueryContext ctx; + @Mock + private KTable leftKTable; + @Mock + private KTable leftKTableMultiKey; + @Mock + private KTable rightKTable; + @Mock + private KTable resultKTable; + @Mock + private ExecutionStep> left; + @Mock + private ExecutionStep> leftMultiKey; + @Mock + private ExecutionStep> right; + @Mock + private ExecutionKeyFactory executionKeyFactory; + @Mock + private PlanInfo planInfo; + @Mock + private MaterializationInfo.Builder materializationBuilder; + + private PlanBuilder planBuilder; + private ForeignKeyTableTableJoin join; + + @SuppressWarnings("unchecked") + @Before + public void init() { + when(left.build(any(), eq(planInfo))).thenReturn( + KTableHolder.materialized(leftKTable, LEFT_SCHEMA, executionKeyFactory, materializationBuilder)); + when(leftMultiKey.build(any(), eq(planInfo))).thenReturn( + KTableHolder.materialized(leftKTableMultiKey, LEFT_SCHEMA_MULTI_KEY, executionKeyFactory, materializationBuilder)); + when(right.build(any(), eq(planInfo))).thenReturn( + KTableHolder.materialized(rightKTable, RIGHT_SCHEMA, executionKeyFactory, materializationBuilder)); + + when(leftKTable.leftJoin(any(KTable.class), any(KsqlKeyExtractor.class), any())).thenReturn(resultKTable); + when(leftKTable.join(any(KTable.class), any(KsqlKeyExtractor.class), any())).thenReturn(resultKTable); + when(leftKTableMultiKey.leftJoin(any(KTable.class), any(KsqlKeyExtractor.class), any())).thenReturn(resultKTable); + when(leftKTableMultiKey.join(any(KTable.class), any(KsqlKeyExtractor.class), any())).thenReturn(resultKTable); + + planBuilder = new KSPlanBuilder( + mock(RuntimeBuildContext.class), + mock(SqlPredicateFactory.class), + mock(AggregateParamsFactory.class), + mock(StreamsFactories.class) + ); + } + + @Test + public void shouldDoLeftJoinOnNonKey() { + // Given: + givenLeftJoin(left, JOIN_COLUMN); + + // When: + final KTableHolder result = join.build(planBuilder, planInfo); + + // Then: + verify(leftKTable).leftJoin( + same(rightKTable), + eq(new KsqlKeyExtractor<>(1)), + eq(new KsqlValueJoiner(LEFT_SCHEMA.value().size(), RIGHT_SCHEMA.value().size(), 0)) + ); + verifyNoMoreInteractions(leftKTable, rightKTable, resultKTable); + assertThat(result.getTable(), is(resultKTable)); + assertThat(result.getExecutionKeyFactory(), is(executionKeyFactory)); + } + + // this is actually a PK-PK join and the logical planner would not compile a FK-join plan + // for this case atm + // however, from a physical plan POV this should still work, so we would like to keep this test + // + // it might be possible to actually change the logical planner to compile a PK-PK join as + // FK-join if input tables are not co-partitioned (instead of throwing an error an rejecting + // the query), ie, if key-format or partition-count do not match -- it's an open question + // if it would be a good idea to do this though + @Test + public void shouldDoLeftJoinOnKey() { + // Given: + givenLeftJoin(left, L_KEY); + + // When: + final KTableHolder result = join.build(planBuilder, planInfo); + + // Then: + verify(leftKTable).leftJoin( + same(rightKTable), + eq(new KsqlKeyExtractor<>(2)), + eq(new KsqlValueJoiner(LEFT_SCHEMA.value().size(), RIGHT_SCHEMA.value().size(), 0)) + ); + verifyNoMoreInteractions(leftKTable, rightKTable, resultKTable); + assertThat(result.getTable(), is(resultKTable)); + assertThat(result.getExecutionKeyFactory(), is(executionKeyFactory)); + } + + @Test + public void shouldDoLeftJoinOnSubKey() { + // Given: + givenLeftJoin(leftMultiKey, L_KEY_2); + + // When: + final KTableHolder result = join.build(planBuilder, planInfo); + + // Then: + verify(leftKTableMultiKey).leftJoin( + same(rightKTable), + eq(new KsqlKeyExtractor<>(3)), + eq(new KsqlValueJoiner(LEFT_SCHEMA_MULTI_KEY.value().size(), RIGHT_SCHEMA.value().size(), 0)) + ); + verifyNoMoreInteractions(leftKTable, rightKTable, resultKTable); + assertThat(result.getTable(), is(resultKTable)); + assertThat(result.getExecutionKeyFactory(), is(executionKeyFactory)); + } + + @Test + public void shouldDoInnerJoinOnNonKey() { + // Given: + givenInnerJoin(left, JOIN_COLUMN); + + // When: + final KTableHolder result = join.build(planBuilder, planInfo); + + // Then: + verify(leftKTable).join( + same(rightKTable), + eq(new KsqlKeyExtractor<>(1)), + eq(new KsqlValueJoiner(LEFT_SCHEMA.value().size(), RIGHT_SCHEMA.value().size(), 0)) + ); + verifyNoMoreInteractions(leftKTable, rightKTable, resultKTable); + assertThat(result.getTable(), is(resultKTable)); + assertThat(result.getExecutionKeyFactory(), is(executionKeyFactory)); + } + + // this is actually a PK-PK join and the logical planner would not compile a FK-join plan + // for this case atm + // however, from a physical plan POV this should still work, so we would like to keep this test + // + // it might be possible to actually change the logical planner to compile a PK-PK join as + // FK-join if input tables are not co-partitioned (instead of throwing an error an rejecting + // the query), ie, if key-format or partition-count do not match -- it's an open question + // if it would be a good idea to do this though + @Test + public void shouldDoInnerJoinOnKey() { + // Given: + givenInnerJoin(left, L_KEY); + + // When: + final KTableHolder result = join.build(planBuilder, planInfo); + + // Then: + verify(leftKTable).join( + same(rightKTable), + eq(new KsqlKeyExtractor<>(2)), + eq(new KsqlValueJoiner(LEFT_SCHEMA.value().size(), RIGHT_SCHEMA.value().size(), 0)) + ); + verifyNoMoreInteractions(leftKTable, rightKTable, resultKTable); + assertThat(result.getTable(), is(resultKTable)); + assertThat(result.getExecutionKeyFactory(), is(executionKeyFactory)); + } + + @Test + public void shouldDoInnerJoinOnSubKey() { + // Given: + givenInnerJoin(leftMultiKey, L_KEY_2); + + // When: + final KTableHolder result = join.build(planBuilder, planInfo); + + // Then: + verify(leftKTableMultiKey).join( + same(rightKTable), + eq(new KsqlKeyExtractor<>(3)), + eq(new KsqlValueJoiner(LEFT_SCHEMA_MULTI_KEY.value().size(), RIGHT_SCHEMA.value().size(), 0)) + ); + verifyNoMoreInteractions(leftKTable, rightKTable, resultKTable); + assertThat(result.getTable(), is(resultKTable)); + assertThat(result.getExecutionKeyFactory(), is(executionKeyFactory)); + } + + @Test + public void shouldReturnCorrectSchema() { + // Given: + givenInnerJoin(left, JOIN_COLUMN); + + // When: + final KTableHolder result = join.build(planBuilder, planInfo); + + // Then: + assertThat( + result.getSchema(), + is(LogicalSchema.builder() + .keyColumns(LEFT_SCHEMA.key()) + .valueColumns(LEFT_SCHEMA.value()) + .valueColumns(RIGHT_SCHEMA.value()) + .build() + ) + ); + } + + @Test + public void shouldReturnCorrectSchemaMultiKey() { + // Given: + givenInnerJoin(leftMultiKey, L_KEY); + + // When: + final KTableHolder result = join.build(planBuilder, planInfo); + + // Then: + assertThat( + result.getSchema(), + is(LogicalSchema.builder() + .keyColumns(LEFT_SCHEMA_MULTI_KEY.key()) + .valueColumns(LEFT_SCHEMA_MULTI_KEY.value()) + .valueColumns(RIGHT_SCHEMA.value()) + .build()) + ); + } + + private void givenLeftJoin(final ExecutionStep> left, + final ColumnName leftJoinColumnName) { + join = new ForeignKeyTableTableJoin<>( + new ExecutionStepPropertiesV1(ctx), + JoinType.LEFT, + leftJoinColumnName, + left, + right + ); + } + + private void givenInnerJoin(final ExecutionStep> left, + final ColumnName leftJoinColumnName) { + join = new ForeignKeyTableTableJoin<>( + new ExecutionStepPropertiesV1(ctx), + JoinType.INNER, + leftJoinColumnName, + left, + right + ); + } +} diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/JoinParamsFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/JoinParamsFactoryTest.java index 6c481cc3859e..15321f74d289 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/JoinParamsFactoryTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/JoinParamsFactoryTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package io.confluent.ksql.execution.streams; import static org.hamcrest.CoreMatchers.is; @@ -28,15 +43,13 @@ public class JoinParamsFactoryTest { .valueColumn(ColumnName.of("R_K"), SqlTypes.STRING) // Copy of key in value .build(); - private JoinParams joinParams; - @Test public void shouldBuildCorrectLeftKeyedSchema() { // Given: final ColumnName keyName = Iterables.getOnlyElement(LEFT_SCHEMA.key()).name(); // When: - joinParams = JoinParamsFactory.create(keyName, LEFT_SCHEMA, RIGHT_SCHEMA); + final JoinParams joinParams = JoinParamsFactory.create(keyName, LEFT_SCHEMA, RIGHT_SCHEMA); // Then: assertThat(joinParams.getSchema(), is(LogicalSchema.builder() @@ -57,7 +70,7 @@ public void shouldBuildCorrectRightKeyedSchema() { final ColumnName keyName = Iterables.getOnlyElement(RIGHT_SCHEMA.key()).name(); // When: - joinParams = JoinParamsFactory.create(keyName, LEFT_SCHEMA, RIGHT_SCHEMA); + final JoinParams joinParams = JoinParamsFactory.create(keyName, LEFT_SCHEMA, RIGHT_SCHEMA); // Then: assertThat(joinParams.getSchema(), is(LogicalSchema.builder() @@ -78,7 +91,7 @@ public void shouldBuildCorrectSyntheticKeyedSchema() { final ColumnName keyName = ColumnName.of("OTHER"); // When: - joinParams = JoinParamsFactory.create(keyName, LEFT_SCHEMA, RIGHT_SCHEMA); + final JoinParams joinParams = JoinParamsFactory.create(keyName, LEFT_SCHEMA, RIGHT_SCHEMA); // Then: assertThat(joinParams.getSchema(), is(LogicalSchema.builder() diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/KsqlValueJoinerTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/KsqlValueJoinerTest.java index e5007085b319..120b48341d2e 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/KsqlValueJoinerTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/KsqlValueJoinerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Confluent Inc. + * Copyright 2019 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java index 4eb8e1424def..d320fc79a95f 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -47,7 +47,7 @@ public class MaterializedFactoryTest { private Optional retention; @Test - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public void shouldCreateJoinedCorrectlyWhenOptimizationsEnabled() { // Given: final Materialized asName = mock(Materialized.class); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java index 4a29d3f61921..84db7f1ba381 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PartitionByParamsFactoryTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -316,7 +316,7 @@ public void shouldPartitionByNullAnyRowsWhereFailedToExtractKey() { final KeyValue result = mapper.apply(key, value); // Then: - assertThat(result.key, is(genericKey((Object) null, COL1_VALUE))); + assertThat(result.key, is(genericKey(null, COL1_VALUE))); } @Test diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PlanInfoExtractorTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PlanInfoExtractorTest.java index 0fce07a500d2..abe22e3d251a 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PlanInfoExtractorTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/PlanInfoExtractorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (final the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -22,7 +22,6 @@ import io.confluent.ksql.GenericKey; import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.expression.tree.Expression; -import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1; import io.confluent.ksql.execution.plan.Formats; import io.confluent.ksql.execution.plan.JoinType; @@ -34,7 +33,6 @@ import io.confluent.ksql.execution.plan.TableSource; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; -import java.util.List; import java.util.Optional; import org.junit.Before; import org.junit.Test; diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java index 9034b2de0fbb..0149cb956fc5 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -52,7 +52,6 @@ import io.confluent.ksql.execution.timestamp.TimestampColumn; import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.name.ColumnName; -import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -93,7 +92,6 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; - public class SourceBuilderTest { private static final ColumnName K0 = ColumnName.of("k0"); @@ -202,7 +200,7 @@ public class SourceBuilderTest { public final MockitoRule mockitoRule = MockitoJUnit.rule(); @Before - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public void setup() { when(buildContext.getApplicationId()).thenReturn("appid"); when(buildContext.getStreamsBuilder()).thenReturn(streamsBuilder); @@ -797,7 +795,8 @@ private ValueTransformerWithKey getTransformerFro ) { streamSource.build(planBuilder, planInfo); verify(kStream).transformValues(transformSupplierCaptor.capture()); - final ValueTransformerWithKey transformer = transformSupplierCaptor.getValue().get(); + final ValueTransformerWithKey transformer = + (ValueTransformerWithKey) transformSupplierCaptor.getValue().get(); transformer.init(processorCtx); return transformer; } @@ -808,7 +807,8 @@ private ValueTransformerWithKey getTransformerFro ) { streamSource.build(planBuilder, planInfo); verify(kTable).transformValues(transformSupplierCaptor.capture()); - final ValueTransformerWithKey transformer = transformSupplierCaptor.getValue().get(); + final ValueTransformerWithKey transformer = + (ValueTransformerWithKey) transformSupplierCaptor.getValue().get(); transformer.init(processorCtx); return transformer; } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java index 96cee7035a15..7c93975e173f 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StepSchemaResolverTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -575,7 +575,7 @@ private void givenTableFunction(final String name, final SqlType returnType) { when(tableFunction.getReturnType(any())).thenReturn(returnType); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) private void givenAggregateFunction(final String name, final SqlType returnType) { final KsqlAggregateFunction aggregateFunction = mock(KsqlAggregateFunction.class); when(functionRegistry.getAggregateFunction(eq(FunctionName.of(name)), any(), any())) diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index d8cf68184bdb..bf5fe6578bfc 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -203,7 +203,7 @@ public class StreamAggregateBuilderTest { private StreamAggregate aggregate; private StreamWindowedAggregate windowedAggregate; - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) @Before public void init() { when(sourceStep.build(any(), eq(planInfo))).thenReturn(KGroupedStreamHolder.of(groupedStream, INPUT_SCHEMA)); @@ -234,7 +234,7 @@ public void init() { ); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) private void givenUnwindowedAggregate() { when(materializedFactory.>create(any(), any(), any())) .thenReturn(materialized); @@ -250,7 +250,7 @@ private void givenUnwindowedAggregate() { ); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) private void givenTimeWindowedAggregate() { when(materializedFactory.>create(any(), any(), any(), any())) .thenReturn(timeWindowMaterialized); @@ -298,7 +298,7 @@ private void givenHoppingWindowedAggregate() { ); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) private void givenSessionWindowedAggregate() { when(materializedFactory.>create(any(), any(), any(), any())) .thenReturn(sessionWindowMaterialized); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamFilterBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamFilterBuilderTest.java index dfc0a298eb52..23d85b1062f6 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamFilterBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamFilterBuilderTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package io.confluent.ksql.execution.streams; import static org.hamcrest.MatcherAssert.assertThat; @@ -24,7 +39,6 @@ import io.confluent.ksql.execution.transform.sqlpredicate.SqlPredicate; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogger; -import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.util.KsqlConfig; import java.util.Optional; @@ -81,7 +95,7 @@ public class StreamFilterBuilderTest { public final MockitoRule mockitoRule = MockitoJUnit.rule(); @Before - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public void init() { when(buildContext.getKsqlConfig()).thenReturn(ksqlConfig); when(buildContext.getFunctionRegistry()).thenReturn(functionRegistry); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java index 60a52046fb61..001d926ff390 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package io.confluent.ksql.execution.streams; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderV1Test.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderV1Test.java index 42b4400b7edc..25406ea576e7 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderV1Test.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderV1Test.java @@ -1,3 +1,18 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package io.confluent.ksql.execution.streams; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1Test.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1Test.java index 27dfe53a6c98..ab9634e3c8cd 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1Test.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderV1Test.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -111,7 +111,7 @@ public class StreamSelectKeyBuilderV1Test { private StreamSelectKeyV1 selectKey; @Before - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public void init() { when(buildContext.getFunctionRegistry()).thenReturn(functionRegistry); when(buildContext.getKsqlConfig()).thenReturn(new KsqlConfig(ImmutableMap.of())); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java index bedc93ebcb5d..8cf85d2de670 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package io.confluent.ksql.execution.streams; import io.confluent.ksql.execution.materialization.MaterializationInfo; diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java index cfe102d00cd2..176a4e20fe19 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableAggregateBuilderTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -155,7 +155,7 @@ public class TableAggregateBuilderTest { private TableAggregate aggregate; @Before - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public void init() { when(buildContext.buildKeySerde(any(), any(), any())).thenReturn(keySerde); when(buildContext.buildValueSerde(any(), any(), any())).thenReturn(valueSerde); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableFilterBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableFilterBuilderTest.java index f6e52d224856..3d942d54349d 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableFilterBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableFilterBuilderTest.java @@ -1,47 +1,61 @@ - package io.confluent.ksql.execution.streams; - - import static org.hamcrest.MatcherAssert.assertThat; - import static org.hamcrest.Matchers.is; - import static org.mockito.ArgumentMatchers.any; - import static org.mockito.ArgumentMatchers.eq; - import static org.mockito.Mockito.mock; - import static org.mockito.Mockito.verify; - import static org.mockito.Mockito.when; - - import io.confluent.ksql.GenericRow; - import io.confluent.ksql.execution.runtime.RuntimeBuildContext; - import io.confluent.ksql.execution.context.QueryContext; - import io.confluent.ksql.execution.expression.tree.Expression; - import io.confluent.ksql.execution.materialization.MaterializationInfo; - import io.confluent.ksql.execution.materialization.MaterializationInfo.TransformFactory; - import io.confluent.ksql.execution.plan.ExecutionStep; - import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1; - import io.confluent.ksql.execution.plan.PlanInfo; - import io.confluent.ksql.execution.plan.KTableHolder; - import io.confluent.ksql.execution.plan.ExecutionKeyFactory; - import io.confluent.ksql.execution.plan.PlanBuilder; - import io.confluent.ksql.execution.plan.TableFilter; - import io.confluent.ksql.execution.transform.KsqlProcessingContext; - import io.confluent.ksql.execution.transform.KsqlTransformer; - import io.confluent.ksql.execution.transform.sqlpredicate.SqlPredicate; - import io.confluent.ksql.function.FunctionRegistry; - import io.confluent.ksql.logging.processing.ProcessingLogger; - import io.confluent.ksql.query.QueryId; - import io.confluent.ksql.schema.ksql.LogicalSchema; - import io.confluent.ksql.util.KsqlConfig; - import java.util.Optional; - import org.apache.kafka.connect.data.Struct; - import org.apache.kafka.streams.kstream.KTable; - import org.apache.kafka.streams.kstream.Named; - import org.apache.kafka.streams.kstream.ValueMapper; - import org.junit.Before; - import org.junit.Rule; - import org.junit.Test; - import org.mockito.ArgumentCaptor; - import org.mockito.Captor; - import org.mockito.Mock; - import org.mockito.junit.MockitoJUnit; - import org.mockito.junit.MockitoRule; +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.runtime.RuntimeBuildContext; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.materialization.MaterializationInfo; +import io.confluent.ksql.execution.materialization.MaterializationInfo.TransformFactory; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1; +import io.confluent.ksql.execution.plan.PlanInfo; +import io.confluent.ksql.execution.plan.KTableHolder; +import io.confluent.ksql.execution.plan.ExecutionKeyFactory; +import io.confluent.ksql.execution.plan.PlanBuilder; +import io.confluent.ksql.execution.plan.TableFilter; +import io.confluent.ksql.execution.transform.KsqlProcessingContext; +import io.confluent.ksql.execution.transform.KsqlTransformer; +import io.confluent.ksql.execution.transform.sqlpredicate.SqlPredicate; +import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.logging.processing.ProcessingLogger; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Optional; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; public class TableFilterBuilderTest { @@ -102,14 +116,14 @@ public class TableFilterBuilderTest { public final MockitoRule mockitoRule = MockitoJUnit.rule(); @Before - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public void init() { when(buildContext.getKsqlConfig()).thenReturn(ksqlConfig); when(buildContext.getFunctionRegistry()).thenReturn(functionRegistry); when(buildContext.getProcessingLogger(any())).thenReturn(processingLogger); when(sourceStep.getProperties()).thenReturn(sourceProperties); when(sourceKTable.transformValues(any(), any(Named.class))).thenReturn((KTable)preKTable); - when(preKTable.filter(any(), any(Named.class))).thenReturn((KTable)filteredKTable); + when(preKTable.filter(any(), any(Named.class))).thenReturn(filteredKTable); when(filteredKTable.mapValues(any(ValueMapper.class), any(Named.class))).thenReturn(postKTable); when(predicateFactory.create(any(), any(), any(), any())).thenReturn(sqlPredicate); when(sqlPredicate.getTransformer(any())).thenReturn((KsqlTransformer) preTransformer); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableSuppressBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableSuppressBuilderTest.java index 4564a2ae623a..a67953189dbe 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableSuppressBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableSuppressBuilderTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Confluent Inc. + * Copyright 2021 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -123,7 +123,7 @@ public void init() { } @Test - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public void shouldSuppressSourceTable() { // When: final KTableHolder result = builder.build(tableHolder, tableSuppress, buildContext, executionKeyFactory, physicalSchemaFactory, materializedFactory); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableTableJoinBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableTableJoinBuilderTest.java index ec4771e98964..e2e475213f19 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableTableJoinBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/TableTableJoinBuilderTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package io.confluent.ksql.execution.streams; import io.confluent.ksql.execution.materialization.MaterializationInfo; @@ -46,14 +61,14 @@ public class TableTableJoinBuilderTest { .keyColumn(L_KEY, SqlTypes.STRING) .valueColumn(ColumnName.of("L_BLUE"), SqlTypes.STRING) .valueColumn(ColumnName.of("L_GREEN"), SqlTypes.INTEGER) - .valueColumn(L_KEY, SqlTypes.STRING) + .valueColumn(L_KEY, SqlTypes.STRING) // Copy of key in value .build(); private static final LogicalSchema RIGHT_SCHEMA = LogicalSchema.builder() .keyColumn(R_KEY, SqlTypes.STRING) .valueColumn(ColumnName.of("R_RED"), SqlTypes.BIGINT) .valueColumn(ColumnName.of("R_ORANGE"), SqlTypes.DOUBLE) - .valueColumn(R_KEY, SqlTypes.STRING) + .valueColumn(R_KEY, SqlTypes.STRING) // Copy of key in value .build(); @Mock @@ -193,7 +208,12 @@ public void shouldReturnCorrectSchema() { // Then: assertThat( result.getSchema(), - is(JoinParamsFactory.create(R_KEY, LEFT_SCHEMA, RIGHT_SCHEMA).getSchema()) + is(LogicalSchema.builder() + .keyColumns(RIGHT_SCHEMA.key()) + .valueColumns(LEFT_SCHEMA.value()) + .valueColumns(RIGHT_SCHEMA.value()) + .build() + ) ); } @@ -208,7 +228,12 @@ public void shouldReturnCorrectSchemaWithSyntheticKey() { // Then: assertThat( result.getSchema(), - is(JoinParamsFactory.create(SYNTH_KEY, LEFT_SCHEMA, RIGHT_SCHEMA).getSchema()) + is(LogicalSchema.builder() + .keyColumn(SYNTH_KEY, SqlTypes.STRING) + .valueColumns(LEFT_SCHEMA.value()) + .valueColumns(RIGHT_SCHEMA.value()) + .valueColumn(SYNTH_KEY, SqlTypes.STRING) + .build()) ); } @@ -229,7 +254,12 @@ public void shouldReturnCorrectLegacySchema() { // Then: assertThat( result.getSchema(), - is(JoinParamsFactory.create(ROWKEY_NAME, LEFT_SCHEMA, RIGHT_SCHEMA).getSchema()) + is(LogicalSchema.builder() + .keyColumn(ROWKEY_NAME, SqlTypes.STRING) + .valueColumns(LEFT_SCHEMA.value()) + .valueColumns(RIGHT_SCHEMA.value()) + .valueColumn(ROWKEY_NAME, SqlTypes.STRING) + .build()) ); }