
[FLINK-39537][table] Apply conditional SET_SEMANTIC_TABLE trait to FROM_CHANGELOG#28025

Open
raminqaf wants to merge 6 commits into apache:master from raminqaf:FLINK-39537

Conversation

@raminqaf
Contributor

What is the purpose of the change

FROM_CHANGELOG is currently locked to row semantics — each row is processed independently, with no way to co-locate rows for the same key in the same parallel operator instance. This is fine for stateless downstreams but limits use cases where the resulting changelog feeds into a stateful operator keyed on the same column.

This PR uses the conditional-traits machinery introduced for TO_CHANGELOG in FLINK-39392 to switch FROM_CHANGELOG to set semantics when the call provides PARTITION BY. Behavior without PARTITION BY is unchanged.

-- Row semantics (unchanged)
SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream);

-- Set semantics: planner inserts Exchange(hash[id])
SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream PARTITION BY id);
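
To make the co-location effect of Exchange(hash[id]) concrete, here is a minimal, self-contained sketch in plain Java. It does not use Flink: `HashPartitionSketch`, `ChangelogRow`, and `subtaskFor` are hypothetical names, and the modulo-based routing is an assumption chosen for illustration, not the planner's actual key-group assignment. It only shows that rows sharing a key land on the same parallel instance and keep their relative order there.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: models hash distribution by key, not Flink's actual Exchange operator. */
final class HashPartitionSketch {

    /** Hypothetical changelog row: an op kind such as "+I" or "-U", a key, and a payload. */
    record ChangelogRow(String op, int id, String payload) {}

    /** Picks a subtask index for a key; floorMod keeps the result non-negative. */
    static int subtaskFor(int id, int parallelism) {
        return Math.floorMod(Integer.hashCode(id), parallelism);
    }

    /** Groups rows by target subtask, preserving arrival order within each subtask. */
    static Map<Integer, List<ChangelogRow>> distribute(List<ChangelogRow> rows, int parallelism) {
        Map<Integer, List<ChangelogRow>> bySubtask = new TreeMap<>();
        for (ChangelogRow row : rows) {
            int subtask = subtaskFor(row.id(), parallelism);
            bySubtask.computeIfAbsent(subtask, k -> new ArrayList<>()).add(row);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<ChangelogRow> rows = List.of(
                new ChangelogRow("+I", 1, "a"),
                new ChangelogRow("+I", 2, "b"),
                new ChangelogRow("-U", 1, "a"),
                new ChangelogRow("+U", 1, "a2"),
                new ChangelogRow("-D", 2, "b"));
        // Every row for a given id ends up in exactly one subtask's list.
        distribute(rows, 2).forEach((subtask, assigned) ->
                System.out.println("subtask " + subtask + " -> " + assigned));
    }
}
```

Without PARTITION BY there is no such routing step, so rows for the same id may be handled by different parallel instances.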

Brief change log

  • BuiltInFunctionDefinitions.FROM_CHANGELOG: input table argument adds withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy())
  • Added a plan test (FromChangelogTest#testSetSemanticsWithPartitionBy) verifying the Exchange(hash[...]) propagation
  • Added a semantic test program (FromChangelogTestPrograms#SET_SEMANTICS_PARTITION_BY) verifying end-to-end output equivalence
  • Updated SQL reference docs (docs/.../changelog.md) and Table#fromChangelog JavaDoc
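
For readers unfamiliar with the conditional-traits idea, the following is a rough, self-contained model of it in plain Java. Everything here is hypothetical: `ConditionalTraitSketch`, `CallContext`, `ConditionalTrait`, and `resolve` are illustration-only names, and `ROW_SEMANTIC_TABLE` is assumed as the default trait. It is not the Flink implementation of withConditionalTrait or hasPartitionBy(); it only sketches the rule "use set semantics when the call supplies PARTITION BY, otherwise keep the default".

```java
import java.util.List;
import java.util.function.Predicate;

/** Illustrative model of a conditional argument trait; all names here are hypothetical. */
final class ConditionalTraitSketch {

    enum Trait { ROW_SEMANTIC_TABLE, SET_SEMANTIC_TABLE }

    /** Minimal stand-in for what is known about a table argument at a call site. */
    record CallContext(List<String> partitionKeys) {
        boolean hasPartitionBy() {
            return !partitionKeys.isEmpty();
        }
    }

    /** A trait that replaces the default only when its condition holds for the call. */
    record ConditionalTrait(Trait trait, Predicate<CallContext> condition) {}

    /** Returns the first conditional trait whose condition matches, else the default. */
    static Trait resolve(Trait defaultTrait, List<ConditionalTrait> conditionals, CallContext ctx) {
        for (ConditionalTrait candidate : conditionals) {
            if (candidate.condition().test(ctx)) {
                return candidate.trait();
            }
        }
        return defaultTrait;
    }

    public static void main(String[] args) {
        List<ConditionalTrait> conditionals = List.of(
                new ConditionalTrait(Trait.SET_SEMANTIC_TABLE, CallContext::hasPartitionBy));

        CallContext plain = new CallContext(List.of());
        CallContext partitioned = new CallContext(List.of("id"));

        System.out.println(resolve(Trait.ROW_SEMANTIC_TABLE, conditionals, plain));
        System.out.println(resolve(Trait.ROW_SEMANTIC_TABLE, conditionals, partitioned));
    }
}
```

Modeling the condition as a predicate over the call keeps the function's declared signature fixed while letting the effective trait vary per call site, which matches the "behavior without PARTITION BY is unchanged" goal stated above.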

Verifying this change

This change added tests and can be verified as follows:

  • FromChangelogTest#testSetSemanticsWithPartitionBy: plan test asserting that the optimized rel plan contains Exchange(distribution=[hash[id]]) when PARTITION BY id is specified, and that the output changelogMode propagates correctly
  • FromChangelogSemanticTests (via the SET_SEMANTICS_PARTITION_BY program): end-to-end run verifying that adding PARTITION BY changes the parallel execution layout but does NOT alter row-level output (same +I, -U, +U, -D sequence as the row-semantics tests)
  • All existing FromChangelogTest and FromChangelogSemanticTests cases continue to pass without modification, confirming that the conditional trait does not regress the row-semantics path

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no (only FROM_CHANGELOG's declared traits change; the function signature and Table#fromChangelog API are unchanged)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (runtime function FromChangelogFunction is unchanged)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs (docs/content/docs/sql/reference/queries/changelog.md) and JavaDocs (Table#fromChangelog)

Was generative AI tooling used to co-author this PR?

  • Yes (Claude Code / Opus 4.7)

@flinkbot
Collaborator

flinkbot commented Apr 24, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@raminqaf raminqaf changed the title Apply conditional SET_SEMANTIC_TABLE trait to FROM_CHANGELOG [FLINK-39537][table]Apply conditional SET_SEMANTIC_TABLE trait to FROM_CHANGELOG May 4, 2026
@raminqaf raminqaf changed the title [FLINK-39537][table]Apply conditional SET_SEMANTIC_TABLE trait to FROM_CHANGELOG [FLINK-39537][table] Apply conditional SET_SEMANTIC_TABLE trait to FROM_CHANGELOG May 4, 2026
@raminqaf raminqaf force-pushed the FLINK-39537 branch 4 times, most recently from 2232a48 to cb18d7a Compare May 4, 2026 10:11
@raminqaf raminqaf marked this pull request as ready for review May 4, 2026 10:21
Contributor

@gustavodemorais gustavodemorais left a comment

Thanks for the clean PR, @raminqaf. I've added some suggestions so that the code is aligned and the documentation wording is right.

Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md
Comment thread docs/content/docs/sql/reference/queries/changelog.md
Contributor

@gustavodemorais gustavodemorais left a comment

Thanks for the quick updates, @raminqaf! Code is looking good. Could you align this with #28120, where I moved our shared utility to ChangelogTypeStrategyUtils.java (flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java)?

@raminqaf
Contributor Author

raminqaf commented May 6, 2026

> Thanks for the quick updates, @raminqaf! Code is looking good. Could you align this with #28120, where I moved our shared utility to ChangelogTypeStrategyUtils.java (flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java)?

Thanks, @gustavodemorais, for the feedback. I have aligned the latest changes. If the code looks good enough, we can proceed with the merge and continue the work on the Utils in your PR or in a follow-up PR, so that the changes are not blocked here.

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label May 6, 2026
final Optional<Field> opField =
inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst();
if (opField.isEmpty()) {
final OptionalInt opIndex = resolveOpColumnIndex(inputFields, opColumnName);
Contributor

If you only compare names in this method, why not use DataType#getFieldNames?

Contributor Author

Good point, but further down we also use the field's type:

        final Field opField = inputFields.get(opIndex.getAsInt());
        final LogicalType opFieldType = opField.getDataType().getLogicalType();

Contributor

What about composite types? Those are covered by DataType#getFieldDataTypes, and I am not sure they are covered here.

Contributor Author

I played around with it, but tbh I like this approach of fetching the Fields once and gathering the info from them.
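
As a rough illustration of the approach discussed in this thread (resolve the op column's index by name once, then read its type from the same field list), here is a self-contained sketch in plain Java. The `Field` record and the `opColumnType` helper are hypothetical stand-ins, and the body of `resolveOpColumnIndex` is an assumption, since the thread shows only the call site and not the actual implementation.

```java
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.IntStream;

/** Illustrative only: a simplified stand-in for the field lookup discussed in this thread. */
final class OpColumnLookupSketch {

    /** Hypothetical field descriptor: a column name plus a type name. */
    record Field(String name, String typeName) {}

    /** Returns the index of the first field named opColumnName, or empty if none matches. */
    static OptionalInt resolveOpColumnIndex(List<Field> fields, String opColumnName) {
        return IntStream.range(0, fields.size())
                .filter(i -> fields.get(i).name().equals(opColumnName))
                .findFirst();
    }

    /** Resolves the index once, then reads the type from the same field list. */
    static String opColumnType(List<Field> fields, String opColumnName) {
        OptionalInt opIndex = resolveOpColumnIndex(fields, opColumnName);
        if (opIndex.isEmpty()) {
            throw new IllegalArgumentException("No column named '" + opColumnName + "'");
        }
        return fields.get(opIndex.getAsInt()).typeName();
    }

    public static void main(String[] args) {
        List<Field> fields = List.of(
                new Field("id", "INT"),
                new Field("op", "STRING"),
                new Field("name", "STRING"));
        System.out.println(opColumnType(fields, "op"));
    }
}
```

Working from one field list gives both the position and the type in a single pass, which is the trade-off the author prefers over fetching names and types through separate accessors.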

Contributor

@gustavodemorais gustavodemorais left a comment

LGTM, thanks for the contribution @raminqaf

@gustavodemorais
Contributor

Hey @raminqaf, can you adjust one last detail? We want to make it clear to users that the schema changes when PARTITION BY is used. See the format used for to_changelog in ed4c5d7.

* <pre>{@code
* Table result = cdcStream
* .partitionBy($("id"))
* .process("FROM_CHANGELOG");
Contributor

This is not so nice. Shall we add a method to PartitionedTable?

Suggested change
- * .process("FROM_CHANGELOG");
+ * .fromChangelog();

Contributor

We can also do this as a follow-up and include to_changelog there.

@snuyanzin
Contributor

@raminqaf, it seems your PR conflicts with the one from @gustavodemorais. Can you please resolve the conflicts and rebase?

Labels

community-reviewed PR has been reviewed by the community.

5 participants