Skip to content

Kafka Connect: Prevent zombie coordinator during rebalance#16156

Open
kumarpritam863 wants to merge 26 commits into
apache:mainfrom
kumarpritam863:fix/kafka-connect-zombie-coordinator
Open

Kafka Connect: Prevent zombie coordinator during rebalance#16156
kumarpritam863 wants to merge 26 commits into
apache:mainfrom
kumarpritam863:fix/kafka-connect-zombie-coordinator

Conversation

@kumarpritam863
Copy link
Copy Markdown
Contributor

Summary

  • Make stopCoordinator() synchronous by joining the CoordinatorThread before clearing the reference, ensuring the old coordinator is fully stopped (Kafka clients closed, executor drained) before a new coordinator can start on the same or different task.

Problem

During a Kafka Connect rebalance, CommitterImpl.close() calls stopCoordinator() which sets the termination flag and shuts down the executor, but returns before the CoordinatorThread has fully exited its run() loop and closed its Kafka producer/consumer/admin clients. If open() is called on the new leader task before the old thread finishes, two coordinators can briefly run simultaneously:

  • The old coordinator's executor thread may still be inside commitToTable() executing an Iceberg commit (e.g. an HTTP call to Glue/HMS) that doesn't respond to interrupts.
  • The old coordinator's Kafka consumer remains in the -coord consumer group, splitting control topic partitions with the new coordinator's consumer during the overlap window.
  • Both coordinators can send events to the control topic and attempt Iceberg commits concurrently.

While Iceberg's optimistic concurrency (CAS) and the SnapshotAncestryValidator prevent corrupt commits, the race window can cause spurious CommitFailedException errors, partial control topic partition visibility, and unnecessary commit retries.

Fix

  • Add coordinatorThread.join() after coordinatorThread.terminate() in stopCoordinator().
  • Since terminate() already calls coordinator.terminate() which blocks up to 1 minute for the executor to drain, the additional join() only waits for the thread's final cleanup (coordinator.stop() — closing Kafka clients), adding negligible overhead.
  • This guarantees the old coordinator is fully dead before stopCoordinator() returns, closing the race window entirely.

@kumarpritam863
Copy link
Copy Markdown
Contributor Author

@danielcweeks can you please review.

@kumarpritam863
Copy link
Copy Markdown
Contributor Author

@bryanck can you please review.

@kumarpritam863
Copy link
Copy Markdown
Contributor Author

@danielcweeks can you please take a look.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant