
[Fix][Connector-V2][Iceberg] Fix IcebergAggregatedCommitter atomic commit per checkpoint#10714

Merged
dybyte merged 10 commits into apache:dev from suhyeon729:fix/iceberg-aggregated-committer
Apr 22, 2026

Conversation

Contributor

@suhyeon729 suhyeon729 commented Apr 5, 2026

Fixes: #10710

Purpose of this pull request

IcebergAggregatedCommitter had two related correctness issues:

  1. Non-atomic commits (N snapshots per checkpoint)
    commitFiles() called filesCommitter.doCommit() once per worker's IcebergCommitInfo, producing N Iceberg snapshots per checkpoint
    (where N = parallelism) instead of a single atomic snapshot. This caused partial data visibility if any commit failed mid-way,
    and snapshot proliferation proportional to parallelism.

  2. Duplicate rows on recovery (idempotency gap)
    On pipeline restore, IcebergSinkWriter.preCommit() re-submitted already-committed WriteResults, causing duplicate rows in Iceberg
    tables.

Changes:

  • Merge all workers' WriteResults into a single doCommit() call per checkpoint → one atomic Iceberg snapshot per checkpoint
  • Add checkpoint ID as Iceberg snapshot summary property (seatunnel.checkpoint-id) for idempotency tracking
  • Implement restoreCommit() in IcebergFilesCommitter to skip already-committed checkpoints on recovery
  • Move commit responsibility fully to IcebergAggregatedCommitter; remove filesCommitter/preCommit() from IcebergSinkWriter
  • Add checkpointId to IcebergCommitInfo and propagate via prepareCommit(checkpointId)
  • Simplify IcebergSinkState to no longer carry WriteResults
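To make the first bullet concrete, here is a minimal Java sketch of the merge-then-commit-once shape. All type and method names here (CommitInfo, AggregatedCommitterSketch, doCommit) are illustrative stand-ins, not the actual SeaTunnel or Iceberg classes:

```java
import java.util.*;

// Hypothetical sketch: merge every worker's results by checkpoint,
// then commit each checkpoint exactly once. These are stand-in types,
// not the real connector classes.
class CommitInfo {
    final long checkpointId;
    final List<String> dataFiles; // stand-in for an Iceberg WriteResult

    CommitInfo(long checkpointId, List<String> dataFiles) {
        this.checkpointId = checkpointId;
        this.dataFiles = dataFiles;
    }
}

class AggregatedCommitterSketch {
    int doCommitCalls = 0; // observable commit count, for verification

    void commit(List<CommitInfo> workerInfos) {
        // Group all workers' files by checkpoint id.
        Map<Long, List<String>> filesByCheckpoint = new TreeMap<>();
        for (CommitInfo info : workerInfos) {
            filesByCheckpoint
                    .computeIfAbsent(info.checkpointId, k -> new ArrayList<>())
                    .addAll(info.dataFiles);
        }
        // One atomic commit per checkpoint; empty checkpoints are skipped.
        for (Map.Entry<Long, List<String>> entry : filesByCheckpoint.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                doCommit(entry.getKey(), entry.getValue());
            }
        }
    }

    void doCommit(long checkpointId, List<String> files) {
        doCommitCalls++; // the real committer would create one Iceberg snapshot here
    }
}
```

With parallelism N, N CommitInfo entries for the same checkpoint collapse into a single doCommit() call, which is the invariant the unit tests below assert.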

Does this PR introduce any user-facing change?

Yes. Previously, with parallelism N, each checkpoint produced N Iceberg snapshots instead of 1. This caused:

  • Partial data visibility: if a commit failed mid-way, readers could see incomplete checkpoint data
  • Duplicate rows on restore: preCommit() on pipeline restore re-committed already-committed files
  • Snapshot proliferation: Iceberg metadata grew proportionally to parallelism

After this fix, each checkpoint produces exactly 1 atomic snapshot regardless of parallelism, and recovery correctly skips already-committed checkpoints.

How was this patch tested?

Unit tests:

  • IcebergAggregatedCommitterTest — verifies N workers produce exactly 1 doCommit() call; empty results are skipped; multiple
    checkpoints each produce exactly one snapshot
  • IcebergFilesCommitterTest — verifies checkpoint ID is written to snapshot summary;
    isAlreadyCommitted() correctly identifies already-committed checkpoints
    (including legacy path without checkpoint-id property)
  • IcebergSinkWriterTest — verifies prepareCommit() propagates checkpoint ID and no longer holds WriteResults

E2E tests (IcebergSinkParallelCommitIT):

  • Streaming parallel commit — N workers, each checkpoint produces exactly 1 snapshot
  • Batch parallel commit — same atomicity guarantee in batch mode
  • RowDelta upsert — atomicity preserved with merge-on-read writes
  • Crash-recovery — no duplicate rows after restore from mid-checkpoint failure

Check list

Outdated comment thread on seatunnel-connectors-v2/connector-iceberg/pom.xml
Collaborator

nzw921rx commented Apr 5, 2026

Good job, and the unit tests cover the merge logic well.

Could we also add an E2E test for this? Since it changes checkpoint commit behavior, it would be good to validate in streaming mode with parallelism > 1 and verify one snapshot per checkpoint.


@DanielLeens DanielLeens left a comment


I pulled this locally and traced both the normal checkpoint-complete path and the restore path.

The change in IcebergAggregatedCommitter.commitFiles() does fix the main aggregated-commit path: SinkAggregatedCommitterTask.notifyCheckpointComplete() now merges all workers' WriteResults into a single filesCommitter.doCommit(...), so the normal checkpoint-close path produces one Iceberg commit instead of N per-worker commits.

However, I don't think the PR fully fixes the restore-side semantics described in the issue.

On recovery, the engine replays both:

  • writer state via SinkFlowLifeCycle.restoreState() -> IcebergSink.restoreWriter() -> IcebergSinkWriter.preCommit(states) (IcebergSinkWriter.java:81-91)
  • aggregated commit state via SinkAggregatedCommitterTask.restoreState() -> aggregatedCommitter.restoreCommit(...) (SinkAggregatedCommitterTask.java:262-276, SinkAggregatedCommitter.java:42-45)

This PR only changes the aggregated committer path. IcebergSinkWriter.preCommit() still calls filesCommitter.doCommit(...) directly for each restored writer state, so after a recovery we can still replay already-committed files outside the new merged path.

Because of that, I don't think the broader "exactly 1 snapshot per checkpoint regardless of parallelism" / "duplicate rows on restore are fixed" claim is fully established yet.

Could we either:

  1. make the aggregated committer the only replay path when this sink uses an aggregated committer, or
  2. make the writer-side replay explicitly idempotent (for example by using commitUser / checkpoint metadata) and add a recovery regression test?

Since issue #10710 explicitly calls out restore behavior, I think we need to close this gap before calling the fix complete.


@DanielLeens DanielLeens left a comment


Superseded by my blocking review #10714 (review), which keeps the clearer restore-path conclusion for this PR. I am shortening this earlier note to avoid a duplicate mixed signal.

@suhyeon729
Contributor Author

@DanielLeens

Thanks for the detailed review — this helped clarify the gap on the restore path.

Based on your feedback, I’m planning to make the aggregated committer the single commit path for both normal execution and recovery. In particular, I’ll remove the writer-side commit during restore so that IcebergSinkWriter no longer calls doCommit() in preCommit(). All commit and replay flows will go through IcebergAggregatedCommitter, including an explicit restoreCommit() implementation that merges all worker results and performs a single commit.

During recovery, since restoreCommit() becomes the only commit entry point, we need a way to distinguish whether a failure happened before or after the commit. To handle this, I’m planning to use a checkpoint ID–based approach: persist the checkpointId in the Iceberg snapshot summary at commit time, and on recovery check whether a snapshot with the same checkpoint ID already exists in the metastore. If it exists, the commit will be skipped; otherwise, it will proceed. For backward compatibility (older checkpoints without checkpointId), I’m considering a manifest-based fallback.
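To illustrate, a rough sketch of the check I have in mind (provisional names; snapshot summaries are modeled as plain maps here, whereas the real implementation would read them from Iceberg snapshot metadata):

```java
import java.util.*;

// Rough sketch of the planned checkpoint-id idempotency check.
// CheckpointIdGuard is a provisional name, not actual connector code.
class CheckpointIdGuard {
    static final String CHECKPOINT_ID_PROP = "seatunnel.checkpoint-id";

    // True if any existing snapshot summary already records this checkpoint id.
    static boolean isAlreadyCommitted(
            List<Map<String, String>> snapshotSummaries, long checkpointId) {
        String target = String.valueOf(checkpointId);
        for (Map<String, String> summary : snapshotSummaries) {
            if (target.equals(summary.get(CHECKPOINT_ID_PROP))) {
                return true;
            }
        }
        // Legacy snapshots without the property fall through to false;
        // those would need the manifest-based fallback mentioned above.
        return false;
    }
}
```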

Let me know if this direction makes sense, or if you’d recommend a different approach for handling idempotency during recovery.

@DanielLeens

Thanks for the detailed follow-up, and yes, the direction makes sense to me.

I re-checked the current code at the latest PR head, and the blocking gap is still present today because the implementation has not changed yet:

  • IcebergSink.restoreWriter(...) still restores writer state into IcebergSinkWriter.of(..., states)
  • the writer constructor still calls preCommit(states)
  • preCommit(...) still does filesCommitter.doCommit(...) directly for each restored writer state

So the restore path is still bypassing the aggregated committer in the current code, even though the normal checkpoint-complete path is fixed.

I do think your proposed direction is the right one: make the aggregated committer the single restore/commit path, and remove the writer-side restore commit. If you still need idempotency after that, storing checkpointId in the snapshot summary and checking it during restoreCommit() sounds reasonable to me.

My suggestion would be to keep the next patch focused on that first structural step:

  1. remove the writer-side restore commit path
  2. let restore replay go only through IcebergAggregatedCommitter
  3. then add the checkpoint-level idempotency guard there

Once that code lands, I'm happy to re-review it. You're definitely moving in the right direction here.

davidzollo previously approved these changes Apr 9, 2026
Contributor

@davidzollo davidzollo left a comment


+1 if CI passes.
LGTM. It would be even better to add an E2E test, as @nzw921rx suggested.
In conclusion, thank you so much for contributing to Apache SeaTunnel — your participation is greatly welcomed!


@DanielLeens DanielLeens left a comment


Thanks for iterating on this. I pulled the latest branch locally and rechecked the restore path. The writer no longer commits restored state in its constructor, prepareCommit(long checkpointId) now carries checkpoint identity, and the aggregated committer restores through the idempotency check instead of double-committing restored files. That addresses the correctness issue I raised earlier. I don't see a new code blocker from the current implementation. The remaining item is the red Build check.

@suhyeon729 suhyeon729 force-pushed the fix/iceberg-aggregated-committer branch from ed88ace to 98ec9b2 on April 10, 2026 11:14
suhyeon729 and others added 7 commits April 10, 2026 22:09
…rkers per checkpoint and prevent duplicate commits on recovery

  Previously, IcebergAggregatedCommitter committed each worker's WriteResults
  as a separate Iceberg snapshot, breaking atomicity. On recovery, SinkWriter's
  preCommit() could re-apply already-committed checkpoints, causing duplicates.

  Changes:
  - Merge all workers' WriteResults into a single Iceberg snapshot per checkpoint
  - Add checkpoint ID as Iceberg snapshot summary property (seatunnel.checkpoint-id)
    for idempotency tracking
  - Implement restoreCommit() to skip already-committed checkpoints on recovery
  - Move commit responsibility fully to IcebergAggregatedCommitter; remove
    filesCommitter/results/preCommit() from IcebergSinkWriter
  - Add checkpointId to IcebergCommitInfo and propagate via prepareCommit(checkpointId)
  - Simplify IcebergSinkState to no longer carry WriteResults
  - Add unit tests (IcebergAggregatedCommitterTest, IcebergFilesCommitterTest,
    IcebergSinkWriterTest) and E2E tests covering streaming, batch, RowDelta upsert,
    and crash-recovery scenarios
…) for Java 8 compatibility in IcebergSinkWriterTest

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@suhyeon729 suhyeon729 force-pushed the fix/iceberg-aggregated-committer branch from 98ec9b2 to 8ec0684 on April 10, 2026 13:10
…ng scenario

  - Rename table/method iceberg_parallel_sink_table → iceberg_parallel_streaming_table to distinguish the streaming-parallelism test from the recovery test
  - Replace awaitility polling with direct assertions now that FakeSource terminates naturally on Zeta after all splits are consumed
  - Add @DisabledOnContainer(SPARK, FLINK) because FakeSource in STREAMING mode does not terminate on those engines
  - Set split.read-interval=3000 ms and split.num=4 so data is spread across multiple checkpoints, making the one-snapshot-per-checkpoint invariant observable
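For concreteness, the two options above would sit in the FakeSource block of the test config roughly like this (a sketch; other required options of fake_to_iceberg_parallel_streaming.conf are omitted):

```hocon
source {
  FakeSource {
    # spread rows across several checkpoints so the
    # one-snapshot-per-checkpoint invariant is observable
    split.num = 4
    split.read-interval = 3000  # milliseconds between split reads
  }
}
```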

@DanielLeens DanielLeens left a comment


I pulled the latest branch locally and re-traced the Iceberg sink write and restore paths.

Current runtime chain:

normal write
  -> IcebergSinkWriter.prepareCommit(checkpointId)
      -> returns IcebergCommitInfo only
  -> SinkAggregatedCommitterTask.notifyCheckpointComplete(...)
      -> IcebergAggregatedCommitter.commit(...)
      -> IcebergFilesCommitter.doCommit(..., checkpointId)

restore path
  -> IcebergSinkWriter.of(..., states)
      -> restores commitUser only, no writer-side doCommit()
  -> SinkAggregatedCommitterTask.restoreState(...)
      -> IcebergAggregatedCommitter.restoreCommit(...)
      -> IcebergFilesCommitter.isAlreadyCommitted(checkpointId, ...)
      -> skip already-persisted checkpoints

I also checked the delta since my previous review. The new commit is test-only:

  • IcebergSinkParallelCommitIT now scopes the streaming assertion to the Zeta engine, where FakeSource terminates naturally after exhausting splits.
  • fake_to_iceberg_parallel_streaming.conf now uses split.read-interval=3000 and split.num=4 so multiple checkpoints are guaranteed and the “one snapshot per checkpoint” invariant is actually observable.

I do not see a new code blocker in the current revision. The remaining item is the pending Build check.

@suhyeon729
Contributor Author

@DanielLeens
Thank you for the detailed review and for tracing through the runtime paths carefully!

Looking back, I made two mistakes in the original test:

  1. Wrong assumption about FakeSource termination
    I assumed FakeSource would terminate naturally on Zeta after exhausting all splits in STREAMING mode. However, as the CI logs showed (running for 2.6 hours up to checkpoint ), STREAMING mode never emits a COMPLETED_POINT_TYPE barrier. This caused the blocking executeJob() call to wait for new data indefinitely, which was the direct cause of the build timeout.

  2. Inaccurate @DisabledOnContainer reasons
    The method-level disable for Flink was based on the same incorrect assumption above. Also, the class-level SPARK reason ("does not support aggregated committer semantics") was inaccurate, as the Spark translation layer does indeed wire up the SinkAggregatedCommitter.

I followed the patterns in IcebergSinkCDCIT and KafkaIT for handling streaming jobs:
using CompletableFuture.supplyAsync() for a fire-and-forget execution, combined with Awaitility.given().atMost(60s).untilAsserted(...). The job is now stopped implicitly during the test teardown when the Docker container is destroyed.
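The pattern can be sketched with plain JDK primitives as a simplified stand-in for Awaitility (illustrative only, not the actual test code):

```java
import java.util.function.BooleanSupplier;

// Plain-JDK stand-in for the fire-and-forget + poll pattern: the
// streaming job is started asynchronously and never joined; the test
// just polls an assertion until it holds or a deadline passes.
class PollingSketch {
    static boolean pollUntil(BooleanSupplier assertion, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (assertion.getAsBoolean()) {
                return true; // invariant observed before the deadline
            }
            try {
                Thread.sleep(50); // poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false; // deadline reached without the invariant holding
    }
}
```

In the real test, Awaitility plays the role of pollUntil, the job future from CompletableFuture.supplyAsync() is simply abandoned, and container teardown stops the job.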

I also removed the unnecessary @DisabledOnContainer(SPARK, FLINK) at the method level and updated the class-level SPARK reason to accurately reflect the limitations of spark-submit --master local in this test environment.

I’ll keep an eye on the pending build check to ensure everything passes this time. Sorry for the noise, and thanks again for catching these issues!

…nd poll snapshot invariants

FakeSource in STREAMING mode never self-terminates, causing executeJob() to block indefinitely.
Replace synchronous execution with CompletableFuture.supplyAsync() and wrap snapshot assertions
in Awaitility polling (atMost 60s), removing the Flink/Spark @DisabledOnContainer restriction.

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-pulled the latest HEAD locally and checked only the delta since my previous approval.

Current E2E chain is now:
start streaming job asynchronously
-> poll Iceberg table snapshots
-> assert distinct checkpoint ids in snapshot summary

The new commit only changes the test harness in IcebergSinkParallelCommitIT so it no longer blocks on synchronous completion of a non-terminating streaming FakeSource. The production write / restore path I approved earlier is unchanged, and the updated test still checks the same per-checkpoint snapshot invariant.

I do not see a new code blocker in the current revision. The remaining item is the pending Build check.

…nk containers in IcebergSinkParallelCommitIT
@suhyeon729
Contributor Author

@nzw921rx @DanielLeens
Updated the E2E test and fixed the blocking issue as you suggested. All CI checks have now passed in my fork. Thanks again for the helpful feedback!

https://github.com/suhyeon729/seatunnel/actions/runs/13149723048


@DanielLeens DanielLeens left a comment


I pulled the latest branch locally and rechecked both the write path and the restore path.

Current runtime chain:

normal checkpoint
  -> IcebergSinkWriter.prepareCommit(checkpointId)
      -> returns IcebergCommitInfo only
  -> SinkAggregatedCommitterTask.notifyCheckpointComplete(...)
      -> IcebergAggregatedCommitter.commit(...)
      -> IcebergFilesCommitter.doCommit(..., checkpointId)

restore path
  -> IcebergSinkWriter.of(..., states)
      -> restores commitUser only
  -> SinkAggregatedCommitterTask.restoreState(...)
      -> IcebergAggregatedCommitter.restoreCommit(...)
      -> filesCommitter.isAlreadyCommitted(...)
      -> skip already-persisted checkpoints

I also checked the delta since my previous review. The new commit is test-only:

  • IcebergSinkParallelCommitIT now scopes the streaming assertion to the environments where the checkpoint-to-snapshot invariant is actually observable
  • the batch E2E config was aligned with the expected row count

I do not see a new code blocker in the current revision. The remaining item is the pending Build check.

@suhyeon729
Contributor Author

Hi @davidzollo, I hope this finds you well.

I've added the E2E test class IcebergSinkParallelCommitIT as requested a week ago.
The tests verify the following scenarios:

  • Streaming Parallelism: Confirms exactly one snapshot per checkpoint.
  • Recovery/Idempotency: Validates restoreCommit() avoids duplicate snapshots.
  • Batch Mode: Verifies data/snapshot consistency with parallelism.
  • Upsert Path: Checks RowDelta idempotency during commits.

Could you please take a look when you have a moment?
I’d appreciate your review to move this PR forward. Thanks!

@DanielLeens

Hi @suhyeon729, I rechecked the current head locally as seatunnel-review-10714. The code head is unchanged since my previous approval; the latest activity is a follow-up ping.

The important chain is still:

IcebergSinkWriter.prepareCommit()
  -> emits checkpoint-bound IcebergCommitInfo
  -> IcebergAggregatedCommitter groups/commits by checkpoint
  -> recovery path uses persisted Iceberg sink state and avoids committing a later checkpoint before an earlier one

The added unit/E2E coverage still exercises the parallel commit and recovery behavior that mattered for this fix. The latest Build check is green.

Conclusion: can merge

No new blocker from my side; thanks for sticking with the recovery-path details.

Contributor

@davidzollo davidzollo left a comment


+1
Good job

Contributor

@dybyte dybyte left a comment


+1. Thanks for your contribution.

@dybyte dybyte merged commit fe182ba into apache:dev Apr 22, 2026
4 checks passed

Development

Successfully merging this pull request may close these issues.

[Bug][Connector-V2] IcebergAggregatedCommitter commits per worker instead of single atomic commit per checkpoint

5 participants