[HUDI-18888] fix(streamer): downgrade source-emitted V2 checkpoint to V1 for v6 writes #18889
nsivabalan wants to merge 2 commits into
… V1 for v6 writes
HoodieStreamer was persisting a V2 checkpoint key on writes pinned to hoodie.write.table.version=6. CheckpointUtils.shouldTargetCheckpointV2 returns false for writeTableVersion < 8, so this violated the contract.
Root cause: DFSPathSelector.getNextFilePathsAndMaxModificationTime (lines 154 & 160) and many other source helpers construct new StreamerCheckpointV2(...) unconditionally. *DFSSource implementations pipe that V2 checkpoint into InputBatch#getCheckpointForNextBatch, and StreamSync.extractCheckpointMetadata trusted the type blindly. The very next code path two lines below (the no-batch fallback) already calls buildCheckpointFromGeneralSource and gets the version contract right, so this was a missed branch in an existing chokepoint, not a missing primitive.
Fix: type-based normalization at the chokepoint in StreamSync.extractCheckpointMetadata. When the source returns a checkpoint, coerce its concrete type to whatever shouldTargetCheckpointV2(versionCode, sourceClassName) permits before calling getCheckpointCommitMetadata. Mirrors the inbound translation already done in Source.translateCheckpoint.
Coverage audit: all 17 new StreamerCheckpointV2(...) construction sites in hudi-utilities/src/main funnel through this chokepoint: DFS, Kafka, Kinesis, Pulsar, Jdbc, Sql, Hive, HoodieIncrSource, GcsEventsSource, DatePartitionPathSelector, Debezium, S3EventsMetaSelector. The DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2 carve-out (S3/Gcs HoodieIncrSource) is preserved: those sources stay V1 even on v8+ writes.
Tests:
- testExtractCheckpointMetadata_V2FromSourceDowngradedToV1OnV6Write covers 19 source classes for the v6 downgrade direction.
- testExtractCheckpointMetadata_V1FromSourceUpgradedToV2OnV8Write covers the v8 upgrade direction plus the S3/Gcs IncrSource carve-out.
All 28 TestStreamSync#testExtractCheckpointMetadata* pass.
Checkstyle: 0 violations.
Fixes: apache#18888
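The normalization described in this commit message can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the PR: the Checkpoint, CheckpointV1, and CheckpointV2 classes are hypothetical stand-ins for Hudi's checkpoint types, the carve-out set uses simplified class names, and shouldTargetV2 only approximates the boundary stated above (V1 below table version 8, V1 for the carved-out sources).

```java
import java.util.Set;

class CheckpointNormalizationSketch {
    // Hypothetical stand-ins for Hudi's checkpoint types; only the key is modeled.
    abstract static class Checkpoint {
        final String key;
        Checkpoint(String key) { this.key = key; }
    }

    static final class CheckpointV1 extends Checkpoint {
        CheckpointV1(String key) { super(key); }
        // Copy constructor: carries the key over from any checkpoint version.
        CheckpointV1(Checkpoint other) { super(other.key); }
    }

    static final class CheckpointV2 extends Checkpoint {
        CheckpointV2(String key) { super(key); }
        CheckpointV2(Checkpoint other) { super(other.key); }
    }

    // Assumption: simplified names for the sources that must stay on V1.
    static final Set<String> V2_UNSUPPORTED =
        Set.of("S3EventsHoodieIncrSource", "GcsEventsHoodieIncrSource");

    // Approximation of the version contract: V2 only for table version >= 8
    // and only for sources outside the carve-out.
    static boolean shouldTargetV2(int writeTableVersion, String sourceClassName) {
        return writeTableVersion >= 8 && !V2_UNSUPPORTED.contains(sourceClassName);
    }

    // Coerce whatever the source emitted to the version the contract permits.
    static Checkpoint normalize(Checkpoint source, int writeTableVersion, String sourceClassName) {
        boolean targetV2 = shouldTargetV2(writeTableVersion, sourceClassName);
        if (targetV2 && source instanceof CheckpointV1) {
            return new CheckpointV2(source);
        }
        if (!targetV2 && source instanceof CheckpointV2) {
            return new CheckpointV1(source);
        }
        return source; // already the permitted version
    }

    public static void main(String[] args) {
        // A source that unconditionally emits V2 while the write is pinned to v6.
        Checkpoint fromSource = new CheckpointV2("1700000000000");
        Checkpoint persisted = normalize(fromSource, 6, "ParquetDFSSource");
        System.out.println(persisted.getClass().getSimpleName() + " key=" + persisted.key);
    }
}
```

The point of doing the coercion in one place is that a source emitting the wrong version is corrected regardless of which helper constructed the checkpoint.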
testBulkInsertRowWriterWithSchemaProviderAndTransformer is parameterized
over {SIX, EIGHT, NINE} but the shared assertUseV2Checkpoint helper
asserted V2 unconditionally. Pre-fix that was wrong-but-green for v6
(writer was incorrectly persisting V2). With the StreamSync chokepoint
downgrade in place, v6 commits now correctly carry V1, so the v6
assertion needs to flip.
Branch the assertion on the on-disk table version: V2 for >= 8, V1 for
v6. Mirrors CheckpointUtils.shouldTargetCheckpointV2 boundary.
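A version-aware assertion of the kind described here might look like the sketch below. It is an illustration only: the helper names are hypothetical, the table version is passed as a plain int rather than read from a meta client, and the check that the other key is absent is an added assumption rather than something the actual test is known to do. The two key strings are the ones quoted in the PR description.

```java
import java.util.Map;

class CheckpointAssertionSketch {
    static final String KEY_V1 = "deltastreamer.checkpoint.key";
    static final String KEY_V2 = "streamer.checkpoint.key.v2";

    // Same boundary as the write path: V2 for table version >= 8, V1 below.
    static String expectedCheckpointKey(int tableVersion) {
        return tableVersion >= 8 ? KEY_V2 : KEY_V1;
    }

    // Hypothetical helper: fails unless the commit metadata carries the key
    // that matches the table version.
    static void assertCheckpointMatchesTableVersion(Map<String, String> extraMetadata, int tableVersion) {
        String expected = expectedCheckpointKey(tableVersion);
        String unexpected = expected.equals(KEY_V2) ? KEY_V1 : KEY_V2;
        if (!extraMetadata.containsKey(expected)) {
            throw new AssertionError("missing " + expected + " for table version " + tableVersion);
        }
        // Assumption: a commit should not carry the other version's key as well.
        if (extraMetadata.containsKey(unexpected)) {
            throw new AssertionError("unexpected " + unexpected + " for table version " + tableVersion);
        }
    }

    public static void main(String[] args) {
        assertCheckpointMatchesTableVersion(Map.of(KEY_V1, "ckpt"), 6);
        assertCheckpointMatchesTableVersion(Map.of(KEY_V2, "ckpt"), 8);
        assertCheckpointMatchesTableVersion(Map.of(KEY_V2, "ckpt"), 9);
        System.out.println("all version-aware assertions passed");
    }
}
```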
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## master #18889 +/- ##
============================================
- Coverage 68.81% 68.81% -0.01%
- Complexity 29136 29148 +12
============================================
Files 2515 2515
Lines 139939 139944 +5
Branches 17190 17189 -1
============================================
- Hits 96302 96296 -6
- Misses 35859 35867 +8
- Partials 7778 7781 +3
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the fix! This is a well-targeted normalization at the StreamSync.extractCheckpointMetadata chokepoint so source-emitted V2 checkpoints don't get stamped onto v6 commits. I traced the coercion paths: both StreamerCheckpointV1 and StreamerCheckpointV2 copy constructors copy the base checkpointKey/checkpointResetKey/checkpointIgnoreKey fields, and the S3/Gcs IncrSources stay V1 via the DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2 carve-out (so the only subclass carrying string-encoded composite state — CloudObjectIncrCheckpoint — never hits the V2→V1 branch in practice). The mirror of Source.translateCheckpoint on the outbound side is a nice symmetry. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One naming nit on the test helper and a minor double-call pattern worth cleaning up while the local variable is being introduced.
cc @yihua
// V1 vs V2 is driven by CheckpointUtils.shouldTargetCheckpointV2 — for v6 tables the contract
// is V1, so the assertion is conditional on the on-disk table version.
if (metaClient.getTableConfig().getTableVersion().versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
  assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2));
🤖 nit: the method is still called assertUseV2Checkpoint but the body now conditionally checks either V1 or V2 depending on table version — could you rename it to something like assertCheckpointMatchesTableVersion so future callers aren't surprised?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
  if (inputBatch.getCheckpointForNextBatch() != null) {
-   return inputBatch.getCheckpointForNextBatch()
-       .getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint);
+   Checkpoint sourceCheckpoint = inputBatch.getCheckpointForNextBatch();
🤖 nit: since you're already introducing a local sourceCheckpoint, could you assign it before the if and null-check the variable instead? That avoids calling getCheckpointForNextBatch() twice and reads more cleanly as Checkpoint sourceCheckpoint = inputBatch.getCheckpointForNextBatch(); if (sourceCheckpoint != null) { … }.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
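The suggestion above can be illustrated with a small stand-alone sketch. The InputBatch class here is a hypothetical stand-in that counts getter calls, and the string results are placeholders for the real commit-metadata logic; nothing here is taken from StreamSync itself.

```java
class SingleFetchSketch {
    // Hypothetical stand-in for InputBatch that records how often the getter runs.
    static final class InputBatch {
        private final String checkpoint;
        int getterCalls = 0;

        InputBatch(String checkpoint) { this.checkpoint = checkpoint; }

        String getCheckpointForNextBatch() {
            getterCalls++;
            return checkpoint;
        }
    }

    // Shape flagged in the review: the getter is called for the null check
    // and then again to obtain the value.
    static String extractDoubleCall(InputBatch batch) {
        if (batch.getCheckpointForNextBatch() != null) {
            String sourceCheckpoint = batch.getCheckpointForNextBatch();
            return "from-source:" + sourceCheckpoint;
        }
        return "fallback";
    }

    // Suggested shape: fetch once, then null-check the local variable.
    static String extractSingleCall(InputBatch batch) {
        String sourceCheckpoint = batch.getCheckpointForNextBatch();
        if (sourceCheckpoint != null) {
            return "from-source:" + sourceCheckpoint;
        }
        return "fallback";
    }

    public static void main(String[] args) {
        InputBatch a = new InputBatch("ckpt");
        InputBatch b = new InputBatch("ckpt");
        extractDoubleCall(a);
        extractSingleCall(b);
        System.out.println("double-call getter invocations: " + a.getterCalls);
        System.out.println("single-call getter invocations: " + b.getterCalls);
    }
}
```

Both shapes return the same result; the second simply avoids the repeated call and keeps the null check and the use on the same variable.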
boolean targetV2 = CheckpointUtils.shouldTargetCheckpointV2(versionCode, cfg.sourceClassName);
if (targetV2 && sourceCheckpoint instanceof StreamerCheckpointV1) {
We should fix the individual Source implementations to pick the right checkpoint version instead of converting the checkpoint here, as each source can have its own logic for determining which checkpoint version to use.
Change Logs
When HoodieStreamer ingests with hoodie.write.table.version=6, the persisted commit metadata was carrying the V2 checkpoint key streamer.checkpoint.key.v2 instead of the V1 key deltastreamer.checkpoint.key. CheckpointUtils.shouldTargetCheckpointV2 returns false for writeTableVersion < 8, so this violated the contract.
Root cause: DFSPathSelector.getNextFilePathsAndMaxModificationTime (lines 154 & 160) and many other source helpers unconditionally construct new StreamerCheckpointV2(...). *DFSSource implementations pipe that V2 checkpoint into InputBatch#getCheckpointForNextBatch, and StreamSync.extractCheckpointMetadata trusted the type blindly. The very next code path two lines below (the no-batch fallback) already calls buildCheckpointFromGeneralSource and gets the version contract right, so this was a missed branch in an existing chokepoint, not a missing primitive.
Fix: type-based normalization at the chokepoint in StreamSync.extractCheckpointMetadata. When the source returns a checkpoint, coerce its concrete type to whatever shouldTargetCheckpointV2(versionCode, sourceClassName) permits before calling getCheckpointCommitMetadata. Mirrors the inbound translation already done in Source.translateCheckpoint.
Coverage audit: all 17 new StreamerCheckpointV2(...) construction sites in hudi-utilities/src/main funnel through this chokepoint: DFS family, Kafka family, Kinesis, Pulsar, Jdbc, Sql, Hive, HoodieIncrSource, GcsEventsSource, DatePartitionPathSelector, Debezium, S3EventsMetaSelector. The DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2 carve-out (S3EventsHoodieIncrSource / GcsEventsHoodieIncrSource) is preserved: those sources stay V1 even on v8+ writes.
Impact
Bug fix: prevents the wrong checkpoint key from being persisted on v6 tables. Resumed ingestion against a v6 table will now correctly continue reading deltastreamer.checkpoint.key from prior commits.
Risk level
low
Documentation Update
No user-facing config or API change.
Contributor's checklist
Test Plan
- testExtractCheckpointMetadata_V2FromSourceDowngradedToV1OnV6Write: 19 source classes covering DFS, Kafka, Kinesis, Pulsar, Jdbc, Sql, SqlFileBased, Hive, Hoodie/S3Events/GcsEvents IncrSources, GcsEventsSource. All pass.
- testExtractCheckpointMetadata_V1FromSourceUpgradedToV2OnV8Write: 6 cases including the S3/Gcs IncrSource carve-out. All pass.
- TestStreamSync#testExtractCheckpointMetadata*: 28/28 pass.
- mvn -pl hudi-utilities checkstyle:check: 0 violations.
Closes #18888