Kafka Connect: Fix flaky integration tests by isolating iceberg.control.topic per test#16438
Open
wombatu-kun wants to merge 3 commits into
Open
Kafka Connect: Fix flaky integration tests by isolating iceberg.control.topic per test#16438wombatu-kun wants to merge 3 commits into
wombatu-kun wants to merge 3 commits into
Conversation
manuzhang
reviewed
May 20, 2026
`TestIntegrationDynamicTable#testIcebergSink` was flaky on the `kafka-connect-tests` job — 5 of the last 10 Kafka Connect CI runs on `main` failed on the same assertion (`IntegrationTestBase.java`, `assertThat(table.snapshots()).hasSize(1)` inside an `Awaitility.untilAsserted` block), almost always against the partitioned table `test.tbl1` and concentrated on the `[2] test_branch` parameterization. Larger Awaitility budgets (60s, then 120s) did not help, ruling out a "slow first commit" cause and pointing at cross-test state. Root cause is the shared control topic. `iceberg.control.topic` defaulted to `control-iceberg` and was reused across every connector lifecycle in every integration test method. With `iceberg.kafka.auto.offset.reset=earliest`, every new Coordinator joined a fresh consumer group on that topic and replayed the entire control-topic history from prior tests. Historical `DATA_COMPLETE` events fed to `Coordinator.receive` (see `Coordinator.java:140-145`) can hit `isCommitReady(totalPartitionCount)` and trigger commit cycles before the current test's events are processed, which on the partitioned table can produce a snapshot whose offsets the legitimate commit then fails to validate against via `Coordinator.offsetValidator` (`Coordinator.java:280`). That fits both the failure target (the partitioned `tbl1`) and the immunity to larger timeouts. Generate a unique control topic name in `IntegrationTestBase#baseBefore` (`control-iceberg-<uuid>`), pass it through `createCommonConfig` as `iceberg.control.topic`, and best-effort-delete it in `baseAfter` alongside the test topic so cleanup is symmetric. The control topic auto-creates on first publish so no explicit pre-creation is needed. All three integration test classes (`TestIntegration`, `TestIntegrationMultiTable`, `TestIntegrationDynamicTable`) route through `createCommonConfig`, so the override propagates uniformly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The integration tests drive their workloads through a testcontainers docker compose stack (kafka, connect, iceberg REST catalog, minio). The Kafka Connect coordinator does the actual snapshot commit work, and its logs live in the Connect container's stdout — never in the JVM test worker. So when an Awaitility timeout surfaced as a bare AssertionError in CI, there was no way to see why the commit did not happen.
Attach a withLogConsumer for the connect, kafka, and iceberg services in TestContext, writing each service's container output to ${rootDir}/build/testlogs/<service>-container.log. The location is passed in from the integrationTest Gradle task via a `dockerLogDir` system property and falls back to a no-op when unset (so the constructor still works under IDEs or ad-hoc runs). The Kafka Connect CI artifact upload already covers `**/build/testlogs`, so on failure the docker logs come along automatically.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…sts in CI
The `integrationTest` Gradle task carried no `addTestOutputListener` and no `testLogging` block, so test-process stdout/stderr was lost and the Gradle console output for CI showed only a bare `FAILED` line and the assertion source location, with no stack trace or AssertJ description. The Kafka Connect CI workflow uploaded only `**/build/testlogs`, which is populated by the unit test task in the root `build.gradle` but not by `integrationTest`.
Mirror the existing `test` block from the root `build.gradle` inside the `integrationTest` task: stream per-test output to `${rootDir}/build/testlogs/${project.name}-integration.log` (a separate file from the unit-test log), and emit verbose `testLogging` events with `exceptionFormat "full"` on CI. Extend the Kafka Connect CI artifact upload to also include `**/build/reports/tests/integrationTest` so the HTML reports with per-test stack traces are preserved.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2cf942e to
1fb4bd3
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
TestIntegrationDynamicTable.testIcebergSinkhas been flaky onkafka-connect-tests— 5 of the last 10 Kafka Connect CI runs onmainitself failed on the same assertion (assertThat(table.snapshots()).hasSize(1)inside anAwaitility.untilAsserted(...)block), almost always against the partitioned tabletest.tbl1and concentrated on the[2] test_branchparameterization. Bumping the Awaitility budget from 30s to 60s and then to 120s did not help — ruling out a "slow first commit" cause and pointing at cross-test state pollution.Root cause
The Iceberg sink's control topic was shared.
iceberg.control.topicdefaulted tocontrol-icebergand was reused across every connector lifecycle in every integration test method. Withiceberg.kafka.auto.offset.reset=earliest, every newCoordinatorjoined a fresh consumer group on that topic and replayed the entire control-topic history from prior tests. HistoricalDATA_COMPLETEevents fed intoCoordinator.receive(seeCoordinator.java:140-145) can hitisCommitReady(totalPartitionCount)and trigger commit cycles before the current test's events are processed, which on a partitioned table can produce a snapshot whose offsets the legitimate commit then fails to validate against viaCoordinator.offsetValidator(Coordinator.java:280). That explains both the concentration on the partitionedtbl1(where the offset validator is strict) and the immunity to larger Awaitility budgets.Fix
Generate a unique control topic name per test in
IntegrationTestBase#baseBefore(control-iceberg-<uuid>), pass it throughcreateCommonConfigasiceberg.control.topic, and best-effort-delete it inbaseAfteralongside the test topic so cleanup is symmetric. The control topic auto-creates on first publish so no explicit pre-creation is needed. All three integration test classes (TestIntegration,TestIntegrationMultiTable,TestIntegrationDynamicTable) route throughcreateCommonConfig, so the override propagates uniformly. No production source changes — only test scaffolding.Diagnostic improvements (bundled in this PR)
Two follow-on commits add CI plumbing that made root-causing this flake possible, and that future Kafka Connect integration-test failures will benefit from automatically:
Capture docker container logs — attach a testcontainers
withLogConsumerfor theconnect,kafka, andicebergservices inTestContext, writing each service's container output to${rootDir}/build/testlogs/<service>-container.log. The Iceberg sink coordinator runs inside the Connect container, and its logs were never reaching the JVM test worker before this change. The Kafka Connect CI artifact upload already covers**/build/testlogs, so docker logs come along on failure automatically.Capture per-test output and integration test reports — mirror the existing
testblock from the rootbuild.gradleinside theintegrationTesttask (addTestOutputListenerwriting to${rootDir}/build/testlogs/${project.name}-integration.log+testLogging { exceptionFormat "full" }), and extend the Kafka Connect CI artifact upload path to also include**/build/reports/tests/integrationTest. Previously a failing integration test surfaced as a bareFAILEDline in CI with no stack trace, no AssertJ description, no sink-side logs — now the HTML reports with per-test stack traces are preserved.Verification
Three consecutive Kafka Connect CI runs passed on this branch with the fix in place (one of them with
ensureConnectorRemovedremoved, confirming that control-topic isolation alone is sufficient and the cleanup wait was redundant). Baseline failure rate onmainwas 5/10; probability of three consecutive random greens is below 1/8, and the fix matches a hypothesis-driven root cause that explains the specific failure target.Commits
Kafka Connect: Isolate iceberg.control.topic per integration test— the actual fixKafka Connect: Capture docker container logs for integration tests— diagnostic plumbingKafka Connect: Capture per-test output and reports for integration tests in CI— diagnostic plumbing