Skip to content

Commit 0412be9

Browse files
authored
KAFKA-19641: Fix flaky RestoreIntegrationTest#shouldInvokeUserDefinedGlobalStateRestoreListener (#20419)
What I observed is that if I run both combinations useNewProtocol=true, useNewProtocol=false it would often fail the second time, but if I only run the second variation useNewProtocol=false it works, and only the first variation useNewProtocol=true also works. So this points to some state that is not cleared between the tests - and indeed, the test creates a topic “inputTopic”, produces to it, but doesn’t delete it, so the second variation will run with produce to it again and then run with twice the data. I also reduced heartbeat interval and session timeout since some of the tests need to wait for the old consumer to leave which (sigh) Kafka Streams doesn't do, so we have to wait that it gets kicked out by session timeout. So previously we waited for 45 seconds, now, we at least wait only 1 second. Reviewers: Bill Bejeck <bbejeck@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 04518f4 commit 0412be9

File tree

1 file changed

+3
-0
lines changed

1 file changed

+3
-0
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ private Properties props(final Properties extraProperties) {
175175
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
176176
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
177177
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
178+
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500);
179+
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
178180
streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
179181
streamsConfiguration.putAll(extraProperties);
180182

@@ -191,6 +193,7 @@ public void shutdown() throws Exception {
191193

192194
IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
193195
streamsConfigurations.clear();
196+
CLUSTER.deleteAllTopics();
194197
}
195198

196199
@ParameterizedTest

0 commit comments

Comments
 (0)