Skip to content

[FLINK-39349][table] Support TO_CHANGELOG retract/upsert stream to upsert stream with deletion flag (4.2.2.1)#27847

Merged
twalthr merged 4 commits into
apache:masterfrom
gustavodemorais:FLINK-39349
Mar 30, 2026
Merged

[FLINK-39349][table] Support TO_CHANGELOG retract/upsert stream to upsert stream with deletion flag (4.2.2.1)#27847
twalthr merged 4 commits into
apache:masterfrom
gustavodemorais:FLINK-39349

Conversation

@gustavodemorais
Copy link
Copy Markdown
Contributor

What is the purpose of the change

Extends TO_CHANGELOG's op_mapping parameter to support comma-separated RowKind names as keys, enabling multiple RowKinds to map to the same output code. This implements FLIP-564 section 4.2.2.1 - the deletion flag pattern (Kafka Connect
style):

  SELECT * FROM TO_CHANGELOG(                                                                                                                                                                                                                       
    input => TABLE t PARTITION BY id,                                                                                                                                                                                                               
    op => DESCRIPTOR(deleted),                                                                                                                                                                                                                      
    op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true']                                                                                                                                                                            
  )  

Brief change log

  • Support comma-separated RowKind names in op_mapping keys (validation and runtime)
  • Validate duplicate RowKinds across entries at planning time
  • Add semantic tests for deletion flag pattern and duplicate RowKind validation
  • Document the new capability and add examples (SQL, Table API, JavaDoc)

Verifying this change

  • Added DELETION_FLAG semantic test: retract source with comma-separated keys, verifies correct mapping and that unmapped UPDATE_BEFORE is dropped
    • Added DUPLICATE_ROW_KIND validation test: verifies planning fails when a RowKind appears in multiple entries

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs / JavaDocs)

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Mar 29, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

throwOnFailure,
String.format(
"Invalid target mapping for argument 'op_mapping'. "
+ "Unknown RowKind: '%s'. Valid values are: %s.",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RowKind is a Java class. And is confusing to SQL persona. This is why it called op:

Suggested change
+ "Unknown RowKind: '%s'. Valid values are: %s.",
+ "Unknown change operation: '%s'. Valid values are: %s.",

throwOnFailure,
String.format(
"Invalid target mapping for argument 'op_mapping'. "
+ "Duplicate RowKind: '%s'.",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
+ "Duplicate RowKind: '%s'.",
+ "Duplicate change operation: '%s'.",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update all other locations

@gustavodemorais
Copy link
Copy Markdown
Contributor Author

Thanks for the review, @twalthr 🙂 Addressed your comments, take a look

@gustavodemorais gustavodemorais requested a review from twalthr March 30, 2026 08:13
final Map<RowKind, String> map = new EnumMap<>(RowKind.class);
opMapping.forEach((name, code) -> map.put(RowKind.valueOf(name), code));
return map;
final Map<RowKind, String> result = new EnumMap<>(RowKind.class);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend adding logging for conflicts or invalid mappings to help users debug configuration issues.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, for TO_CHANGELOG invalid mappings are already caught at planning time by ToChangelogTypeStrategy, the user will see a validation error

TableTestProgram.of(
"to-changelog-invalid-op-mapping",
"fails when op_mapping has invalid RowKind name")
"fails when op_mapping has invalid change operation name")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend adding notes on why UPDATE_BEFORE is dropped and practical use cases of deletion flag pattern.

* be valid and appear at most once across all entries.
*/
@SuppressWarnings("rawtypes")
private static Optional<List<DataType>> validateOpMappingKeys(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend adding comments about whitespace and case handling for better user understanding.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the comment to be more specific about this e8d4956

* map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
* );
*
* // Deletion flag pattern: comma-separated keys map multiple change operations to the same code
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uggest enhancing Javadoc to clarify how multiple RowKinds mapping to the same output may affect UPDATE_BEFORE behavior.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, Feat Zhang. The existing JavaDoc and documentation already explain that unmapped operations are dropped. Adding UPDATE_BEFORE-specific details to the generic toChangelog() JavaDoc would be too narrow, since it depends on the use case

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Mar 30, 2026
Copy link
Copy Markdown
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @gustavodemorais for the additional improvements. Will merge now...

@twalthr twalthr merged commit 4734953 into apache:master Mar 30, 2026
featzhang pushed a commit to featzhang/flink that referenced this pull request Mar 30, 2026
och5351 pushed a commit to och5351/flink that referenced this pull request Mar 31, 2026
Samrat002 pushed a commit to Samrat002/flink that referenced this pull request Mar 31, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants