feat: allow expressions on left table columns in FK-joins #7904
final QualifiedColumnReferenceExp node,
final Context<Void> ctx
) {
return Optional.of(new UnqualifiedColumnReferenceExp(node.getColumnName()));
The ExpressionEvaluator cannot handle fully qualified column references, so we rewrite the given expression accordingly.
I synced with @mjsax offline about this specifically. Our understanding is that the codeGenRunner is also used for physical plan generation, and at that stage there are no qualified column refs because we just prefix the names as table/stream__column. Is that right, @vcrfxia?
If so, as a general note: I feel we should use separate classes for the traversals in logical and physical plan generation rather than reusing codeGenRunner for both. Logical planning only needs return types, parameter types, and so on, yet today we run the same full-fledged traversal multiple times at different stages. Is my understanding reasonable?
@@ -54,26 +55,26 @@
public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, formats1, left1, right1),
new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, formats1, left1, right1)
new ForeignKeyTableTableJoin<>(props1, INNER, JOIN_COLUMN_NAME, formats1, left1, right1, null),
Open TODO...
@@ -48,7 +51,7 @@ public void shouldBuildCorrectKeyedSchema() {

// When:
final ForeignKeyJoinParams<String> joinParams =
ForeignKeyJoinParamsFactory.create(leftJoinColumnName, LEFT_SCHEMA, RIGHT_SCHEMA);
ForeignKeyJoinParamsFactory.create(Optional.of(leftJoinColumnName), Optional.empty(), LEFT_SCHEMA, RIGHT_SCHEMA, null);
TODO...
@@ -136,26 +136,26 @@ public void init() {
);
}

@Test
TODO..
columnRef.maybeQualifier().get(),
columnRef.getColumnName())
: columnRef.getColumnName();
final VisitParentExpressionVisitor<Optional<Expression>, Context<Void>> aliasRewritter =
Similar to the code we replace, we need to rewrite the expression with the alias prefix...
4db756a to 67cb736
Needed to update the generated schemas... I think it's safe. Do you agree, @vcrfxia?
Thanks @mjsax -- LGTM with some questions/comments inline. It'd also be nice to add a test for mismatched types involving expressions (ensure that the error message is sensible) and another test with expressions including qualified columns.
final Stacker contextStacker,
final FormatInfo valueFormatInfo
final FormatInfo valueFormatInfo,
final Optional<Expression> expression
nit: rename to leftJoinExpression to be more informative (and parallel leftJoinColumnName above)? Same for the other occurrences.
final Context<Void> ctx
) {
return Optional.of(new UnqualifiedColumnReferenceExp(
ColumnNames.generatedJoinColumnAlias(node.getQualifier(), node.getColumnName())
I'm not following the logic with qualifiers here. The old code only sometimes generated the join column alias (if a qualifier is present) but this new code seems to always generate the join column alias (before rewriting the alias to be unqualified). Is this a change in behavior?
I think it is not a behavior change, because we use the visitor pattern and this method is only executed for qualified column references (its name is visitQualifiedColumnReference). Thus unqualified column references should be untouched, as before.
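To illustrate the point about visitor dispatch, here is a minimal sketch. All types are simplified, hypothetical stand-ins for the ksqlDB expression classes, and the alias format (qualifier, underscore, column name) is an assumption for illustration only. It shows that a rewriter which overrides only the qualified-reference callback never touches unqualified references.

```java
import java.util.Optional;

// Hypothetical, simplified stand-ins for the ksqlDB expression classes.
interface Expr {
  <R> R accept(ExprVisitor<R> visitor);
}

interface ExprVisitor<R> {
  R visitQualifiedColumnReference(QualifiedRef node);

  R visitUnqualifiedColumnReference(UnqualifiedRef node);
}

final class QualifiedRef implements Expr {
  final String qualifier;
  final String columnName;

  QualifiedRef(final String qualifier, final String columnName) {
    this.qualifier = qualifier;
    this.columnName = columnName;
  }

  @Override
  public <R> R accept(final ExprVisitor<R> visitor) {
    return visitor.visitQualifiedColumnReference(this);
  }
}

final class UnqualifiedRef implements Expr {
  final String columnName;

  UnqualifiedRef(final String columnName) {
    this.columnName = columnName;
  }

  @Override
  public <R> R accept(final ExprVisitor<R> visitor) {
    return visitor.visitUnqualifiedColumnReference(this);
  }
}

// Returns a replacement for qualified references only; Optional.empty()
// means "keep the node as it is".
final class AliasRewriter implements ExprVisitor<Optional<Expr>> {
  @Override
  public Optional<Expr> visitQualifiedColumnReference(final QualifiedRef node) {
    // Assumed alias format (qualifier + "_" + column), for illustration only.
    return Optional.of(new UnqualifiedRef(node.qualifier + "_" + node.columnName));
  }

  @Override
  public Optional<Expr> visitUnqualifiedColumnReference(final UnqualifiedRef node) {
    return Optional.empty();
  }
}

final class AliasRewriteDemo {
  // Applies the rewriter and returns the resulting column name.
  static String rewrittenName(final Expr expression) {
    final Expr result = expression.accept(new AliasRewriter()).orElse(expression);
    return ((UnqualifiedRef) result).columnName;
  }
}
```

In this sketch, a qualified reference is replaced by an aliased unqualified one, while an unqualified reference passes through unchanged because its callback returns Optional.empty().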
) {
this.properties = requireNonNull(props, "props");
this.joinType = requireNonNull(joinType, "joinType");
if (joinType == JoinType.OUTER) {
throw new IllegalArgumentException("OUTER join not supported.");
}
this.leftJoinColumnName = requireNonNull(leftJoinColumnName, "leftJoinColumnName");
We can still require non-null for Optional types. It'd be nice to add this back, and also add it for leftJoinExpression below.
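A minimal sketch of that suggestion, assuming a hypothetical SketchJoinStep class with String placeholders instead of the real ColumnName and Expression types: an Optional field can be empty, but the Optional reference itself should never be null, so the null check still applies.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for the execution step; the field types are placeholders.
final class SketchJoinStep {
  final Optional<String> leftJoinColumnName;
  final Optional<String> leftJoinExpression;

  SketchJoinStep(
      final Optional<String> leftJoinColumnName,
      final Optional<String> leftJoinExpression
  ) {
    // Optional.empty() is a valid value here; a null Optional is not.
    this.leftJoinColumnName =
        Objects.requireNonNull(leftJoinColumnName, "leftJoinColumnName");
    this.leftJoinExpression =
        Objects.requireNonNull(leftJoinExpression, "leftJoinExpression");
  }
}
```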
final Formats formats,
final ExecutionStep<KTableHolder<KLeftT>> left,
final ExecutionStep<KTableHolder<KRightT>> right
final ExecutionStep<KTableHolder<KRightT>> right,
final Optional<Expression> expression
nit: rename to leftJoinExpression to be more informative?
)
);
} else {
expressionEvaluator = Optional.empty();
Instead of having two separate cases here, can we combine them by always passing an expression evaluator in order to simplify the code? The expression evaluator for the single column case would be simple, just extracting the appropriate column.
Good idea!
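A rough sketch of the idea, not the actual ksqlDB code: RowEvaluator is a hypothetical stand-in for ExpressionEvaluator, and rows are modeled as plain lists. The key extractor always holds an evaluator, and the single-column case is just an evaluator that reads one column.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for ExpressionEvaluator: maps a row to a value.
interface RowEvaluator extends Function<List<Object>, Object> { }

final class SketchKeyExtractor {
  private final RowEvaluator evaluator;

  SketchKeyExtractor(final RowEvaluator evaluator) {
    this.evaluator = Objects.requireNonNull(evaluator, "evaluator");
  }

  // Single-column case: a trivial evaluator that reads one column.
  static SketchKeyExtractor forColumn(final int columnIndex) {
    if (columnIndex < 0) {
      throw new IllegalArgumentException("columnIndex negative: " + columnIndex);
    }
    return new SketchKeyExtractor(row -> row.get(columnIndex));
  }

  // One code path for both plain columns and arbitrary expressions.
  Object extract(final List<Object> row) {
    return evaluator.apply(row);
  }
}
```

With this shape there is no Optional evaluator and no branch in extract; callers that only have a column index go through forColumn.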
KsqlKeyExtractor(final int leftJoinColumnIndex) {
checkArgument(
leftJoinColumnIndex >= 0,
"leftJoinColumnIndex negative: " + leftJoinColumnIndex
);

this.leftJoinColumnIndex = leftJoinColumnIndex;
expressionEvaluator = Optional.empty();
As above, it'd be nice if we could simplify this code by always using an expression evaluator.
);
if (leftJoinColumnName.isPresent()) {
return new ForeignKeyJoinParams<>(
createKeyExtractor(leftSchema, leftJoinColumnName.get()),
nit: I think it's slightly more readable to create the foreign key extractor in an if-statement and then have a single return statement for building the ForeignKeyJoinParams. Also less code duplication.
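Roughly what the single-return shape could look like. This is a sketch with hypothetical names (SketchJoinParams, SketchJoinParamsFactory) and simplified types: rows are String arrays and the schema is a plain String.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, simplified stand-in for ForeignKeyJoinParams.
final class SketchJoinParams {
  final Function<String[], String> keyExtractor;
  final String schema;

  SketchJoinParams(final Function<String[], String> keyExtractor, final String schema) {
    this.keyExtractor = keyExtractor;
    this.schema = schema;
  }
}

final class SketchJoinParamsFactory {
  static SketchJoinParams create(
      final Optional<Integer> leftJoinColumnIndex,
      final Optional<Function<String[], String>> leftJoinExpression,
      final String schema
  ) {
    // Pick the extractor in the if-statement...
    final Function<String[], String> keyExtractor;
    if (leftJoinColumnIndex.isPresent()) {
      final int index = leftJoinColumnIndex.get();
      keyExtractor = row -> row[index];
    } else {
      keyExtractor = leftJoinExpression.orElseThrow(
          () -> new IllegalArgumentException("column or expression required"));
    }
    // ...and build the params in exactly one place.
    return new SketchJoinParams(keyExtractor, schema);
  }
}
```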
// Then:
verify(leftKTable).leftJoin(
same(rightKTable),
eq(new KsqlKeyExtractor<>(1)),
any(KsqlKeyExtractor.class),
@vcrfxia After the refactoring to always use ExpressionEvaluator, it is pretty hard to verify that KsqlKeyExtractor is created correctly now... I just simplified this test to make it pass for now. But this change kind of makes the test useless... Thoughts?
If we really want to test this, we can use an ArgumentCaptor to capture the KsqlKeyExtractor, add a getter for the expression evaluator (visible for testing only), get the expression from the expression evaluator, and verify that the expression is a column reference expression for the appropriate column. I don't think this is too much effort if we wrap it in a helper method.
As you pointed out in the first version of this PR, test coverage for this (and some of the other unit tests) is lacking at the moment since we only test single columns for the left join expression, and not any non-column expressions. If we want to extend test coverage to include non-column expressions, I think we'll have to do something similar anyway. (I'm happy deferring this to a follow-up PR if you prefer since I know you've got some time constraints.)
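A self-contained sketch of the capture-and-inspect approach described above. To keep it runnable without Mockito, it swaps ArgumentCaptor for a hand-rolled recording fake; every type here is a hypothetical stand-in rather than the real ksqlDB or Kafka Streams class.

```java
import java.util.ArrayList;
import java.util.List;

// All types here are hypothetical stand-ins, not the real ksqlDB classes.
final class ColumnRefExpression {
  final String columnName;

  ColumnRefExpression(final String columnName) {
    this.columnName = columnName;
  }
}

final class SketchEvaluator {
  private final ColumnRefExpression expression;

  SketchEvaluator(final ColumnRefExpression expression) {
    this.expression = expression;
  }

  ColumnRefExpression getExpression() {
    return expression;
  }
}

final class SketchExtractor {
  private final SketchEvaluator evaluator;

  SketchExtractor(final SketchEvaluator evaluator) {
    this.evaluator = evaluator;
  }

  // Visible for testing only.
  SketchEvaluator getEvaluator() {
    return evaluator;
  }
}

interface JoinableTable {
  void leftJoin(SketchExtractor extractor);
}

// Records every extractor passed to leftJoin, like an argument captor would.
final class CapturingTable implements JoinableTable {
  final List<SketchExtractor> captured = new ArrayList<>();

  @Override
  public void leftJoin(final SketchExtractor extractor) {
    captured.add(extractor);
  }
}

final class JoinBuilder {
  // Code under test: builds the extractor and hands it to the table.
  static void build(final JoinableTable left, final String foreignKeyColumn) {
    left.leftJoin(
        new SketchExtractor(new SketchEvaluator(new ColumnRefExpression(foreignKeyColumn))));
  }
}

final class ExtractorChecks {
  // Helper: returns the column that the single captured extractor reads.
  static String capturedColumn(final CapturingTable table) {
    if (table.captured.size() != 1) {
      throw new AssertionError("expected exactly one leftJoin call");
    }
    return table.captured.get(0).getEvaluator().getExpression().columnName;
  }
}
```

The helper keeps each test down to one call plus one assertion on the column name; with Mockito, the recording fake would be replaced by a captor on the mocked table.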
Oh nice! #gold
Thanks @mjsax ! The new QTT coverage is great. Love the refactor too 🎉
KsqlKeyExtractor(final ExpressionEvaluator expressionEvaluator,
final ProcessingLogger processingLogger) {
this.expressionEvaluator = requireNonNull(expressionEvaluator);
this.processingLogger = processingLogger;
Any reason we don't require this to be non-null?
We only "forward" it to expressionEvaluator.evaluate() but don't use it ourselves, so I thought we don't need this (makes testing simpler as we can pass null :))
Assuming we're talking about the execution step/plan schemas, I agree this is safe. The updates reflect the changes to the ForeignKeyTableTableJoin execution step.
I do not have any major comments, just a question about the tech debt we have for using the same codeGenRunner for logical / physical planning.
@@ -321,9 +321,10 @@ public SchemaKGroupedTable groupBy(

public <KRightT> SchemaKTable<K> foreignKeyInnerJoin(
final SchemaKTable<KRightT> schemaKTable,
final ColumnName leftJoinColumnName,
final Optional<ColumnName> leftJoinColumnName,
nit: move the new param as the third param? Ditto below.
@@ -643,10 +645,10 @@ static JoinKey syntheticColumn() {
}

static JoinKey foreignKeyColumn(
final ColumnName foreignKeyColumn,
final Expression foreignKeyExpression,
Should we just name the function joinKey(..)?
Let's go with foreignKey()
private final ImmutableList<? extends ColumnReferenceExp> leftSourceKeyColumns;

static JoinKey of(final ColumnName foreignKeyColumn,
final Collection<QualifiedColumnReferenceExp> leftSourceKeyColumns) {
return new ForeignJoinKey(foreignKeyColumn, leftSourceKeyColumns);
return new ForeignJoinKey(
This static constructor no longer seems to be used.
Good catch!
@JsonProperty(value = "formats", required = true)
final Formats formats,
@JsonProperty(value = "leftSource", required = true)
final ExecutionStep<KTableHolder<KLeftT>> leftSource,
@JsonProperty(value = "rightSource", required = true)
final ExecutionStep<KTableHolder<KRightT>> rightSource
final ExecutionStep<KTableHolder<KRightT>> rightSource,
Also a minor nit: could we add the new param after leftJoinColumnName to group them together? Similar for other classes with the new param in constructors.
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE OUTPUT AS SELECT\n LEFT_TABLE.L_ID L_ID,\n LEFT_TABLE.L_ID_2_FOREIGN_KEY L_ID_2_FOREIGN_KEY,\n RIGHT_TABLE.R_ID R_ID,\n LEFT_TABLE.NAME NAME,\n RIGHT_TABLE.F1 F1\nFROM LEFT_TABLE LEFT_TABLE\nINNER JOIN RIGHT_TABLE RIGHT_TABLE ON ((LEFT_TABLE.L_ID_2_FOREIGN_KEY = RIGHT_TABLE.R_ID))\nEMIT CHANGES",
Seems the left key is not an expression? Did I miss anything?
Those are historic plans that are auto-generated...
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE OUTPUT AS SELECT\n LEFT_TABLE.L_ID L_ID,\n RIGHT_TABLE.R_ID R_ID,\n LEFT_TABLE.NAME NAME,\n RIGHT_TABLE.F1 F1\nFROM LEFT_TABLE LEFT_TABLE\nINNER JOIN RIGHT_TABLE RIGHT_TABLE ON ((LEFT_TABLE.FOREIGN_KEY = RIGHT_TABLE.R_ID))\nEMIT CHANGES",
Same here..?
Merging this PR as-is; I will do a follow-up PR to address the open comments.
Follow up to confluentinc#7904
Follow up PR: #7916
Description
Currently, FK-joins require a plain column reference for the left join expression. We want to lift this restriction and allow actual expressions as the left join expression.
Testing done
Unit tests and new QTT tests.
Reviewer checklist