Skip to content

Commit

Permalink
feat: ungate support for foreign key joins (#7591)
Browse files Browse the repository at this point in the history
- fix: update expected outputs in tests
- chore: add another fk join internal topic to qtt framework ignore list
- test: remove obsolete test
  • Loading branch information
vcrfxia committed May 26, 2021
1 parent 0c4cf23 commit 061fb4a
Show file tree
Hide file tree
Showing 68 changed files with 12,254 additions and 368 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ public class KsqlConfig extends AbstractConfig {
+ "'CREATE STREAM S AS ...' will create a topic 'thing-S', where as the statement "
+ "'CREATE STREAM S WITH(KAFKA_TOPIC = 'foo') AS ...' will create a topic 'foo'.";

public static final String KSQL_FOREIGN_KEY_JOINS_ENABLED = "ksql.joins.foreign.key.enable";

public static final String KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG =
"ksql.query.persistent.active.limit";
private static final int KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT = Integer.MAX_VALUE;
Expand Down Expand Up @@ -801,12 +799,6 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT,
Importance.MEDIUM,
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DOC
).define(
KSQL_FOREIGN_KEY_JOINS_ENABLED,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"Feature flag for foreign key joins, currently under development."
).define(
KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG,
Type.LONG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,73 +726,63 @@ private Optional<ColumnReferenceExp> verifyForeignKeyJoin(
final Expression leftExpression = joinInfo.getLeftJoinExpression();
final Expression rightExpression = joinInfo.getRightJoinExpression();

if (ksqlConfig.getBoolean(KsqlConfig.KSQL_FOREIGN_KEY_JOINS_ENABLED)) {

if (joinInfo.getType().equals(JoinType.OUTER)) {
throw new KsqlException(String.format(
"Invalid join type:"
+ " full-outer join not supported for foreign-key table-table join."
+ " Got %s %s %s.",
joinInfo.getLeftSource().getDataSource().getName().text(),
joinType,
joinInfo.getRightSource().getDataSource().getName().text()
));
}

// after we lift this n-way join restriction, we should be able to support FK-joins
// at any level in the join tree, even after we add right-deep/bushy join tree support,
// because a FK-join output table has the same PK as its left input table
if (!(leftNode instanceof DataSourceNode)
|| !(rightNode instanceof DataSourceNode)) {
throw new KsqlException(String.format(
"Invalid join condition:"
+ " foreign-key table-table joins are not supported as part of n-way joins."
+ " Got %s = %s.",
joinInfo.getFlippedLeftJoinExpression(),
joinInfo.getFlippedRightJoinExpression()
));
}

if (!(leftExpression instanceof ColumnReferenceExp)) {
throw new KsqlException(String.format(
"Invalid join condition:"
+ " foreign-key table-table joins with expressions are not supported yet."
+ " Got %s = %s.",
joinInfo.getFlippedLeftJoinExpression(),
joinInfo.getFlippedRightJoinExpression()
));
}

// we need to extend this to support expressions later on
final ColumnReferenceExp fkColumnReference = (ColumnReferenceExp) leftExpression;

final SqlType fkColumnType =
leftNode.getSchema().findColumn(fkColumnReference.getColumnName()).get().type();
final SqlType rightKeyType = Iterables.getOnlyElement(rightNode.getSchema().key()).type();

verifyJoinConditionTypes(
fkColumnType,
rightKeyType,
leftExpression,
rightExpression,
joinInfo.hasFlippedJoinCondition()
);
if (joinInfo.getType().equals(JoinType.OUTER)) {
throw new KsqlException(String.format(
"Invalid join type:"
+ " full-outer join not supported for foreign-key table-table join."
+ " Got %s %s %s.",
joinInfo.getLeftSource().getDataSource().getName().text(),
joinType,
joinInfo.getRightSource().getDataSource().getName().text()
));
}

if (((DataSourceNode) rightNode).isWindowed()) {
throw new KsqlException(
"Foreign-key table-table joins are not supported on windowed tables."
);
}
// after we lift this n-way join restriction, we should be able to support FK-joins
// at any level in the join tree, even after we add right-deep/bushy join tree support,
// because a FK-join output table has the same PK as its left input table
if (!(leftNode instanceof DataSourceNode)
|| !(rightNode instanceof DataSourceNode)) {
throw new KsqlException(String.format(
"Invalid join condition:"
+ " foreign-key table-table joins are not supported as part of n-way joins."
+ " Got %s = %s.",
joinInfo.getFlippedLeftJoinExpression(),
joinInfo.getFlippedRightJoinExpression()
));
}

return Optional.of(fkColumnReference);
} else {
if (!(leftExpression instanceof ColumnReferenceExp)) {
throw new KsqlException(String.format(
"Invalid join condition:"
+ " foreign-key table-table joins are not supported. Got %s = %s.",
+ " foreign-key table-table joins with expressions are not supported yet."
+ " Got %s = %s.",
joinInfo.getFlippedLeftJoinExpression(),
joinInfo.getFlippedRightJoinExpression()
));
}

// we need to extend this to support expressions later on
final ColumnReferenceExp fkColumnReference = (ColumnReferenceExp) leftExpression;

final SqlType fkColumnType =
leftNode.getSchema().findColumn(fkColumnReference.getColumnName()).get().type();
final SqlType rightKeyType = Iterables.getOnlyElement(rightNode.getSchema().key()).type();

verifyJoinConditionTypes(
fkColumnType,
rightKeyType,
leftExpression,
rightExpression,
joinInfo.hasFlippedJoinCondition()
);

if (((DataSourceNode) rightNode).isWindowed()) {
throw new KsqlException(
"Foreign-key table-table joins are not supported on windowed tables."
);
}

return Optional.of(fkColumnReference);
}

private static boolean joinOnNonKeyAttribute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,30 +266,6 @@ public void shouldRepartitionRightStreamIfNotCorrectKey() {
));
}

@Test
public void shouldThrowIfLeftTableNotJoiningOnTableKey() {
// Given:
givenKafkaTopicsExist("test4", "test5");
execute(CREATE_TABLE_TEST4 + CREATE_TABLE_TEST5);

// When:
final Exception e = assertThrows(
KsqlException.class,
() -> execute("CREATE TABLE t1 AS "
+ "SELECT * FROM test4 JOIN test5 "
+ "ON test4.col0 = test5.id;")
);

// Then:
assertThat(
e.getMessage(),
containsString(
"Invalid join condition:"
+ " foreign-key table-table joins are not supported. Got TEST4.COL0 = TEST5.ID."
)
);
}

@Test
public void shouldThrowIfRightTableNotJoiningOnTableKey() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import io.confluent.ksql.test.tools.TestCase;
import io.confluent.ksql.test.tools.TestCaseBuilder;
import io.confluent.ksql.test.tools.TopologyAndConfigs;
import io.confluent.ksql.util.KsqlConfig;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public final class PlannedTestUtils {
Expand All @@ -49,8 +47,7 @@ public static boolean isPlannedTestCase(final TestCase testCase) {

public static boolean isNotExcluded(final TestCase testCase) {
// Place temporary logic here to exclude test cases based on feature flags, etc.
final Map<String, Object> props = testCase.properties();
return !(boolean) props.getOrDefault(KsqlConfig.KSQL_FOREIGN_KEY_JOINS_ENABLED, false);
return true;
}

public static boolean isSamePlan(
Expand Down
Loading

0 comments on commit 061fb4a

Please sign in to comment.