Fix EmbeddedKafkaCluster startup/teardown ordering in integration tests #17855
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## master #17855 +/- ##
============================================
- Coverage 63.26% 63.25% -0.01%
- Complexity 1460 1466 +6
============================================
Files 3190 3190
Lines 192011 192011
Branches 29412 29412
============================================
- Hits 121469 121453 -16
- Misses 61026 61044 +18
+ Partials 9516 9514 -2
38a7ac1 to 25a5c3e
- Add getKafkaExtraProperties() hook in BaseClusterIntegrationTest for subclasses to pass custom Kafka broker config
- Update EmbeddedKafkaCluster to forward extra config properties to KafkaClusterTestKit builder
- Set log.flush.interval.messages=1 in ExactlyOnceKafka test to ensure transactional data is flushed to disk immediately
- Fix timeout message mismatch (was "60s", actual deadline is 120s)
- Add retry logic for realtime table creation when Kafka topic metadata is not yet available

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
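The retry and timeout-message items above can be illustrated with a minimal sketch. It is not the code from this PR: RetrySketch and retryUntilDeadline are hypothetical names, and the action stands in for whatever call creates the realtime table. The point it shows is retrying until a deadline and deriving the failure message from the actual timeout, so the text cannot drift away from the deadline (the "60s" versus 120s mismatch).

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical helper, not part of the PR: retries an action until it succeeds or a deadline passes. */
final class RetrySketch {
  private RetrySketch() {
  }

  static <T> T retryUntilDeadline(Supplier<T> action, Duration timeout, Duration pause) {
    long deadlineNanos = System.nanoTime() + timeout.toNanos();
    RuntimeException lastFailure = null;
    int attempts = 0;
    while (true) {
      attempts++;
      try {
        return action.get();
      } catch (RuntimeException e) {
        // Assumed to be transient, e.g. topic metadata not yet available.
        lastFailure = e;
      }
      if (System.nanoTime() - deadlineNanos >= 0) {
        break;
      }
      try {
        Thread.sleep(pause.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while retrying", e);
      }
    }
    // The message is built from the real timeout value instead of a hard-coded number.
    throw new IllegalStateException(
        "Gave up after " + attempts + " attempts within " + timeout.toSeconds() + "s", lastFailure);
  }
}
```

A caller would wrap the table-creation call in a lambda and pass the same Duration that the test uses as its verification deadline.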
25a5c3e to 3435faf
Pull request overview
This PR aims to make integration tests more reliable by enforcing a consistent embedded ZooKeeper/Kafka/Pinot startup and teardown order, reducing Kafka-related leaks and cross-test interference.
Changes:
- Reorders Kafka startup to occur after ZooKeeper but before Controller in multiple integration tests.
- Reorders Kafka shutdown to occur after Controller but before ZooKeeper in suite/test teardowns.
- Adds support for passing extra Kafka broker config into the embedded Kafka cluster and uses it in the ExactlyOnce Kafka integration test.
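The ordering rule in the changes above (ZooKeeper, then Kafka, then Controller on startup; the reverse on teardown) can be sketched with a simple stack. This is only an illustration under assumed names: LifecycleOrderSketch is hypothetical, the components are plain strings, and the real tests call methods such as startKafka() and stopKafka() directly rather than using a helper like this.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration: components stop in the reverse of the order they started. */
final class LifecycleOrderSketch {
  private final Deque<String> _started = new ArrayDeque<>();
  private final List<String> _events = new ArrayList<>();

  void start(String component) {
    _events.add("start " + component);
    _started.push(component);
  }

  void stopAll() {
    // Popping the stack yields the most recently started component first.
    while (!_started.isEmpty()) {
      _events.add("stop " + _started.pop());
    }
  }

  List<String> events() {
    return _events;
  }

  static List<String> demo() {
    LifecycleOrderSketch cluster = new LifecycleOrderSketch();
    cluster.start("ZooKeeper");
    cluster.start("Kafka");
    cluster.start("Controller");
    cluster.stopAll();
    return cluster.events();
  }
}
```

With this shape, Kafka is always stopped after the Controller and before ZooKeeper, which matches the teardown order the review describes.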
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java | Adds forwarding of extra broker config properties into the Kafka testkit builder. |
| pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java | Introduces getKafkaExtraProperties() and passes it into embedded Kafka startup. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java | Overrides extra Kafka broker properties; increases verification timeout messaging. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java | Moves startKafka() earlier in startup ordering. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java | Moves startKafka() earlier in startup ordering and removes later duplicate start. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java | Moves startKafka() earlier in startup ordering and removes later duplicate start. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java | Moves startKafka() earlier in startup ordering. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java | Moves startKafka() earlier in startup ordering. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java | Moves startKafka() earlier in startup ordering. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java | Moves Kafka start earlier and adds missing stopKafka() in teardown. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java | Moves Kafka start earlier and adds a new @AfterClass teardown that stops Kafka. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java | Moves Kafka start earlier and adds a new @AfterClass teardown that stops Kafka. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java | Moves Kafka start earlier and adds a new @AfterClass teardown that stops Kafka. |
| pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java | Moves Kafka start earlier and adds a new @AfterClass teardown that stops Kafka. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java | Moves Kafka stop later in teardown ordering. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java | Moves Kafka stop later in suite teardown ordering. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java | Moves Kafka stop later in suite teardown ordering. |
…xtra props
- Wrap all @AfterClass tearDown methods in try/finally with FileUtils.deleteQuietly for reliable temp directory cleanup
- Fix BrokerQueryLimitTest duplicate deleteDirectory and wrong ordering
- Replace System.err.println with SLF4J LOGGER in ExactlyOnce test
- Clear _extraConfigProps at start of init() in EmbeddedKafkaCluster

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
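The try/finally teardown pattern from this commit can be sketched as follows. To keep the example dependency-free, it uses a small java.nio-based deleteQuietly as a stand-in for Commons IO's FileUtils.deleteQuietly; TeardownSketch, tearDown, and demoCleanupAfterFailure are hypothetical names and not the PR's code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical sketch: temp directory cleanup runs even when stopping a component throws. */
final class TeardownSketch {
  private TeardownSketch() {
  }

  /** Stand-in for FileUtils.deleteQuietly: deletes recursively and never throws. */
  static boolean deleteQuietly(Path dir) {
    if (dir == null || !Files.exists(dir)) {
      return false;
    }
    try (Stream<Path> paths = Files.walk(dir)) {
      // Reverse order deletes children before their parent directories.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
      return true;
    } catch (IOException | UncheckedIOException e) {
      return false;
    }
  }

  static void tearDown(Runnable stopComponents, Path tempDir) {
    try {
      stopComponents.run();
    } finally {
      // Runs whether or not stopComponents threw.
      deleteQuietly(tempDir);
    }
  }

  /** Returns true if the stop failure propagated and the directory was still removed. */
  static boolean demoCleanupAfterFailure() {
    Path dir;
    try {
      dir = Files.createTempDirectory("teardown-sketch");
      Files.writeString(dir.resolve("segment.tmp"), "x");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    boolean propagated = false;
    try {
      tearDown(() -> {
        throw new IllegalStateException("stop failed");
      }, dir);
    } catch (IllegalStateException expected) {
      propagated = true;
    }
    return propagated && !Files.exists(dir);
  }
}
```

The design choice is that a failure while stopping a component still surfaces to the test runner, while the temp directory does not leak into later tests.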
| } | ||
| } catch (Exception e) { | ||
| System.err.println("[ExactlyOnce] Error counting records with " + isolationLevel + ": " + e.getMessage()); | ||
| LOGGER.error("Error counting records with {}: {}", isolationLevel, e.getMessage()); |
The error log drops the exception stack trace by only logging e.getMessage(). Including the Throwable as the last parameter will preserve the full stack trace, which is especially useful for diagnosing flaky integration test failures.
| LOGGER.error("Error counting records with {}: {}", isolationLevel, e.getMessage()); | |
| LOGGER.error("Error counting records with {}", isolationLevel, e); |
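The difference the reviewer points out can be demonstrated without any logging dependency. The sketch below uses java.util.logging as a stand-in for SLF4J, so the calls are not the ones in the PR; LogThrowableSketch and its handler are hypothetical. In SLF4J the equivalent is passing the exception as the last argument with no matching {} placeholder, as in the suggested change above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Hypothetical sketch using java.util.logging in place of SLF4J. */
final class LogThrowableSketch {
  private LogThrowableSketch() {
  }

  /** Collects log records in memory so they can be inspected. */
  static final class CapturingHandler extends Handler {
    final List<LogRecord> _records = new ArrayList<>();

    @Override
    public void publish(LogRecord record) {
      _records.add(record);
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
    }
  }

  static List<LogRecord> logBothWays(String isolationLevel) {
    Logger logger = Logger.getLogger("LogThrowableSketch");
    logger.setUseParentHandlers(false);
    CapturingHandler handler = new CapturingHandler();
    logger.addHandler(handler);
    try {
      Exception e = new IllegalStateException("broker not reachable");
      // Message only: the stack trace is lost.
      logger.log(Level.SEVERE, "Error counting records with " + isolationLevel + ": " + e.getMessage());
      // Throwable attached: the record carries the full stack trace.
      logger.log(Level.SEVERE, "Error counting records with " + isolationLevel, e);
    } finally {
      logger.removeHandler(handler);
    }
    return handler._records;
  }
}
```

Only the second record keeps the Throwable, which is what makes a flaky integration test failure traceable to its origin.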
Pull request overview
Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.
Summary
- Add missing @AfterClass tearDown methods with stopKafka() to 5 tests that were starting Kafka but never stopping it
- Add getKafkaExtraProperties() hook in BaseClusterIntegrationTest for subclasses to pass custom Kafka broker config to EmbeddedKafkaCluster (used by ExactlyOnceKafkaRealtimeClusterIntegrationTest to set log.flush.interval.messages=1 for transactional test stability)
- Wrap all @AfterClass tearDown methods in try/finally with FileUtils.deleteQuietly for reliable temp directory cleanup
- Replace System.err.println with SLF4J logging in ExactlyOnceKafkaRealtimeClusterIntegrationTest

Files changed
Startup ordering fixes (10 files; moved startKafka() before startController()):
- BaseRealtimeClusterIntegrationTest.java
- CLPEncodingRealtimeIntegrationTest.java
- QueryWorkloadIntegrationTest.java
- RetentionManagerIntegrationTest.java
- StaleSegmentCheckIntegrationTest.java
- PinotSinkUpsertTableIntegrationTest.java
- PartialUpsertTableRebalanceIntegrationTest.java
- NullHandlingIntegrationTest.java
- PauselessRealtimeIngestionSegmentCommitFailureTest.java
- BasePauselessRealtimeIngestionTest.java

Teardown ordering fixes (3 files; moved stopKafka() after stopController()):
- BrokerQueryLimitTest.java
- CustomDataQueryClusterIntegrationTest.java
- BaseLogicalTableIntegrationTest.java

Added missing tearDown with stopKafka() (5 files):
- CLPEncodingRealtimeIntegrationTest.java: no tearDown at all
- QueryWorkloadIntegrationTest.java: no tearDown at all
- RetentionManagerIntegrationTest.java: no tearDown at all
- PinotSinkUpsertTableIntegrationTest.java: no tearDown at all
- StaleSegmentCheckIntegrationTest.java: tearDown existed but was missing stopKafka()

Kafka extra properties support (3 files):
- EmbeddedKafkaCluster.java: forward extra config props to KafkaClusterTestKit builder, clear on re-init
- BaseClusterIntegrationTest.java: add getKafkaExtraProperties() hook, merge into Kafka startup
- ExactlyOnceKafkaRealtimeClusterIntegrationTest.java: override hook to set log.flush.interval.messages=1

Teardown robustness (6 files; wrap in try/finally with deleteQuietly):
- CLPEncodingRealtimeIntegrationTest.java
- QueryWorkloadIntegrationTest.java
- RetentionManagerIntegrationTest.java
- PinotSinkUpsertTableIntegrationTest.java
- BrokerQueryLimitTest.java (also fixed duplicate deleteDirectory and ordering)
- ExactlyOnceKafkaRealtimeClusterIntegrationTest.java (replaced System.err with SLF4J)

Test plan
- ExactlyOnceKafkaRealtimeClusterIntegrationTest passes (9 tests, 0 failures)

🤖 Generated with Claude Code
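The extra-properties hook described in the summary follows a common template-method shape, sketched below. The class names, the Properties return type, and buildKafkaBrokerConfig() are assumptions made for illustration; only the hook name getKafkaExtraProperties() and the log.flush.interval.messages=1 setting come from the PR description, and the real signatures in BaseClusterIntegrationTest may differ.

```java
import java.util.Properties;

/** Hypothetical stand-in for the base test class; the real signature may differ. */
class BaseClusterTestSketch {
  /** Hook: subclasses override this to add broker config. Empty by default. */
  protected Properties getKafkaExtraProperties() {
    return new Properties();
  }

  /** Builds the broker config, letting subclass-provided entries override defaults. */
  final Properties buildKafkaBrokerConfig() {
    Properties config = new Properties();
    // Illustrative default only, not taken from the PR.
    config.setProperty("num.partitions", "2");
    config.putAll(getKafkaExtraProperties());
    return config;
  }
}

/** Hypothetical stand-in for the exactly-once test that overrides the hook. */
class ExactlyOnceTestSketch extends BaseClusterTestSketch {
  @Override
  protected Properties getKafkaExtraProperties() {
    Properties extra = new Properties();
    // Flush after every message, the setting the PR applies for transactional test stability.
    extra.setProperty("log.flush.interval.messages", "1");
    return extra;
  }
}
```

Tests that do not override the hook get the default config unchanged, so the new extension point does not affect existing subclasses.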