
[PIP-30] Improve Paimon committer in Flink #7963

Open
fishfishfishfishaa wants to merge 6 commits into apache:master from fishfishfishfishaa:yg-pip3-2026

@fishfishfishfishaa

Purpose

Following PIP-30 (Improvement For Paimon Committer In Flink), this PR implements the Paimon Write Coordinator (PWC), which replaces the current CommitOperator with a JobManager-level OperatorCoordinator and eliminates the network shuffle bottleneck for commit messages.

Key Design Decision: Custom HDFS State

Instead of using Flink's native StateBackend, I chose custom HDFS state management because:

  1. Timing mismatch: Flink's operator state snapshot occurs after snapshotState() returns, but PWC needs to persist aggregated messages before acknowledging WriteOperators.
  2. Explicit recovery control: Enables clean resetToCheckpoint() logic with idempotent commit.
  3. Decoupled cleanup: HDFS state deletion is independent of Flink checkpoint retention policies.
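
To illustrate what "resetToCheckpoint() logic with idempotent commit" could look like, here is a minimal sketch. It is not the code in this PR: `IdempotentRecoverySketch`, `FakeTable`, and the `String` commit messages are hypothetical stand-ins, and the sketch assumes the table can report the highest checkpoint id it has already committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch; none of these names come from the PR. */
final class IdempotentRecoverySketch {

    /** Stand-in for the table: remembers the highest checkpoint id already committed. */
    static final class FakeTable {
        long lastCommittedCheckpointId = -1;
        final List<String> committed = new ArrayList<>();

        /** Commits the messages once; a replay of an older or equal checkpoint id is a no-op. */
        void commit(long checkpointId, List<String> messages) {
            if (checkpointId <= lastCommittedCheckpointId) {
                return; // already committed before the failure
            }
            committed.addAll(messages);
            lastCommittedCheckpointId = checkpointId;
        }
    }

    /**
     * Replays every persisted checkpoint up to and including the restored one,
     * in ascending checkpoint order. State for later checkpoints is ignored.
     */
    static void resetToCheckpoint(
            long restoredCheckpointId,
            TreeMap<Long, List<String>> persisted,
            FakeTable table) {
        for (Map.Entry<Long, List<String>> entry :
                persisted.headMap(restoredCheckpointId, true).entrySet()) {
            table.commit(entry.getKey(), entry.getValue());
        }
    }
}
```

Because `commit` skips checkpoint ids that are not newer than the last committed one, calling `resetToCheckpoint` repeatedly after a failover does not duplicate data in this sketch.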

State path: <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state
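
As a rough sketch of the "persist before acknowledging" step under this path layout: the class and method names below are hypothetical, `java.nio.file` on the local filesystem stands in for the HDFS client, and the write-then-rename step is an assumption of the sketch, not something confirmed by the PR.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch; the local filesystem stands in for HDFS. */
final class PwcStateFileSketch {

    /** Builds <checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state. */
    static Path statePath(Path checkpointDir, String operatorId, long checkpointId) {
        return checkpointDir
                .resolve("pwc")
                .resolve(operatorId)
                .resolve("checkpoint-" + checkpointId + ".state");
    }

    /**
     * Writes the aggregated messages to a temporary file, then renames it into place,
     * so a reader never observes a half-written state file. The caller would only
     * acknowledge the writers after this method returns.
     */
    static Path persist(Path checkpointDir, String operatorId, long checkpointId, String aggregated)
            throws IOException {
        Path target = statePath(checkpointDir, operatorId, checkpointId);
        Files.createDirectories(target.getParent());
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, aggregated.getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        return target;
    }
}
```

The rename step relies on a filesystem with atomic rename; that holds for the local filesystem used here and for HDFS, but it is not a given on every storage backend.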

Current Scope

  • Supported: FixedBucketSink

Testing

  • Added unit tests for recovery logic

Related Issue

fix #2641

Contributor

@JingsongLi JingsongLi left a comment

This is a significant architectural change (PIP-30). A few high-level concerns:

Design Questions:

  1. Custom HDFS state vs Flink StateBackend: I understand the timing mismatch rationale, but custom state management introduces its own complexity:

    • Who cleans up stale state files when jobs are cancelled/fail without proper cleanup?
    • What happens if the HDFS path is not accessible (permissions, HA failover)?
    • How does this interact with Flink's checkpoint retention policies?
  2. State path collision: The path <flink-checkpoint-dir>/pwc/<operatorId>/checkpoint-{ckId}.state lives inside Flink's checkpoint directory but is managed independently. Could Flink's checkpoint cleanup accidentally delete these files? Or could stale PWC state files accumulate unboundedly?

  3. Scope limitation: Currently only supports FixedBucketSink. What's the plan for UnawareBucketSink and DynamicBucketSink? The design should be extensible to these without requiring a second large refactoring.

Code Comments:

  1. DiscardingSink for committed stream: In FlinkSink.doCommit(), when the coordinator is enabled, the committed stream is discarded via written.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1). Why parallelism 1? If the write operator has high parallelism, this creates a bottleneck for the stream before it is discarded. Can you set it to the same parallelism as the writers?

  2. Error handling in coordinator: What happens when the coordinator fails to commit (e.g., conflict with another writer)? How is this surfaced to the Flink job? Does it trigger a failover?

  3. Testing: The current tests cover recovery logic, but I'd like to see:

    • An end-to-end IT test that validates data correctness after checkpoint/restore
    • A test for concurrent writes (multiple subtasks committing in the same checkpoint)
    • A test for the cancelled job → stale state scenario

This is promising work but given its complexity, it might benefit from a more detailed design doc on the wiki for community discussion before merging.

Contributor

@JingsongLi JingsongLi left a comment

If this feature is tightly bound to HDFS, that is a problem: we definitely need to support object storage as well.


Development

Successfully merging this pull request may close these issues.

[Feature] Support bucket commit in paimon

2 participants