[FLINK-39495][table] Fix FROM_CHANGELOG silently dropping rows with unmapped operation codes#27973
[FLINK-39495][table] Fix FROM_CHANGELOG silently dropping rows with unmapped operation codes#27973twalthr merged 3 commits intoapache:masterfrom
Conversation
…nmapped operation codes
| throw new TableRuntimeException( | ||
| String.format( | ||
| "Received invalid op code '%s'. Defined op codes are: %s.", | ||
| opCode, opMap.keySet())); |
There was a problem hiding this comment.
Please double check if the current document from_changelog has been synchronized and modified
There was a problem hiding this comment.
Updated the docs with the new behavior
| @@ -137,7 +138,10 @@ public void eval( | |||
| final StringData opCode = input.getString(opColumnIndex); | |||
There was a problem hiding this comment.
Is it possible for opCode to be null at present
There was a problem hiding this comment.
It's an important point. We can classify this as an invalid op code and and fail/skip as configured by the user https://cwiki.apache.org/confluence/display/FLINK/FLIP-564%3A+Support+FROM_CHANGELOG+and+TO_CHANGELOG+built-in+PTFs#431-invalid-operation-codes
There was a problem hiding this comment.
For now we go with default behavior, which would be failing during runtime.
…ed op codes Reflect the behavior change from silent drop to TableRuntimeException in the FROM_CHANGELOG documentation: update the op_mapping description and add an explicit note after the default mapping table.
Throw TableRuntimeException when an input row carries a NULL op code instead of silently producing an INSERT. Document the behavior in the op parameter description and add a NULL_OP_CODE semantic test.
gustavodemorais
left a comment
There was a problem hiding this comment.
Thanks for the updates, @raminqaf. lgtm!
What is the purpose of the change
FROM_CHANGELOGsilently drops input rows when the operation code column contains a value not present in theop_mapping(or not matching the defaultmapping). This makes data loss invisible and very hard to debug — users have no indication that rows are being discarded.
This PR changes the default behavior to throw a
TableRuntimeExceptionwhen an unmapped operation code is encountered, making data issues visible immediatelyrather than causing silent data loss.
Brief change log
Changed
FromChangelogFunctionto throw aTableRuntimeExceptioninstead of silently returning when an unmapped op code is encounteredRemoved the
UNMAPPED_CODES_DROPPEDsemantic test that asserted silent-drop behaviorRemoved a duplicate input type strategy test ("Valid with UPDATE_BEFORE" was identical to "Valid with custom mapping")
Added
ChangelogFunctionITCasewith a runtime test that verifies the exception is thrown for unmapped op codesVerifying this change
This change added tests and can be verified as follows:
Added
ChangelogFunctionITCase.testFromChangelogFailsOnUnmappedOpCode— sets up a source with an"UNKNOWN"op code and asserts thatFROM_CHANGELOGthrows a
TableRuntimeExceptioncontaining the invalid op code and the defined op codesDoes this pull request potentially affect one of the following parts:
Dependencies (does it add or upgrade a dependency): no
The public API, i.e., is any changed class annotated with
@Public(Evolving): noThe serializers: no
The runtime per-record code paths (performance sensitive): yes (adds a branch in the per-record
evalpath, but only on the error case which waspreviously a silent return)
Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
The S3 file system connector: no
Documentation
Does this pull request introduce a new feature? no
If yes, how is the feature documented? not applicable
Was generative AI tooling used to co-author this PR?
Claude Code (Claude Opus 4.7)