[FLINK-37916][table] Add StreamPhysicalMultiJoin #26664

Merged
merged 2 commits into from
Jun 12, 2025

Conversation

gustavodemorais
Contributor

What is the purpose of the change

Adds StreamPhysicalMultiJoin that creates a StreamExecMultiJoin

Brief change log

  • Adds StreamPhysicalMultiJoin implementing StreamPhysicalRel
  • Adds the private methods needed to instantiate its parameters

Verifying this change

This change only adds StreamExecMultiJoin. Tests will be added as soon as we can use and test it.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Jun 10, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@gustavodemorais
Contributor Author

Rebased

Comment on lines +171 to +172
Collections.emptyMap(), // TODO Enable hint-based state ttl. See ticket
// TODO https://issues.apache.org/jira/browse/FLINK-37936
Contributor

I'm not sure the people who review FLINK-37936 will be able to spot these TODOs.
Should they be explicitly mentioned in the Jira ticket?

Contributor Author

Yep, wanted to do it and forgot. Updated the Jira!

Comment on lines +113 to +115
newInputs.set(ordinalInParent, p);
this.inputs = List.copyOf(newInputs);
recomputeDigest();
Contributor

2 questions:

  1. Are we sure that ordinalInParent is always within bounds?
  2. How likely is it that the value at that index is the same as p?

Contributor Author

  1. I expect so; I did the same as we have for PTFs.
  2. I don't expect it's likely, since we're replacing it.

Contributor Author

gustavodemorais commented Jun 11, 2025

Took a look at it again and

  1. I've added an assertion check since I saw some other places do it
  2. As you've mentioned we're not sure what to do if they call with the same p, so I think it's safer to let the caller control this

I think we should be ok either way. But I think adding the first assertion should be safe!
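
The bounds check discussed above can be sketched as follows. This is a minimal illustration, not the code from the PR: `MultiInputNode` is a hypothetical stand-in that stores inputs as strings, and `Objects.checkIndex` is one possible way to express the check (the thread does not show which assertion was actually added).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for a multi-input node; not the actual Flink class. */
final class MultiInputNode {
    private List<String> inputs;
    private String digest;

    MultiInputNode(List<String> inputs) {
        this.inputs = List.copyOf(inputs);
        recomputeDigest();
    }

    /** Replaces the input at the given position, rejecting out-of-range ordinals. */
    void replaceInput(int ordinalInParent, String p) {
        // Assumption: fail fast with IndexOutOfBoundsException when the ordinal
        // is outside [0, inputs.size()).
        Objects.checkIndex(ordinalInParent, inputs.size());
        List<String> newInputs = new ArrayList<>(inputs);
        newInputs.set(ordinalInParent, p);
        // Keep the stored list immutable, as in the snippet under review.
        this.inputs = List.copyOf(newInputs);
        recomputeDigest();
    }

    // Simplified digest: the node name followed by the current inputs.
    private void recomputeDigest() {
        this.digest = "MultiInputNode" + inputs;
    }

    List<String> getInputs() {
        return inputs;
    }

    String getDigest() {
        return digest;
    }
}
```

The sketch does not special-case a call that passes the same `p` already stored at that index; as noted in the reply above, that decision is left to the caller.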

.item("joinTypes", joinTypes)
.item("joinConditions", joinConditions)
.itemIf("postJoinFilter", postJoinFilter, postJoinFilter != null)
.item("select", String.join(",", getRowType().getFieldNames()))
Contributor

IIRC there should usually be indexes instead of names;
otherwise there might be memory issues if there are many fields with long names.
I'm also not sure the output would be clear when there are several nested subqueries and the fields get a new alias each time (for instance, set manually in SQL).

Contributor Author

You raise good points. Here too I've followed what we did for PTFs.

Wdyt @twalthr?

Contributor

Looking at CommonPhysicalJoin we are also using names. Let's fix this problem when it occurs again and keep the PR as it is for now?

Contributor

fair enough
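
To make the trade-off in this thread concrete, here is a small sketch of the two ways the "select" term could be rendered. It is illustrative only: `SelectTermSketch` and both helper methods are hypothetical, and the `$i` notation is simply borrowed from how Calcite prints input references.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical helper comparing name-based and index-based "select" terms. */
final class SelectTermSketch {

    /** Name-based rendering, as in the snippet under review. */
    static String byName(List<String> fieldNames) {
        return String.join(",", fieldNames);
    }

    /** Index-based rendering: the length depends on the field count, not on name length. */
    static String byIndex(List<String> fieldNames) {
        return IntStream.range(0, fieldNames.size())
                .mapToObj(i -> "$" + i)
                .collect(Collectors.joining(","));
    }
}
```

For a row type with fields `user_id` and `order_total`, `byName` yields `user_id,order_total` while `byIndex` yields `$0,$1`. The index form stays short and is unaffected by aliases, at the cost of being less readable in plan output.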

Contributor

twalthr left a comment

Mostly minor stuff. Should be good from my side in the next iteration.

return inputs.stream()
.map(
input -> {
final FlinkRelMetadataQuery fmq =
Contributor

Move this line out of the loop / stream.

Contributor Author

I was actually doing input.getCluster().getMetadataQuery() but this might not be necessary?

Contributor Author

I'm intuitively using the input

final FlinkRelMetadataQuery fmq =
        FlinkRelMetadataQuery.reuseOrCreate(input.getCluster().getMetadataQuery());
return fmq.getUpsertKeys(input);

And that's why it's inside

Contributor Author

I've moved it to its own method and kept the code as it is for now, similar to what we do here
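
The resolution described in this thread (keep the per-input lookup, but extract it into its own method) might look roughly like the sketch below. All types here are hypothetical stand-ins rather than Flink or Calcite classes: `MetadataQuery` and `Input` only mimic the idea that the query object is reached through the input itself, which is why the lookup cannot simply be hoisted out of the stream.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch; none of these types are Flink or Calcite classes. */
final class UpsertKeySketch {

    /** Stand-in metadata query: maps an input name to its upsert keys. */
    static final class MetadataQuery {
        private final Map<String, Set<String>> keys;

        MetadataQuery(Map<String, Set<String>> keys) {
            this.keys = keys;
        }

        Set<String> getUpsertKeys(Input input) {
            return keys.getOrDefault(input.name, Set.of());
        }
    }

    /** Stand-in input that carries a reference to its own metadata query. */
    static final class Input {
        final String name;
        final MetadataQuery query;

        Input(String name, MetadataQuery query) {
            this.name = name;
            this.query = query;
        }
    }

    /** Per-input lookup in its own method; the query is obtained from the input. */
    static Set<String> upsertKeysOf(Input input) {
        final MetadataQuery mq = input.query;
        return mq.getUpsertKeys(input);
    }

    /** The stream body is reduced to a single method reference. */
    static List<Set<String>> upsertKeysOfAll(List<Input> inputs) {
        return inputs.stream()
                .map(UpsertKeySketch::upsertKeysOf)
                .collect(Collectors.toList());
    }
}
```

If every input were known to share one query object, the lookup could be done once before the stream instead; the sketch keeps it per input because that is the choice described in the thread.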

@twalthr twalthr merged commit 4280dbd into apache:master Jun 12, 2025

4 participants