
[FLINK-37763][table] Support multiple table arguments in PTFs #26600

Merged
twalthr merged 2 commits into apache:master from twalthr:FLINK-37763 on May 28, 2025

Conversation

@twalthr

@twalthr twalthr commented May 27, 2025

What is the purpose of the change

This adds support for multiple table arguments in PTFs. It supports table arguments with set semantics only, as mentioned in the FLIP. Because MultipleInputStreamOperator already supports n-ary inputs, this PR adds support for more than two inputs right from the first version.

Until Calcite fully supports the COPARTITION clause, including the tricky scope resolution of SELECT E.* FROM f(T1 => TABLE(Emp) AS E, ...), we assume that all given tables must be copartitioned. This is a reasonable initial assumption, as cross products are undesirable in a distributed system. Once COPARTITION is supported, the SQL calling syntax should change from f(T1 => TABLE Emp PARTITION BY c, ...) to f(T1 => TABLE(Emp) AS E PARTITION BY c, ..., COPARTITION(E, ...)). Thus, we are currently not standard-compliant in this regard, but the design is forward compatible.
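For illustration, a call with two copartitioned table arguments would look roughly as follows; the function f and the table/column names are placeholders for this sketch, not taken from the PR:

```sql
-- Current syntax: all table arguments are implicitly copartitioned,
-- so every table argument must be partitioned by a compatible key.
SELECT *
FROM f(
    T1 => TABLE Orders PARTITION BY customer_id,
    T2 => TABLE Payments PARTITION BY customer_id
);

-- Standard-compliant syntax, once Calcite supports the COPARTITION clause:
-- SELECT O.*
-- FROM f(
--     T1 => TABLE(Orders) AS O PARTITION BY customer_id,
--     T2 => TABLE(Payments) AS P PARTITION BY customer_id,
--     COPARTITION(O, P)
-- );
```

In the current form, omitting PARTITION BY on one of the tables (or partitioning on incompatible keys) is rejected, since the implicit copartitioning assumption would otherwise require a cross product.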

Brief change log

  • Refactor ProcessTableOperator to a single-input, non-keyed one and a multi-input, keyed one.
  • Add extensive validation around pass-through columns, time attributes, and updating behavior.
  • Update various locations that did not support multiple inputs yet.

Verifying this change

This change added tests and can be verified as follows:

ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT,
ProcessTableFunctionTestPrograms.PROCESS_STATEFUL_MULTI_INPUT_WITH_TIMEOUT,
ProcessTableFunctionTestPrograms.PROCESS_UPDATING_MULTI_INPUT

and various unit tests in ProcessTableFunctionTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs

@flinkbot

flinkbot commented May 27, 2025

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

public static final ConfigOption<Integer> TABLE_OPTIMIZER_PTF_MAX_TABLES =
        key("table.optimizer.ptf.max-tables")
                .intType()
                .defaultValue(20)
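For context on how this limit would surface to users: assuming the option behaves like other table.optimizer.* options, it could presumably be adjusted per session, e.g. via the SQL client (the value 100 here is an arbitrary example, not a recommendation from the PR):

```sql
SET 'table.optimizer.ptf.max-tables' = '100';
```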

Contributor

Out of curiosity: do we know whether any other vendor offers a PTF feature, and what limit they have?

Contributor Author

Not many vendors support multi-input PTFs, maybe even just Oracle. In any case, the limit has been discussed with @pnowojski and should be a reasonable default that our engine might still be able to handle. All we wanted to avoid is immediately supporting Integer.MAX_VALUE.

@snuyanzin snuyanzin left a comment

Thanks for moving this forward.

One minor thing: the Chinese version of the PTF docs would also make sense to update.

@twalthr

twalthr commented May 27, 2025

Thank you @snuyanzin. I will add the Chinese docs now; I just wanted to wait until the feedback had settled.

evalCollector = new PassPartitionKeysCollector(output, changelogMode, tableSemantics);
}
onTimerCollector = new PassAllCollector(output, changelogMode);
// Collect with partition keys for each table
Contributor

nit: is this comment correct? It seems to be the same as for PassPartitionKeysCollector, which collects with the partition keys.

Contributor Author

In general, the comment is correct. The result is described here; we just use a different collector to achieve the same. I will adjust the comment.

@twalthr twalthr merged commit 86ee4b5 into apache:master May 28, 2025
yanand0909 pushed a commit to yanand0909/flink that referenced this pull request Jun 10, 2025


4 participants