[FLINK-38647] Support changelog-mode=upsert + null-tolerance#4437
Open
cansakiroglu wants to merge 1 commit into
FLINK-38647 reports a NullPointerException from the Postgres Pipeline source
when a captured table has REPLICA IDENTITY DEFAULT and an UPDATE arrives that
does not change the primary key. Root cause: PostgresDataSource hardcodes
DebeziumChangelogMode.ALL, which makes the deserializer extract a before-image
that is null under DEFAULT replication, NPE'ing in
DebeziumSchemaDataTypeInference.inferStruct.
This change fixes the issue from two angles:
1. Connector side — expose 'changelog-mode' YAML option on the Postgres
Pipeline connector (mirrors the legacy SQL connector). Accepts 'all'
(default, current behaviour) or 'upsert'. In upsert mode the source emits
UPDATE events with before == null and only after populated, so the
pipeline runs cleanly under REPLICA IDENTITY DEFAULT without requiring
FULL (which logs the complete old row for every UPDATE and DELETE and so
increases WAL volume).
2. Runtime side — make DataChangeEventSerializer null-tolerant. Three changes:
- copy() null-guards each recordDataSerializer.copy() call so the chained
CopyingChainingOutput.pushToOperator path stops NPE'ing on null
before/after.
- serialize() writes 2 leading boolean presence flags before conditionally
writing each record. Required because the previous serialize() already
skipped writing null fields, but deserialize() always tried to read two
records back — the wire format itself couldn't roundtrip a null-before
UPDATE.
- deserialize() reads the 2 flag bytes and skips the corresponding read
when absent.
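A minimal sketch of the presence-flag layout described in the three bullets above, not the actual DataChangeEventSerializer code. It assumes both flags are written first, followed by whichever records are present. `PresenceFlagSketch` and `Event` are hypothetical names, and plain Strings stand in for the real record data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch; not the real DataChangeEventSerializer.
final class PresenceFlagSketch {

    // Stand-in for a change event; Strings replace the real record type.
    static final class Event {
        final String before;
        final String after;

        Event(String before, String after) {
            this.before = before;
            this.after = after;
        }
    }

    // Writes two presence flags, then only the records that are non-null.
    static byte[] serialize(Event event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(event.before != null);
            out.writeBoolean(event.after != null);
            if (event.before != null) {
                out.writeUTF(event.before);
            }
            if (event.after != null) {
                out.writeUTF(event.after);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the two flags first and skips the read for any absent record.
    static Event deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            boolean hasBefore = in.readBoolean();
            boolean hasAfter = in.readBoolean();
            String before = hasBefore ? in.readUTF() : null;
            String after = hasAfter ? in.readUTF() : null;
            return new Event(before, after);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the reader learns from the flags which records follow, a null-before UPDATE roundtrips without the reader guessing at the layout.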
Wire format change is symmetric but breaking vs current 3.6.x. Production
checkpoints/savepoints written by older versions cannot be restored by the
new code without a snapshot-version-aware deserialize path; see the PR
description for the proposed compat strategy (bumping
DataChangeEventSerializerSnapshot.CURRENT_VERSION to 2 and reading old
format when version == 1). Happy to add that in a follow-up commit on this
PR — wanted reviewers' opinion on the strategy first.
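One way the version-aware read path could look, as a rough sketch only. It assumes, as a simplification, that the version 1 layout is two records written back to back with no flags; the real legacy layout may differ. `VersionedReadSketch`, `writeV1`, `writeV2` and `read` are hypothetical helpers, not Flink CDC APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of dispatching on the snapshot version when reading.
final class VersionedReadSketch {
    static final int CURRENT_VERSION = 2;

    // Assumed legacy layout (version 1): both records, no presence flags.
    static byte[] writeV1(String before, String after) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(before);
            out.writeUTF(after);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // New layout (version 2): two presence flags, then the present records.
    static byte[] writeV2(String before, String after) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(before != null);
            out.writeBoolean(after != null);
            if (before != null) {
                out.writeUTF(before);
            }
            if (after != null) {
                out.writeUTF(after);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Returns {before, after}, choosing the layout from the version number.
    static String[] read(int version, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            if (version == 1) {
                return new String[] {in.readUTF(), in.readUTF()};
            }
            if (version == CURRENT_VERSION) {
                boolean hasBefore = in.readBoolean();
                boolean hasAfter = in.readBoolean();
                String before = hasBefore ? in.readUTF() : null;
                String after = hasAfter ? in.readUTF() : null;
                return new String[] {before, after};
            }
            throw new IllegalStateException("Unknown serializer version: " + version);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the writer always emits the current layout, and only the reader branches on the version, so state written by older code stays readable while new state gets the flags.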
Signed-off-by: Mehmet Can Şakiroğlu <cansakiroglu@gmail.com>
FLINK-38647 reports a NullPointerException from the Postgres Pipeline source when a captured table has REPLICA IDENTITY DEFAULT and an UPDATE arrives. Root cause: PostgresDataSource hardcodes DebeziumChangelogMode.ALL, which makes the deserializer extract a before-image that is null under DEFAULT replication, hence NPE'ing.
This change fixes the issue from two angles:
Connector side: expose 'changelog-mode' YAML option on the Postgres Pipeline connector. Accepts 'all' (default, current behaviour) or 'upsert'. In upsert mode the source emits UPDATE events with before == null and only after populated, so the pipeline runs cleanly under REPLICA IDENTITY DEFAULT without requiring FULL (which increases WAL volume).
Runtime side: make serializer null-tolerant. Three changes: