From ae1238aaf82f1deead11c7cb3bea652c2047c0ed Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Sat, 29 Nov 2025 09:49:29 -0800 Subject: [PATCH 1/5] set flag to true, remove tests, minor cleanup --- .../processor/internals/StreamThreadTest.java | 773 +++++++----------- 1 file changed, 289 insertions(+), 484 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 63783b59aca71..36eb2a6a8deac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -107,11 +107,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.InOrder; import org.mockito.Mock; @@ -142,7 +141,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -173,8 +171,6 @@ import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assumptions.assumeFalse; -import static org.junit.jupiter.api.Assumptions.assumeTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -227,14 +223,6 @@ public class StreamThreadTest { } }; - static Stream data() { - return Stream.of( - Arguments.of(false, false), - Arguments.of(true, false), - Arguments.of(true, true) - ); - } - @BeforeEach public void setUp() { Thread.currentThread().setName(CLIENT_ID + "-StreamThread-" + threadIdx); @@ -269,7 +257,7 @@ public void tearDown() { private final TaskId task2 = new TaskId(0, 2); private final TaskId task3 = new TaskId(1, 1); - private Properties configProps(final boolean enableEoS, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + private Properties configProps(final boolean enableEoS, final boolean processingThreadsEnabled) { return mkProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"), @@ -279,7 +267,7 @@ private Properties configProps(final boolean enableEoS, final boolean stateUpdat mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? StreamsConfig.EXACTLY_ONCE_V2 : StreamsConfig.AT_LEAST_ONCE), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()), mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()), - mkEntry(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(stateUpdaterEnabled)), + mkEntry(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(true)), mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, Boolean.toString(processingThreadsEnabled)), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1") )); @@ -297,13 +285,13 @@ private Cluster createCluster() { ); } - private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - return createStreamThread(clientId, mockTime, stateUpdaterEnabled, processingThreadsEnabled); + private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId, final boolean processingThreadsEnabled) { + return createStreamThread(clientId, mockTime, processingThreadsEnabled); } private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId, - final Time time, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final Time time, final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); return createStreamThread(clientId, config, time); } @@ -377,9 +365,9 @@ public void onChange(final Thread thread, } @ParameterizedTest - @MethodSource("data") - public void shouldChangeStateInRebalanceListener(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - thread = createStreamThread(CLIENT_ID, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldChangeStateInRebalanceListener(final boolean processingThreadsEnabled) { + thread = createStreamThread(CLIENT_ID, processingThreadsEnabled); final StateListenerStub stateListener = new StateListenerStub(); thread.setStateListener(stateListener); @@ -414,9 +402,9 @@ public void shouldChangeStateInRebalanceListener(final boolean stateUpdaterEnabl } @ParameterizedTest - @MethodSource("data") - public void shouldChangeStateAtStartClose(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - thread = createStreamThread(CLIENT_ID, new MockTime(1), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldChangeStateAtStartClose(final boolean processingThreadsEnabled) throws Exception { + thread = createStreamThread(CLIENT_ID, new MockTime(1), processingThreadsEnabled); final StateListenerStub stateListener = new StateListenerStub(); thread.setStateListener(stateListener); @@ -438,9 +426,9 @@ public void shouldChangeStateAtStartClose(final boolean stateUpdaterEnabled, fin } @ParameterizedTest - @MethodSource("data") - public void shouldCreateMetricsAtStartup(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - thread = createStreamThread(CLIENT_ID, new MockTime(1), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldCreateMetricsAtStartup(final boolean processingThreadsEnabled) { + thread = createStreamThread(CLIENT_ID, new MockTime(1), processingThreadsEnabled); final String defaultGroupName = "stream-thread-metrics"; final Map defaultTags = Collections.singletonMap( "thread-id", @@ -538,10 +526,10 @@ public void shouldCreateMetricsAtStartup(final boolean stateUpdaterEnabled, fina } @ParameterizedTest - @MethodSource("data") - public void shouldNotCommitBeforeTheCommitInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotCommitBeforeTheCommitInterval(final boolean processingThreadsEnabled) { final long commitInterval = 1000L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); @@ -565,11 +553,11 @@ public void shouldNotCommitBeforeTheCommitInterval(final boolean stateUpdaterEna } @ParameterizedTest - @MethodSource("data") - public void shouldNotPurgeBeforeThePurgeInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotPurgeBeforeThePurgeInterval(final boolean processingThreadsEnabled) { final long commitInterval = 1000L; final long purgeInterval = 2000L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); @@ -593,10 +581,10 @@ public void shouldNotPurgeBeforeThePurgeInterval(final boolean stateUpdaterEnabl } @ParameterizedTest - @MethodSource("data") - public void shouldAlsoPurgeWhenNothingGetsCommitted(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldAlsoPurgeWhenNothingGetsCommitted(final boolean processingThreadsEnabled) { final long purgeInterval = 1000L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); @@ -625,11 +613,11 @@ public void shouldAlsoPurgeWhenNothingGetsCommitted(final boolean stateUpdaterEn } @ParameterizedTest - @MethodSource("data") - public void shouldAlsoPurgeBeforeTheCommitInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldAlsoPurgeBeforeTheCommitInterval(final boolean processingThreadsEnabled) { final long purgeInterval = 1000L; final long commitInterval = Long.MAX_VALUE; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); @@ -657,12 +645,9 @@ public void shouldAlsoPurgeBeforeTheCommitInterval(final boolean stateUpdaterEna verify(taskManager, times(2)).maybePurgeCommittedRecords(); } - @ParameterizedTest - @MethodSource("data") - public void shouldNotProcessWhenPartitionRevoked(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeFalse(processingThreadsEnabled); - - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @Test + public void shouldNotProcessWhenPartitionRevoked() { + final Properties props = configProps(false, false); final StreamsConfig config = new StreamsConfig(props); when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty()); @@ -681,11 +666,9 @@ public void shouldNotProcessWhenPartitionRevoked(final boolean stateUpdaterEnabl Mockito.verify(taskManager, never()).process(Mockito.anyInt(), Mockito.any()); } - @ParameterizedTest - @MethodSource("data") - public void shouldProcessWhenRunning(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeFalse(processingThreadsEnabled); - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @Test + public void shouldProcessWhenRunning() { + final Properties props = configProps(false, false); final StreamsConfig config = new StreamsConfig(props); when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty()); @@ -701,17 +684,14 @@ public void shouldProcessWhenRunning(final boolean stateUpdaterEnabled, final bo thread.setState(State.STARTING); thread.setState(State.PARTITIONS_ASSIGNED); thread.setState(State.RUNNING); - runOnce(processingThreadsEnabled); + runOnce(false); Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any()); } - @ParameterizedTest - @MethodSource("data") - public void shouldProcessWhenPartitionAssigned(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeTrue(stateUpdaterEnabled); - assumeFalse(processingThreadsEnabled); - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @Test + public void shouldProcessWhenPartitionAssigned() { + final Properties props = configProps(false, false); final StreamsConfig config = new StreamsConfig(props); when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty()); @@ -726,18 +706,14 @@ public void shouldProcessWhenPartitionAssigned(final boolean stateUpdaterEnabled thread.updateThreadMetadata("admin"); thread.setState(State.STARTING); thread.setState(State.PARTITIONS_ASSIGNED); - runOnce(processingThreadsEnabled); + runOnce(false); Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any()); } - @ParameterizedTest - @MethodSource("data") - public void shouldProcessWhenStarting(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeTrue(stateUpdaterEnabled); - assumeFalse(processingThreadsEnabled); - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); - props.setProperty(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(true)); + @Test + public void shouldProcessWhenStarting() { + final Properties props = configProps(false, false); final StreamsConfig config = new StreamsConfig(props); when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty()); @@ -751,16 +727,16 @@ public void shouldProcessWhenStarting(final boolean stateUpdaterEnabled, final b thread = buildStreamThread(mainConsumer, taskManager, config, topologyMetadata); thread.updateThreadMetadata("admin"); thread.setState(State.STARTING); - runOnce(processingThreadsEnabled); + runOnce(false); Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any()); } @ParameterizedTest - @MethodSource("data") - public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { + @ValueSource(booleans = {true, false}) + public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final boolean processingThreadsEnabled) throws InterruptedException { final Time mockTime = new MockTime(1); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, @@ -821,10 +797,10 @@ public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final } @ParameterizedTest - @MethodSource("data") - public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { + @ValueSource(booleans = {true, false}) + public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(final boolean processingThreadsEnabled) throws InterruptedException { final Time mockTime = new MockTime(1); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, @@ -915,11 +891,9 @@ AtomicLong nextRebalanceMs() { } } - @ParameterizedTest - @MethodSource("data") - public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @Test + public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads() { // With processing threads, there is no guarantee how many iterations will be performed - assumeFalse(processingThreadsEnabled); final List> mockProcessors = new LinkedList<>(); internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); @@ -939,8 +913,8 @@ public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final b ); final Properties properties = new Properties(); - properties.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); - properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); + properties.put(InternalConfig.STATE_UPDATER_ENABLED, true); + properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, false); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APPLICATION_ID, "localhost:2171", @@ -962,48 +936,48 @@ public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final b mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - runOnce(processingThreadsEnabled); + runOnce(false); // processed one record, punctuated after the first record, and hence num.iterations is still 1 long offset = -1; addRecord(mockConsumer, ++offset, 0L); - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(1)); // processed one more record without punctuation, and bump num.iterations to 2 addRecord(mockConsumer, ++offset, 1L); - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(2)); // processed zero records, early exit and iterations stays as 2 - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(2)); // system time based punctutation without processing any record, iteration stays as 2 mockTime.sleep(11L); - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(2)); // system time based punctutation after processing a record, half iteration to 1 mockTime.sleep(11L); addRecord(mockConsumer, ++offset, 5L); - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(1)); // processed two records, bumping up iterations to 3 (1 + 2) addRecord(mockConsumer, ++offset, 5L); addRecord(mockConsumer, ++offset, 6L); - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(3)); // stream time based punctutation halves to 1 addRecord(mockConsumer, ++offset, 11L); - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(1)); @@ -1011,13 +985,13 @@ public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final b addRecord(mockConsumer, ++offset, 12L); addRecord(mockConsumer, ++offset, 13L); addRecord(mockConsumer, ++offset, 14L); - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(3)); mockProcessors.forEach(MockApiProcessor::requestCommit); addRecord(mockConsumer, ++offset, 15L); - runOnce(processingThreadsEnabled); + runOnce(false); // user requested commit should half iteration to 1 assertThat(thread.currentNumIterations(), equalTo(1)); @@ -1026,29 +1000,29 @@ public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final b addRecord(mockConsumer, ++offset, 15L); addRecord(mockConsumer, ++offset, 16L); addRecord(mockConsumer, ++offset, 17L); - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(3)); // time based commit without processing, should keep the iteration as 3 mockTime.sleep(90L); - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(3)); // time based commit without processing, should half the iteration to 1 mockTime.sleep(90L); addRecord(mockConsumer, ++offset, 18L); - runOnce(processingThreadsEnabled); + runOnce(false); assertThat(thread.currentNumIterations(), equalTo(1)); } @ParameterizedTest - @MethodSource("data") - public void shouldNotCauseExceptionIfNothingCommitted(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotCauseExceptionIfNothingCommitted(final boolean processingThreadsEnabled) { final long commitInterval = 1000L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); @@ -1074,12 +1048,12 @@ public void shouldNotCauseExceptionIfNothingCommitted(final boolean stateUpdater } @ParameterizedTest - @MethodSource("data") - public void shouldCommitAfterCommitInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldCommitAfterCommitInterval(final boolean processingThreadsEnabled) { final long commitInterval = 100L; final long commitLatency = 10L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); @@ -1135,12 +1109,12 @@ int commit(final Collection tasksToCommit) { } @ParameterizedTest - @MethodSource("data") - public void shouldPurgeAfterPurgeInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldPurgeAfterPurgeInterval(final boolean processingThreadsEnabled) { final long commitInterval = 100L; final long purgeInterval = 200L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); @@ -1168,8 +1142,8 @@ public void shouldPurgeAfterPurgeInterval(final boolean stateUpdaterEnabled, fin } @ParameterizedTest - @MethodSource("data") - public void shouldRecordCommitLatency(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldRecordCommitLatency(final boolean processingThreadsEnabled) { final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); @@ -1191,7 +1165,7 @@ public void shouldRecordCommitLatency(final boolean stateUpdaterEnabled, final b schedulingTaskManager = null; } - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); @@ -1277,12 +1251,12 @@ int commit(final Collection tasksToCommit) { } @ParameterizedTest - @MethodSource("data") - public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled(final boolean processingThreadsEnabled) { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); internalStreamsBuilder.buildAndOptimizeTopology(); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); @@ -1317,11 +1291,11 @@ public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo } @ParameterizedTest - @MethodSource("data") - public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabled(final boolean processingThreadsEnabled) { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final Properties props = configProps(true, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(true, processingThreadsEnabled); thread = createStreamThread(CLIENT_ID, new StreamsConfig(props)); thread.setState(StreamThread.State.STARTING); @@ -1409,14 +1383,14 @@ public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean } @ParameterizedTest - @MethodSource("data") - public void shouldShutdownTaskManagerOnClose(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldShutdownTaskManagerOnClose(final boolean processingThreadsEnabled) { final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskManager taskManager = mock(TaskManager.class); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1433,8 +1407,8 @@ public void shouldShutdownTaskManagerOnClose(final boolean stateUpdaterEnabled, } @ParameterizedTest - @MethodSource("data") - public void shouldNotReturnDataAfterTaskMigrated(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotReturnDataAfterTaskMigrated(final boolean processingThreadsEnabled) { final TaskManager taskManager = mock(TaskManager.class); final InternalTopologyBuilder internalTopologyBuilder = mock(InternalTopologyBuilder.class); when(internalTopologyBuilder.fullSourceTopicNames()).thenReturn(Collections.singletonList(topic1)); @@ -1449,30 +1423,19 @@ public void shouldNotReturnDataAfterTaskMigrated(final boolean stateUpdaterEnabl final TaskMigratedException taskMigratedException = new TaskMigratedException( "Changelog restore found task migrated", new RuntimeException("restore task migrated")); - ChangelogReader changelogReader = this.changelogReader; - if (stateUpdaterEnabled) { - when(taskManager.checkStateUpdater(anyLong(), any())).thenAnswer(answer -> { - consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0])); - consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0])); + final ChangelogReader changelogReader = this.changelogReader; + when(taskManager.checkStateUpdater(anyLong(), any())).thenAnswer(answer -> { + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0])); + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0])); - throw taskMigratedException; - }); - } else { - changelogReader = new MockChangelogReader() { - @Override - public long restore(final Map tasks) { - consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0])); - consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0])); - - throw taskMigratedException; - } - }; - } + throw taskMigratedException; + }); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); thread = new StreamThread( new MockTime(1), @@ -1510,14 +1473,14 @@ public long restore(final Map tasks) { } @ParameterizedTest - @MethodSource("data") - public void shouldShutdownTaskManagerOnCloseWithoutStart(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldShutdownTaskManagerOnCloseWithoutStart(final boolean processingThreadsEnabled) { final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskManager taskManager = mock(TaskManager.class); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1528,14 +1491,14 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart(final boolean stateUpda } @ParameterizedTest - @MethodSource("data") - public void shouldOnlyShutdownOnce(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldOnlyShutdownOnce(final boolean processingThreadsEnabled) { final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskManager taskManager = mock(TaskManager.class); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1548,12 +1511,12 @@ public void shouldOnlyShutdownOnce(final boolean stateUpdaterEnabled, final bool } @ParameterizedTest - @MethodSource("data") - public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(final boolean processingThreadsEnabled) { internalTopologyBuilder.addSource(null, "name", null, null, null, "topic"); internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); @@ -1570,12 +1533,12 @@ public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(fi } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhileProcessing(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhileProcessing(final boolean processingThreadsEnabled) throws Exception { internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source"); - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); final MockConsumer consumer = clientSupplier.consumer; @@ -1641,8 +1604,8 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr assertThat(producer.commitCount(), equalTo(1L)); } - private void testThrowingDurringCommitTransactionException(final RuntimeException e, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + private void testThrowingDuringCommitTransactionException(final RuntimeException e, final boolean processingThreadsEnabled) throws InterruptedException { + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); @@ -1686,20 +1649,20 @@ private void testThrowingDurringCommitTransactionException(final RuntimeExceptio } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - testThrowingDurringCommitTransactionException(new ProducerFencedException("Producer is fenced"), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks(final boolean processingThreadsEnabled) throws Exception { + testThrowingDuringCommitTransactionException(new ProducerFencedException("Producer is fenced"), processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfInvalidPidMappingOccurredInCommitTransactionWhenSuspendingTasks(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - testThrowingDurringCommitTransactionException(new InvalidPidMappingException("PidMapping is invalid"), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfInvalidPidMappingOccurredInCommitTransactionWhenSuspendingTasks(final boolean processingThreadsEnabled) throws Exception { + testThrowingDuringCommitTransactionException(new InvalidPidMappingException("PidMapping is invalid"), processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldReinitializeRevivedTasksInAnyState(final boolean processingThreadsEnabled) throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config, new MockTime(1)); final String storeName = "store"; @@ -1768,15 +1731,14 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE runOnce(processingThreadsEnabled); // the third actually polls, processes the record, and throws the corruption exception - if (stateUpdaterEnabled) { - TestUtils.waitForCondition( - () -> thread.taskManager().checkStateUpdater( - mockTime.milliseconds(), - topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) - ), - 10 * 1000, - "State updater never returned tasks."); - } + TestUtils.waitForCondition( + () -> thread.taskManager().checkStateUpdater( + mockTime.milliseconds(), + topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) + ), + 10 * 1000, + "State updater never returned tasks."); + addRecord(mockConsumer, 0L); shouldThrow.set(true); final TaskCorruptedException taskCorruptedException; @@ -1788,15 +1750,14 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE // Now, we can handle the corruption thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks()); - if (stateUpdaterEnabled) { - TestUtils.waitForCondition( - () -> thread.taskManager().checkStateUpdater( - mockTime.milliseconds(), - topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) - ), - 10 * 1000, - "State updater never returned tasks."); - } + TestUtils.waitForCondition( + () -> thread.taskManager().checkStateUpdater( + mockTime.milliseconds(), + topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) + ), + 10 * 1000, + "State updater never returned tasks."); + // again, complete the restoration runOnce(processingThreadsEnabled); @@ -1815,11 +1776,11 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE } } - private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(final RuntimeException e, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(final RuntimeException e, final boolean processingThreadsEnabled) { // only have source but no sink so that we would not get fenced in producer.send internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); final MockConsumer consumer = clientSupplier.consumer; @@ -1871,21 +1832,21 @@ private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenComm } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new ProducerFencedException("Producer is fenced"), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting(final boolean processingThreadsEnabled) { + testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new ProducerFencedException("Producer is fenced"), processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new InvalidPidMappingException("PID Mapping is invalid"), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting(final boolean processingThreadsEnabled) { + testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new InvalidPidMappingException("PID Mapping is invalid"), processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskProducerWhenSuspending(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskProducerWhenSuspending(final boolean processingThreadsEnabled) throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); @@ -1931,9 +1892,9 @@ public void shouldNotCloseTaskProducerWhenSuspending(final boolean stateUpdaterE } @ParameterizedTest - @MethodSource("data") - public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); clientSupplier.setCluster(createCluster()); @@ -2008,9 +1969,9 @@ public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateU } @ParameterizedTest - @MethodSource("data") - public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) .groupByKey().count(Materialized.as("count-one")); @@ -2054,45 +2015,6 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean state assertTrue(threadMetadata.activeTasks().isEmpty()); } - @SuppressWarnings("unchecked") - @ParameterizedTest - @MethodSource("data") - public void shouldUpdateStandbyTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - // Updating standby tasks on the stream thread only happens when the state updater is disabled - assumeFalse(stateUpdaterEnabled); - - final String storeName1 = "count-one"; - final String storeName2 = "table-two"; - final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog"; - final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog"; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); - final StreamsConfig config = new StreamsConfig(props); - thread = createStreamThread(CLIENT_ID, config); - final MockConsumer restoreConsumer = clientSupplier.restoreConsumer; - - setupThread(storeName1, storeName2, changelogName1, changelogName2, restoreConsumer, false); - - runOnce(processingThreadsEnabled); - - final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1); - final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1); - assertEquals(task1, standbyTask1.id()); - assertEquals(task3, standbyTask2.id()); - - final KeyValueStore store1 = (KeyValueStore) standbyTask1.store(storeName1); - final KeyValueStore store2 = (KeyValueStore) standbyTask2.store(storeName2); - - assertEquals(0L, store1.approximateNumEntries()); - assertEquals(0L, store2.approximateNumEntries()); - - addStandbyRecordsToRestoreConsumer(restoreConsumer); - - runOnce(processingThreadsEnabled); - - assertEquals(10L, store1.approximateNumEntries()); - assertEquals(4L, store2.approximateNumEntries()); - } - private void addActiveRecordsToRestoreConsumer(final MockConsumer restoreConsumer) { for (long i = 0L; i < 10L; i++) { restoreConsumer.addRecord(new ConsumerRecord<>( @@ -2178,71 +2100,10 @@ private void setupThread(final String storeName1, thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList()); } - @SuppressWarnings("unchecked") - @ParameterizedTest - @MethodSource("data") - public void shouldNotUpdateStandbyTaskWhenPaused(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - // Updating standby tasks on the stream thread only happens when the state updater is disabled - assumeFalse(stateUpdaterEnabled); - - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); - final String storeName1 = "count-one"; - final String storeName2 = "table-two"; - final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog"; - final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog"; - thread = createStreamThread(CLIENT_ID, config); - final MockConsumer restoreConsumer = clientSupplier.restoreConsumer; - - setupThread(storeName1, storeName2, changelogName1, changelogName2, restoreConsumer, true); - - runOnce(processingThreadsEnabled); - - final StreamTask activeTask1 = activeTask(thread.taskManager(), t1p2); - final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1); - final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1); - assertEquals(task1, standbyTask1.id()); - assertEquals(task3, standbyTask2.id()); - - final KeyValueStore activeStore = (KeyValueStore) activeTask1.store(storeName1); - - final KeyValueStore store1 = (KeyValueStore) standbyTask1.store(storeName1); - final KeyValueStore store2 = (KeyValueStore) standbyTask2.store(storeName2); - - assertEquals(0L, activeStore.approximateNumEntries()); - assertEquals(0L, store1.approximateNumEntries()); - assertEquals(0L, store2.approximateNumEntries()); - - // Add some records that the active task would handle - addActiveRecordsToRestoreConsumer(restoreConsumer); - // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10 - addStandbyRecordsToRestoreConsumer(restoreConsumer); - - // Simulate pause - thread.taskManager().topologyMetadata().pauseTopology(TopologyMetadata.UNNAMED_TOPOLOGY); - runOnce(processingThreadsEnabled); - - assertEquals(0L, activeStore.approximateNumEntries()); - assertEquals(0L, store1.approximateNumEntries()); - assertEquals(0L, store2.approximateNumEntries()); - - // Simulate resume - thread.taskManager().topologyMetadata().resumeTopology(TopologyMetadata.UNNAMED_TOPOLOGY); - runOnce(processingThreadsEnabled); - - assertEquals(10L, activeStore.approximateNumEntries()); - assertEquals(0L, store1.approximateNumEntries()); - assertEquals(0L, store2.approximateNumEntries()); - - runOnce(processingThreadsEnabled); - assertEquals(10L, activeStore.approximateNumEntries()); - assertEquals(10L, store1.approximateNumEntries()); - assertEquals(4L, store2.approximateNumEntries()); - } - @ParameterizedTest - @MethodSource("data") - public void shouldCreateStandbyTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldCreateStandbyTask(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); setupInternalTopologyWithoutState(config); internalTopologyBuilder.addStateStore(new MockKeyValueStoreBuilder("myStore", true), "processor1"); @@ -2250,18 +2111,18 @@ public void shouldCreateStandbyTask(final boolean stateUpdaterEnabled, final boo } @ParameterizedTest - @MethodSource("data") - public void shouldNotCreateStandbyTaskWithoutStateStores(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldNotCreateStandbyTaskWithoutStateStores(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); setupInternalTopologyWithoutState(config); assertThat(createStandbyTask(config), empty()); } @ParameterizedTest - @MethodSource("data") - public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); setupInternalTopologyWithoutState(config); final StoreBuilder> storeBuilder = new MockKeyValueStoreBuilder("myStore", true); @@ -2271,12 +2132,8 @@ public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled(final boo assertThat(createStandbyTask(config), empty()); } - @ParameterizedTest - @MethodSource("data") - @SuppressWarnings("deprecation") - public void shouldPunctuateActiveTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeFalse(processingThreadsEnabled); - + @Test + public void shouldPunctuateActiveTask() { final List punctuatedStreamTime = new ArrayList<>(); final List punctuatedWallClockTime = new ArrayList<>(); final ProcessorSupplier punctuateProcessor = @@ -2294,7 +2151,7 @@ public void process(final Record record) {} internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor); internalStreamsBuilder.buildAndOptimizeTopology(); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, false)); thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); @@ -2314,7 +2171,7 @@ public void process(final Record record) {} clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - runOnce(processingThreadsEnabled); + runOnce(false); assertEquals(0, punctuatedStreamTime.size()); assertEquals(0, punctuatedWallClockTime.size()); @@ -2333,24 +2190,22 @@ public void process(final Record record) {} new RecordHeaders(), Optional.empty())); - runOnce(processingThreadsEnabled); + runOnce(false); assertEquals(1, punctuatedStreamTime.size()); assertEquals(1, punctuatedWallClockTime.size()); mockTime.sleep(100L); - runOnce(processingThreadsEnabled); + runOnce(false); // we should skip stream time punctuation, only trigger wall-clock time punctuation assertEquals(1, punctuatedStreamTime.size()); assertEquals(2, punctuatedWallClockTime.size()); } - @ParameterizedTest - @MethodSource("data") - public void shouldPunctuateWithTimestampPreservedInProcessorContext(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeFalse(processingThreadsEnabled); + @Test + public void shouldPunctuateWithTimestampPreservedInProcessorContext() { final ProcessorSupplier punctuateProcessor = () -> new Processor<>() { @Override @@ -2375,7 +2230,7 @@ public void process(final Record record) {} internalStreamsBuilder.buildAndOptimizeTopology(); final long currTime = mockTime.milliseconds(); - thread = createStreamThread(CLIENT_ID, stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread(CLIENT_ID, false); thread.setState(StreamThread.State.STARTING); thread.taskManager().init(); @@ -2394,11 +2249,11 @@ public void process(final Record record) {} clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); - runOnce(processingThreadsEnabled); + runOnce(false); assertEquals(0, peekedContextTime.size()); mockTime.sleep(100L); - runOnce(processingThreadsEnabled); + runOnce(false); assertEquals(1, peekedContextTime.size()); assertEquals(currTime + 100L, peekedContextTime.get(0).longValue()); @@ -2416,16 +2271,16 @@ public void process(final Record record) {} new RecordHeaders(), Optional.empty())); - runOnce(processingThreadsEnabled); + runOnce(false); assertEquals(2, peekedContextTime.size()); assertEquals(110L, peekedContextTime.get(1).longValue()); } @ParameterizedTest - @MethodSource("data") - public void shouldAlwaysUpdateTasksMetadataAfterChangingState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldAlwaysUpdateTasksMetadataAfterChangingState(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); ThreadMetadata metadata = thread.threadMetadata(); assertEquals(StreamThread.State.CREATED.name(), metadata.threadState()); @@ -2439,14 +2294,14 @@ public void shouldAlwaysUpdateTasksMetadataAfterChangingState(final boolean stat } @ParameterizedTest - @MethodSource("data") - public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(final boolean processingThreadsEnabled) throws Exception { internalStreamsBuilder.stream(Collections.singleton("topic"), consumed) .groupByKey() .count(Materialized.as("count")); internalStreamsBuilder.buildAndOptimizeTopology(); - thread = createStreamThread("clientId", new MockTime(1), stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", new MockTime(1), processingThreadsEnabled); final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); final MockConsumer mockRestoreConsumer = (MockConsumer) thread.restoreConsumer(); final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient(); @@ -2548,26 +2403,18 @@ public Set partitions() { "K2".getBytes(), "V2".getBytes())); - if (stateUpdaterEnabled) { - TestUtils.waitForCondition( - () -> mockRestoreConsumer.assignment().isEmpty(), - "Never get the assignment"); - } else { - TestUtils.waitForCondition( - () -> { - mockRestoreConsumer.assign(changelogPartitionSet); - return mockRestoreConsumer.position(changelogPartition) == 2L; - }, - "Never finished restore"); - } + TestUtils.waitForCondition( + () -> mockRestoreConsumer.assignment().isEmpty(), + "Never get the assignment"); + } @ParameterizedTest - @MethodSource("data") - public void shouldLogAndRecordSkippedMetricForDeserializationException(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldLogAndRecordSkippedMetricForDeserializationException(final boolean processingThreadsEnabled) { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final Properties properties = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties properties = configProps(false, processingThreadsEnabled); properties.setProperty( StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName() @@ -2629,9 +2476,9 @@ public void shouldLogAndRecordSkippedMetricForDeserializationException(final boo } @ParameterizedTest - @MethodSource("data") - public void shouldThrowTaskMigratedExceptionHandlingTaskLost(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldThrowTaskMigratedExceptionHandlingTaskLost(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final Set assignedPartitions = Collections.singleton(t1p1); final TaskManager taskManager = mock(TaskManager.class); @@ -2657,9 +2504,9 @@ public void shouldThrowTaskMigratedExceptionHandlingTaskLost(final boolean state } @ParameterizedTest - @MethodSource("data") - public void shouldThrowTaskMigratedExceptionHandlingRevocation(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldThrowTaskMigratedExceptionHandlingRevocation(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final Set assignedPartitions = Collections.singleton(t1p1); final TaskManager taskManager = mock(TaskManager.class); @@ -2685,10 +2532,10 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation(final boolean sta } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -2746,10 +2593,10 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -2812,10 +2659,10 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -2878,10 +2725,10 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -2943,10 +2790,10 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -3006,9 +2853,9 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") - public void shouldNotCommitNonRunningNonRestoringTasks(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldNotCommitNonRunningNonRestoringTasks(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); @@ -3045,15 +2892,12 @@ public void shouldNotCommitNonRunningNonRestoringTasks(final boolean stateUpdate } @ParameterizedTest - @MethodSource("data") - public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps( - final boolean stateUpdaterEnabled, - final boolean processingThreadsEnabled - ) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final boolean processingThreadsEnabled) throws Exception { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final Properties properties = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties properties = configProps(false, processingThreadsEnabled); properties.setProperty( StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName() @@ -3152,9 +2996,9 @@ private void waitForCommit(final MockConsumer mockConsumer, fina } @ParameterizedTest - @MethodSource("data") - public void shouldTransmitTaskManagerMetrics(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldTransmitTaskManagerMetrics(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); @@ -3179,9 +3023,9 @@ public void shouldTransmitTaskManagerMetrics(final boolean stateUpdaterEnabled, } @ParameterizedTest - @MethodSource("data") - public void shouldConstructAdminMetrics(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldConstructAdminMetrics(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final Node broker1 = new Node(0, "dummyHost-1", 1234); final Node broker2 = new Node(1, "dummyHost-2", 1234); final List cluster = Arrays.asList(broker1, broker2); @@ -3236,19 +3080,19 @@ public void shouldConstructAdminMetrics(final boolean stateUpdaterEnabled, final } @ParameterizedTest - @MethodSource("data") - public void shouldNotRecordFailedStreamThread(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - runAndVerifyFailedStreamThreadRecording(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldNotRecordFailedStreamThread(final boolean processingThreadsEnabled) { + runAndVerifyFailedStreamThreadRecording(false, processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldRecordFailedStreamThread(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - runAndVerifyFailedStreamThreadRecording(true, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldRecordFailedStreamThread(final boolean processingThreadsEnabled) { + runAndVerifyFailedStreamThreadRecording(true, processingThreadsEnabled); } - public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); @@ -3305,10 +3149,9 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") - public void shouldCheckStateUpdater(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeTrue(stateUpdaterEnabled); - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldCheckStateUpdater(final boolean processingThreadsEnabled) { + final Properties streamsConfigProps = configProps(false, processingThreadsEnabled); thread = setUpThread(streamsConfigProps); final TaskManager taskManager = thread.taskManager(); thread.setState(State.STARTING); @@ -3322,28 +3165,24 @@ public void shouldCheckStateUpdater(final boolean stateUpdaterEnabled, final boo } } - @ParameterizedTest - @MethodSource("data") - public void shouldCheckStateUpdaterInBetweenProcessCalls(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeTrue(stateUpdaterEnabled); - assumeFalse(processingThreadsEnabled); - - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @Test + public void shouldCheckStateUpdaterInBetweenProcessCalls() { + final Properties streamsConfigProps = configProps(false, false); thread = setUpThread(streamsConfigProps); final TaskManager taskManager = thread.taskManager(); thread.setState(State.STARTING); // non-zero return of process will cause a second call to process when(taskManager.process(Mockito.anyInt(), Mockito.any())).thenReturn(1).thenReturn(0); - runOnce(processingThreadsEnabled); + runOnce(false); Mockito.verify(taskManager, times(2)).checkStateUpdater(anyLong(), Mockito.any()); } @ParameterizedTest - @MethodSource("data") - public void shouldUpdateLagsAfterPolling(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldUpdateLagsAfterPolling(final boolean processingThreadsEnabled) { + final Properties streamsConfigProps = configProps(false, processingThreadsEnabled); thread = setUpThread(streamsConfigProps); thread.setState(State.STARTING); thread.setState(State.PARTITIONS_ASSIGNED); @@ -3359,9 +3198,9 @@ public void shouldUpdateLagsAfterPolling(final boolean stateUpdaterEnabled, fina @ParameterizedTest - @MethodSource("data") - public void shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling(final boolean processingThreadsEnabled) { + final Properties streamsConfigProps = configProps(false, processingThreadsEnabled); thread = setUpThread(streamsConfigProps); thread.setState(State.STARTING); thread.setState(State.PARTITIONS_ASSIGNED); @@ -3374,10 +3213,9 @@ public void shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling(fina } @ParameterizedTest - @MethodSource("data") - public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeTrue(stateUpdaterEnabled); - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater(final boolean processingThreadsEnabled) { + final Properties streamsConfigProps = configProps(false, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(streamsConfigProps); final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); thread = setUpThread(streamsConfigProps); @@ -3390,32 +3228,18 @@ public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater(final } @ParameterizedTest - @MethodSource("data") - public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeFalse(stateUpdaterEnabled); - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); - thread = setUpThread(streamsConfigProps); - thread.setState(State.STARTING); - thread.setState(State.PARTITIONS_ASSIGNED); - - runOnce(processingThreadsEnabled); - - Mockito.verify(mainConsumer).poll(Duration.ZERO); - } - - @ParameterizedTest - @MethodSource("data") - public void shouldGetMainAndRestoreConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - getClientInstanceId(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldGetMainAndRestoreConsumerInstanceId(final boolean processingThreadsEnabled) throws Exception { + getClientInstanceId(false, processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - getClientInstanceId(true, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout(final boolean processingThreadsEnabled) throws Exception { + getClientInstanceId(true, processingThreadsEnabled); } - private void getClientInstanceId(final boolean injectTimeException, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + private void getClientInstanceId(final boolean injectTimeException, final boolean processingThreadsEnabled) throws Exception { final Uuid consumerInstanceId = Uuid.randomUuid(); clientSupplier.consumer.setClientInstanceId(consumerInstanceId); if (injectTimeException) { @@ -3434,7 +3258,7 @@ private void getClientInstanceId(final boolean injectTimeException, final boolea } clientSupplier.prepareProducer(producer); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3457,9 +3281,9 @@ private void getClientInstanceId(final boolean injectTimeException, final boolea } @ParameterizedTest - @MethodSource("data") - public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolean processingThreadsEnabled) { + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3474,9 +3298,9 @@ public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolea } @ParameterizedTest - @MethodSource("data") - public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean processingThreadsEnabled) { + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3491,9 +3315,9 @@ public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boo } @ParameterizedTest - @MethodSource("data") - public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean processingThreadsEnabled) { + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3508,10 +3332,10 @@ public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean st } @ParameterizedTest - @MethodSource("data") - public void shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean processingThreadsEnabled) throws Exception { clientSupplier.consumer.disableTelemetry(); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3525,11 +3349,11 @@ public void shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean stateU } @ParameterizedTest - @MethodSource("data") - public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final boolean processingThreadsEnabled) throws Exception { clientSupplier.restoreConsumer.disableTelemetry(); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3543,13 +3367,13 @@ public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final boolean sta } @ParameterizedTest - @MethodSource("data") - public void shouldReturnNullIfProducerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldReturnNullIfProducerTelemetryDisabled(final boolean processingThreadsEnabled) throws Exception { final MockProducer producer = new MockProducer<>(); producer.disableTelemetry(); clientSupplier.prepareProducer(producer); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3563,11 +3387,11 @@ public void shouldReturnNullIfProducerTelemetryDisabled(final boolean stateUpdat } @ParameterizedTest - @MethodSource("data") - public void shouldTimeOutOnMainConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldTimeOutOnMainConsumerInstanceId(final boolean processingThreadsEnabled) { clientSupplier.consumer.setClientInstanceId(Uuid.randomUuid()); clientSupplier.consumer.injectTimeoutException(-1); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3588,11 +3412,11 @@ public void shouldTimeOutOnMainConsumerInstanceId(final boolean stateUpdaterEnab @ParameterizedTest - @MethodSource("data") - public void shouldTimeOutOnRestoreConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldTimeOutOnRestoreConsumerInstanceId(final boolean processingThreadsEnabled) { clientSupplier.restoreConsumer.setClientInstanceId(Uuid.randomUuid()); clientSupplier.restoreConsumer.injectTimeoutException(-1); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3613,14 +3437,14 @@ public void shouldTimeOutOnRestoreConsumerInstanceId(final boolean stateUpdaterE } @ParameterizedTest - @MethodSource("data") - public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldTimeOutOnProducerInstanceId(final boolean processingThreadsEnabled) { final MockProducer producer = new MockProducer<>(); producer.setClientInstanceId(Uuid.randomUuid()); producer.injectTimeoutException(-1); clientSupplier.prepareProducer(producer); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3642,7 +3466,7 @@ public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, @ParameterizedTest @ValueSource(booleans = {true, false}) public void testNamedTopologyWithStreamsProtocol(final boolean stateUpdaterEnabled) { - final Properties props = configProps(false, stateUpdaterEnabled, false); + final Properties props = configProps(false, false); props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString()); final StreamsConfig config = new StreamsConfig(props); final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( @@ -3692,7 +3516,7 @@ public void testNamedTopologyWithStreamsProtocol(final boolean stateUpdaterEnabl @Test public void testStreamsRebalanceDataWithClassicProtocol() { - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.toString()); thread = createStreamThread(CLIENT_ID, new StreamsConfig(props)); assertTrue(thread.streamsRebalanceData().isEmpty()); @@ -3701,7 +3525,7 @@ public void testStreamsRebalanceDataWithClassicProtocol() { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testStreamsRebalanceDataWithExtraCopartition(final boolean stateUpdaterEnabled) { - final Properties props = configProps(false, stateUpdaterEnabled, false); + final Properties props = configProps(false, false); props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString()); internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); @@ -3751,7 +3575,7 @@ public void testStreamsRebalanceDataWithExtraCopartition(final boolean stateUpda @Test public void testStreamsRebalanceDataWithStreamsProtocol() { - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString()); props.setProperty(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:1234"); props.setProperty(StreamsConfig.REPLICATION_FACTOR_CONFIG, "1"); @@ -3858,7 +3682,7 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreads() { ); final Runnable shutdownErrorHook = mock(Runnable.class); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)), StreamsMetadataState.UNKNOWN_HOST, @@ -3917,7 +3741,7 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreadsMissingSourceTopic ); final Runnable shutdownErrorHook = mock(Runnable.class); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)), StreamsMetadataState.UNKNOWN_HOST, @@ -3986,7 +3810,7 @@ public void testStreamsProtocolIncorrectlyPartitionedTopics() { ); final Runnable shutdownErrorHook = mock(Runnable.class); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)), StreamsMetadataState.UNKNOWN_HOST, @@ -4046,7 +3870,7 @@ public void testStreamsProtocolRunOnceWithProcessingThreads() { Map.of() ); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final Runnable shutdownErrorHook = mock(Runnable.class); final StreamsConfig config = new StreamsConfig(props); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( @@ -4105,7 +3929,7 @@ public void testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic() Map.of() ); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final Runnable shutdownErrorHook = mock(Runnable.class); final StreamsConfig config = new StreamsConfig(props); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( @@ -4174,7 +3998,7 @@ public void testStreamsProtocolMissingSourceTopicRecovery() { Map.of() ); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final Runnable shutdownErrorHook = mock(Runnable.class); final StreamsConfig config = new StreamsConfig(props); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( @@ -4376,25 +4200,6 @@ private void addRecord(final MockConsumer mockConsumer, Optional.empty())); } - StreamTask activeTask(final TaskManager taskManager, final TopicPartition partition) { - final Stream standbys = taskManager.allTasks().values().stream().filter(Task::isActive); - for (final Task task : (Iterable) standbys::iterator) { - if (task.inputPartitions().contains(partition)) { - return (StreamTask) task; - } - } - return null; - } - StandbyTask standbyTask(final TaskManager taskManager, final TopicPartition partition) { - final Stream standbys = taskManager.standbyTaskMap().values().stream(); - for (final Task task : (Iterable) standbys::iterator) { - if (task.inputPartitions().contains(partition)) { - return (StandbyTask) task; - } - } - return null; - } - private StreamThread buildStreamThread(final Consumer consumer, final TaskManager taskManager, final StreamsConfig config, From d053600f81b42a70adcc9b453e96e4ba13847d07 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Sat, 29 Nov 2025 09:50:22 -0800 Subject: [PATCH 2/5] remove unused methods --- .../processor/internals/StreamThreadTest.java | 85 ------------------- 1 file changed, 85 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 36eb2a6a8deac..e66623d49be54 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -2015,91 +2015,6 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean proce assertTrue(threadMetadata.activeTasks().isEmpty()); } - private void addActiveRecordsToRestoreConsumer(final MockConsumer restoreConsumer) { - for (long i = 0L; i < 10L; i++) { - restoreConsumer.addRecord(new ConsumerRecord<>( - STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, - 2, - i, - ("K" + i).getBytes(), - ("V" + i).getBytes())); - } - } - - private void addStandbyRecordsToRestoreConsumer(final MockConsumer restoreConsumer) { - // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10 - for (long i = 0L; i < 10L; i++) { - restoreConsumer.addRecord(new ConsumerRecord<>( - STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, - 1, - i, - ("K" + i).getBytes(), - ("V" + i).getBytes())); - restoreConsumer.addRecord(new ConsumerRecord<>( - STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG, - 1, - i, - ("K" + i).getBytes(), - ("V" + i).getBytes())); - } - } - - private void setupThread(final String storeName1, - final String storeName2, - final String changelogName1, - final String changelogName2, - final MockConsumer restoreConsumer, - final boolean addActiveTask) throws IOException { - final TopicPartition activePartition = new TopicPartition(changelogName1, 2); - final TopicPartition partition1 = new TopicPartition(changelogName1, 1); - final TopicPartition partition2 = new TopicPartition(changelogName2, 1); - - internalStreamsBuilder - .stream(Collections.singleton(topic1), consumed) - .groupByKey() - .count(Materialized.as(storeName1)); - final MaterializedInternal> materialized - = new MaterializedInternal<>(Materialized.as(storeName2), internalStreamsBuilder, ""); - internalStreamsBuilder.table(topic2, new ConsumedInternal<>(Consumed.with(null, null)), materialized); - - internalStreamsBuilder.buildAndOptimizeTopology(); - restoreConsumer.updatePartitions(changelogName1, - Collections.singletonList(new PartitionInfo(changelogName1, 1, null, new Node[0], new Node[0])) - ); - - restoreConsumer.updateEndOffsets(Collections.singletonMap(activePartition, 10L)); - restoreConsumer.updateBeginningOffsets(Collections.singletonMap(activePartition, 0L)); - ((MockAdminClient) (thread.adminClient())).updateBeginningOffsets(Collections.singletonMap(activePartition, 0L)); - ((MockAdminClient) (thread.adminClient())).updateEndOffsets(Collections.singletonMap(activePartition, 10L)); - - restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L)); - restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L)); - restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L)); - restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L)); - final OffsetCheckpoint checkpoint - = new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(task3), CHECKPOINT_FILE_NAME)); - checkpoint.write(Collections.singletonMap(partition2, 5L)); - - thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); - - final Map> activeTasks = new HashMap<>(); - final Map> standbyTasks = new HashMap<>(); - - if (addActiveTask) { - activeTasks.put(task2, Collections.singleton(t1p2)); - } - - // assign single partition - standbyTasks.put(task1, Collections.singleton(t1p1)); - standbyTasks.put(task3, Collections.singleton(t2p1)); - - thread.taskManager().handleAssignment(activeTasks, standbyTasks); - thread.taskManager().tryToCompleteRestoration(mockTime.milliseconds(), null); - - thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList()); - } - @ParameterizedTest @ValueSource(booleans = {true, false}) public void shouldCreateStandbyTask(final boolean processingThreadsEnabled) { From 84b7866584429218e3481f9c6d5749a8c23cb410 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Sat, 29 Nov 2025 09:56:36 -0800 Subject: [PATCH 3/5] remove unused imports --- .../kafka/streams/processor/internals/StreamThreadTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index e66623d49be54..9b9624fac4edf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -76,7 +76,6 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; -import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; @@ -94,7 +93,6 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockApiProcessor; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockKeyValueStoreBuilder; @@ -119,8 +117,6 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.io.File; -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -150,7 +146,6 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId; -import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.apache.kafka.test.TestUtils.waitForCondition; From 3027f6111252e61a0b57ab8cd4855d2ad5d5c8a2 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Sat, 29 Nov 2025 23:30:54 -0800 Subject: [PATCH 4/5] remove more unused imports --- .../kafka/streams/processor/internals/StreamThreadTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 9b9624fac4edf..7786f967cb9bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -55,7 +55,6 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -105,7 +104,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -187,7 +185,6 @@ public class StreamThreadTest { private static final UUID PROCESS_ID = UUID.fromString("87bf53a8-54f2-485f-a4b6-acdbec0a8b3d"); private static final String CLIENT_ID = APPLICATION_ID + "-" + PROCESS_ID; public static final String STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG = "stream-thread-test-count-one-changelog"; - public static final String STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG = "stream-thread-test-table-two-changelog"; private final int threadIdx = 1; private final Metrics metrics = new Metrics(); From ec15b7cfa07c241aecaae7ff78d242a14993cb56 Mon Sep 17 00:00:00 2001 From: Shashank Hosahalli Shivamurthy Date: Sat, 29 Nov 2025 23:38:52 -0800 Subject: [PATCH 5/5] update test --- .../processor/internals/StreamThreadTest.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 7786f967cb9bd..550d1cbfbc1c3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1321,18 +1321,11 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl } @ParameterizedTest - @MethodSource("data") - public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { - // The state updater is disabled for this test because this test relies on the fact the mainConsumer.resume() - // is not called. This is not true when the state updater is enabled which leads to - // java.lang.IllegalStateException: No current assignment for partition topic1-2. - // Since this tests verifies an aspect that is independent from the state updater, it is OK to disable - // the state updater and leave the rewriting of the test to later, when the code path for disabled state updater - // is removed. - assumeFalse(stateUpdaterEnabled); + @ValueSource(booleans = {true, false}) + public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean processingThreadsEnabled) throws InterruptedException { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final Properties props = configProps(true, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(true, processingThreadsEnabled); thread = createStreamThread(CLIENT_ID, new StreamsConfig(props), new MockTime(1)); @@ -1346,6 +1339,14 @@ public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean activeTasks.put(task1, Collections.singleton(t1p1)); activeTasks.put(task2, Collections.singleton(t1p2)); + // set up mock consumer + final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); + mockConsumer.subscribe(Collections.singleton(topic1)); + mockConsumer.updateBeginningOffsets(Map.of(t1p1, 0L, t1p2, 0L)); + + // assign partitions before starting the thread + mockConsumer.rebalance(assignedPartitions); + thread.taskManager().handleAssignment(activeTasks, emptyMap()); thread.start(); @@ -1362,7 +1363,7 @@ public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean assertTrue(thread.isAlive()); Thread.sleep(1000); - assertEquals(Set.of(task1, task2), thread.taskManager().activeTaskIds()); + assertEquals(Set.of(task1, task2), thread.taskManager().allTasks().keySet()); assertEquals(StreamThread.State.PENDING_SHUTDOWN, thread.state()); thread.rebalanceListener().onPartitionsAssigned(assignedPartitions); @@ -1371,7 +1372,7 @@ public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean () -> thread.state() == StreamThread.State.DEAD, 10 * 1000, "Thread never shut down."); - assertEquals(Collections.emptySet(), thread.taskManager().activeTaskIds()); + assertEquals(Collections.emptySet(), thread.taskManager().allTasks().keySet()); } @ParameterizedTest