Skip to content

Kafka Connect: Make CommitState.isCommitReady() O(1)#16453

Open
HenryCaiHaiying wants to merge 2 commits into
apache:mainfrom
HenryCaiHaiying:kafka-connect-commit-state-perf-16361
Open

Kafka Connect: Make CommitState.isCommitReady() O(1)#16453
HenryCaiHaiying wants to merge 2 commits into
apache:mainfrom
HenryCaiHaiying:kafka-connect-commit-state-perf-16361

Conversation

@HenryCaiHaiying
Copy link
Copy Markdown

This PR addresses the issue mentioned in #16361

Previously, each DATA_COMPLETE envelope triggered a re-scan of the entire readyBuffer to count received partitions, making per-commit work O(N^2) in the number of buffered messages. Under control-topic backlog this compounded the backlog and made recovery harder.

The fix: maintain a running receivedPartitionCount that is incremented in addReady() and reset in endCurrentCommit(). isCommitReady() becomes a constant-time comparison against expectedPartitionCount.

The change looks simple but we also verified the edge cases on when the current commit failed and a new start_commit starts, also in the situation when there is 2 Coordinator running (the previous Coordinator didn't terminate and comes back as a zombie coordinator). In both situations, we maintain the logic before and post the code change. Added 2 new unit tests to cover the situation before and after the commit and a situation when there is a DATA message generated from a zombie coordinator.

Closes #16361

@HenryCaiHaiying HenryCaiHaiying force-pushed the kafka-connect-commit-state-perf-16361 branch from 30495cc to e35cdff Compare May 21, 2026 00:14
This PR addresses the issue mentioned in
apache#16361

Previously, each DATA_COMPLETE envelope triggered a re-scan of the
entire readyBuffer to count received partitions, making per-commit
work O(N^2) in the number of buffered messages. Under control-topic
backlog this compounded the backlog and made recovery harder.

The fix: maintain a running receivedPartitionCount that is
incremented in addReady() and reset in endCurrentCommit().
isCommitReady() becomes a constant-time comparison against
expectedPartitionCount.

The change looks simple, but we also verified the edge cases when
the current commit fails and a new start_commit starts, and also
the situation where there are 2 Coordinators running (the previous
Coordinator didn't terminate and comes back as a zombie
coordinator). In both situations, we maintain the same logic before
and after the code change. Added 2 new unit tests to cover the
behavior before and after the commit, and a situation where a DATA
message is generated from a zombie coordinator.

Closes apache#16361
Set offset.flush.interval.ms=500 in IntegrationTestBase.createCommonConfig
so the connector's WorkerSinkTask consumer poll returns promptly (every
500ms) instead of inheriting the 60s default, which exceeds the test's
30s Awaitility budget on a slow CI runner.

This addresses the intermittent CI failure in
TestIntegrationDynamicTable.testIcebergSink at IntegrationTestBase:237
where the assertion timed out waiting for the first snapshot to appear.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Kafka Connect: Coordinator's check on commitState.isCommitReady() is inefficient

1 participant