Skip to content

Commit

Permalink
feat: build physical plan for FK table-table joins (#7517)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjsax committed May 14, 2021
1 parent 605e99b commit 744fa36
Show file tree
Hide file tree
Showing 36 changed files with 1,150 additions and 99 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Confluent Inc.
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
Expand All @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.ForeignKeyTableTableJoin;
import io.confluent.ksql.execution.plan.SourceStep;
import io.confluent.ksql.execution.plan.StreamAggregate;
import io.confluent.ksql.execution.plan.StreamFilter;
Expand Down Expand Up @@ -93,6 +94,7 @@ public class PlanSummary {
.put(TableSelectKey.class, "REKEY")
.put(TableSink.class, "SINK")
.put(TableTableJoin.class, "JOIN")
.put(ForeignKeyTableTableJoin.class, "JOIN")
.put(TableSource.class, "SOURCE")
.put(TableSuppress.class, "SUPPRESS")
.put(WindowedTableSource.class, "SOURCE")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Confluent Inc.
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -53,7 +53,8 @@
@Type(value = TableSelectKey.class, name = "tableSelectKeyV1"),
@Type(value = TableSink.class, name = "tableSinkV1"),
@Type(value = TableSuppress.class, name = "tableSuppressV1"),
@Type(value = TableTableJoin.class, name = "tableTableJoinV1")
@Type(value = TableTableJoin.class, name = "tableTableJoinV1"),
@Type(value = ForeignKeyTableTableJoin.class, name = "fkTableTableJoinV1")
})
@Immutable
public interface ExecutionStep<S> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.plan;

import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.name.ColumnName;
import java.util.List;
import java.util.Objects;

@Immutable
public class ForeignKeyTableTableJoin<KLeftT, KRightT>
implements ExecutionStep<KTableHolder<KLeftT>> {

private final ExecutionStepPropertiesV1 properties;
private final JoinType joinType;
private final ColumnName leftJoinColumnName;
private final ExecutionStep<KTableHolder<KLeftT>> leftSource;
private final ExecutionStep<KTableHolder<KRightT>> rightSource;

@JsonCreator
public ForeignKeyTableTableJoin(
@JsonProperty(value = "properties", required = true)
final ExecutionStepPropertiesV1 props,
@JsonProperty(value = "joinType", required = true)
final JoinType joinType,
@JsonProperty(value = "leftJoinColumnName", required = true)
final ColumnName leftJoinColumnName,
@JsonProperty(value = "leftSource", required = true)
final ExecutionStep<KTableHolder<KLeftT>> leftSource,
@JsonProperty(value = "rightSource", required = true)
final ExecutionStep<KTableHolder<KRightT>> rightSource
) {
this.properties = requireNonNull(props, "props");
this.joinType = requireNonNull(joinType, "joinType");
if (joinType == JoinType.OUTER) {
throw new IllegalArgumentException("OUTER join not supported.");
}
this.leftJoinColumnName = requireNonNull(leftJoinColumnName, "leftJoinColumnName");
this.leftSource = requireNonNull(leftSource, "leftSource");
this.rightSource = requireNonNull(rightSource, "rightSource");
}

@Override
public ExecutionStepPropertiesV1 getProperties() {
return properties;
}

@Override
@JsonIgnore
public List<ExecutionStep<?>> getSources() {
return ImmutableList.of(leftSource, rightSource);
}

public ExecutionStep<KTableHolder<KLeftT>> getLeftSource() {
return leftSource;
}

public ExecutionStep<KTableHolder<KRightT>> getRightSource() {
return rightSource;
}

public JoinType getJoinType() {
return joinType;
}

public ColumnName getLeftJoinColumnName() {
return leftJoinColumnName;
}

@Override
public KTableHolder<KLeftT> build(final PlanBuilder builder, final PlanInfo info) {
return builder.visitForeignKeyTableTableJoin(this, info);
}

@Override
public PlanInfo extractPlanInfo(final PlanInfoExtractor extractor) {
return extractor.visitForeignKeyTableTableJoin(this);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ForeignKeyTableTableJoin<?, ?> that = (ForeignKeyTableTableJoin<?, ?>) o;
return Objects.equals(properties, that.properties)
&& joinType == that.joinType
&& Objects.equals(leftJoinColumnName, that.leftJoinColumnName)
&& Objects.equals(leftSource, that.leftSource)
&& Objects.equals(rightSource, that.rightSource);
}

@Override
public int hashCode() {
return Objects.hash(properties, joinType, leftJoinColumnName, leftSource, rightSource);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Confluent Inc.
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
Expand Down Expand Up @@ -82,4 +82,9 @@ KTableHolder<Windowed<GenericKey>> visitStreamWindowedAggregate(
<K> KTableHolder<K> visitTableSuppress(TableSuppress<K> tableSuppress, PlanInfo planInfo);

<K> KTableHolder<K> visitTableTableJoin(TableTableJoin<K> tableTableJoin, PlanInfo planInfo);

<KLeftT, KRightT> KTableHolder<KLeftT> visitForeignKeyTableTableJoin(
ForeignKeyTableTableJoin<KLeftT, KRightT> foreignKeyTableTableJoin,
PlanInfo planInfo
);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Confluent Inc.
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (final the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
Expand Down Expand Up @@ -130,6 +130,12 @@ public <K> PlanInfo visitTableTableJoin(final TableTableJoin<K> tableTableJoin)
return visitJoinStep(tableTableJoin);
}

public <KLeftT, KRightT> PlanInfo visitForeignKeyTableTableJoin(
final ForeignKeyTableTableJoin<KLeftT, KRightT> foreignKeyTableTableJoin) {

return visitJoinStep(foreignKeyTableTableJoin);
}

private PlanInfo visitSourceStep(final ExecutionStep<?> step) {
return new PlanInfo(step);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.plan;

import static io.confluent.ksql.execution.plan.JoinType.INNER;
import static io.confluent.ksql.execution.plan.JoinType.LEFT;

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.name.ColumnName;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class ForeignKeyTableTableJoinTest {

private static final ColumnName JOIN_COLUMN_NAME = ColumnName.of("Bob");
private static final ColumnName JOIN_COLUMN_NAME_2 = ColumnName.of("Vic");

@Mock
private ExecutionStepPropertiesV1 props1;
@Mock
private ExecutionStepPropertiesV1 props2;
@Mock
private ExecutionStep<KTableHolder<Struct>> left1;
@Mock
private ExecutionStep<KTableHolder<Struct>> right1;
@Mock
private ExecutionStep<KTableHolder<Struct>> left2;
@Mock
private ExecutionStep<KTableHolder<Struct>> right2;

@SuppressWarnings("UnstableApiUsage")
@Test
public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, left1, right1),
new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, left1, right1)
)
.addEqualityGroup(
new ForeignKeyTableTableJoin<>(props2, INNER, JOIN_COLUMN_NAME, left1, right1)
)
.addEqualityGroup(
new ForeignKeyTableTableJoin<>(props1, LEFT, JOIN_COLUMN_NAME, left1, right1)
)
.addEqualityGroup(
new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME_2, left1, right1)
)
.addEqualityGroup(
new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, left2, right1)
)
.addEqualityGroup(
new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, left1, right2)
).testEquals();
}
}
31 changes: 31 additions & 0 deletions ksqldb-rest-app/src/test/resources/ksql-plan-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,35 @@
"title" : "tableTableJoinV1",
"required" : [ "@type", "properties", "joinType", "leftSource", "rightSource" ]
},
"ForeignKeyTableTableJoin" : {
"type" : "object",
"additionalProperties" : false,
"properties" : {
"@type" : {
"type" : "string",
"enum" : [ "fkTableTableJoinV1" ],
"default" : "fkTableTableJoinV1"
},
"properties" : {
"$ref" : "#/definitions/ExecutionStepPropertiesV1"
},
"joinType" : {
"type" : "string",
"enum" : [ "INNER", "LEFT", "OUTER" ]
},
"leftJoinColumnName" : {
"type" : "string"
},
"leftSource" : {
"$ref" : "#/definitions/ExecutionStep"
},
"rightSource" : {
"$ref" : "#/definitions/ExecutionStep"
}
},
"title" : "fkTableTableJoinV1",
"required" : [ "@type", "properties", "joinType", "leftJoinColumnName", "leftSource", "rightSource" ]
},
"RefinementInfo" : {
"type" : "object",
"additionalProperties" : false,
Expand Down Expand Up @@ -1108,6 +1137,8 @@
"$ref" : "#/definitions/TableSuppress"
}, {
"$ref" : "#/definitions/TableTableJoin"
}, {
"$ref" : "#/definitions/ForeignKeyTableTableJoin"
} ]
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Confluent Inc.
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
Expand All @@ -21,6 +21,7 @@
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1;
import io.confluent.ksql.execution.plan.ForeignKeyTableTableJoin;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.JoinType;
import io.confluent.ksql.execution.plan.KGroupedStreamHolder;
Expand Down Expand Up @@ -320,6 +321,23 @@ public static <K> TableTableJoin<K> tableTableJoin(
);
}

public static <KLeftT, KRightT> ForeignKeyTableTableJoin<KLeftT, KRightT>
foreignKeyTableTableJoin(final QueryContext.Stacker stacker,
final JoinType joinType,
final ColumnName leftJoinColumnName,
final ExecutionStep<KTableHolder<KLeftT>> left,
final ExecutionStep<KTableHolder<KRightT>> right
) {
final QueryContext queryContext = stacker.getQueryContext();
return new ForeignKeyTableTableJoin<>(
new ExecutionStepPropertiesV1(queryContext),
joinType,
leftJoinColumnName,
left,
right
);
}

public static StreamAggregate streamAggregate(
final QueryContext.Stacker stacker,
final ExecutionStep<KGroupedStreamHolder> sourceStep,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.streams;

import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.Objects;

public class ForeignKeyJoinParams<KRightT> {
private final KsqlKeyExtractor<KRightT> keyExtractor;
private final KsqlValueJoiner joiner;
private final LogicalSchema schema;

ForeignKeyJoinParams(final KsqlKeyExtractor<KRightT> keyExtractor,
final KsqlValueJoiner joiner,
final LogicalSchema schema) {
this.keyExtractor = Objects.requireNonNull(keyExtractor, "keyExtractor");
this.joiner = Objects.requireNonNull(joiner, "joiner");
this.schema = Objects.requireNonNull(schema, "schema");
}

public LogicalSchema getSchema() {
return schema;
}

public KsqlKeyExtractor<KRightT> getKeyExtractor() {
return keyExtractor;
}

public KsqlValueJoiner getJoiner() {
return joiner;
}
}

0 comments on commit 744fa36

Please sign in to comment.