-
Notifications
You must be signed in to change notification settings - Fork 13.7k
[FLINK-37916][table] Add StreamPhysicalMultiJoin #26664
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
30b1fc4
to
00cfd05
Compare
Rebased |
Collections.emptyMap(), // TODO Enable hint-based state ttl. See ticket | ||
// TODO https://issues.apache.org/jira/browse/FLINK-37936 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure people who will review FLINK-37936 will be able to detect these TODOs...
should they be explicitly mentioned in JIRA?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, wanted to do it and forgot. Updated the Jira!
newInputs.set(ordinalInParent, p); | ||
this.inputs = List.copyOf(newInputs); | ||
recomputeDigest(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 quetsions
- are we sure that
ordinalInParent
is always inside boundaries?
2.how likely it could be that value at that index is same asp
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I expect so, I did the same we have for PTFs
Line 137 in 86ee4b5
final List<RelNode> newInputs = new ArrayList<>(inputs); - I don't expect it's likely, since we're replacing it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took a look at it again and
- I've added a assertion check since I saw some other places do it
- As you've mentioned we're not sure what to do if they call with the same
p
, so I think it's safer to let the caller control this
I think we should be ok either way. But I think adding the first assertion should be safe!
.item("joinTypes", joinTypes) | ||
.item("joinConditions", joinConditions) | ||
.itemIf("postJoinFilter", postJoinFilter, postJoinFilter != null) | ||
.item("select", String.join(",", getRowType().getFieldNames())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC usually there should be indexes instead of names
otherwise there might be memory issues if there are too many fields with long names
also I'm not sure the output would be clear for cases where there are several nested subqueries and each time fields have a new alias (for instance set manually in sql)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at CommonPhysicalJoin
we are also using names. Let's fix this problem when it occurs again and keep the PR as it is for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly minor stuff. Should be good from my side in the next iteration.
.../java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java
Outdated
Show resolved
Hide resolved
.item("joinTypes", joinTypes) | ||
.item("joinConditions", joinConditions) | ||
.itemIf("postJoinFilter", postJoinFilter, postJoinFilter != null) | ||
.item("select", String.join(",", getRowType().getFieldNames())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at CommonPhysicalJoin
we are also using names. Let's fix this problem when it occurs again and keep the PR as it is for now?
.../java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java
Outdated
Show resolved
Hide resolved
return inputs.stream() | ||
.map( | ||
input -> { | ||
final FlinkRelMetadataQuery fmq = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this line out of the loop / stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was actually doing input.getCluster().getMetadataQuery()
but this might not be necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm intuitively using the input
final FlinkRelMetadataQuery fmq =
FlinkRelMetadataQuery.reuseOrCreate(input.getCluster().getMetadataQuery());
return fmq.getUpsertKeys(input);
And that's why it's inside
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've moved it to an own method so the code and kept as it is for now. Similar to what we do here
What is the purpose of the change
Adds StreamPhysicalMultiJoin that creates a StreamExecMultiJoin
Brief change log
Verifying this change
This change will be only adds StreamExecMultiJoin. Tests will be added as soon as we can use it/test it.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation