diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java index 61de201a4fa7..bc8a3118fe7b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java @@ -192,12 +192,11 @@ public void resetProducer() { oldProducerTotalBlockedTime += totalBlockedTime(producer); final long start = time.nanoseconds(); - producer.close(); + close(); final long closeTime = time.nanoseconds() - start; oldProducerTotalBlockedTime += closeTime; producer = clientSupplier.getProducer(eosV2ProducerConfigs); - transactionInitialized = false; } private double getMetricValue(final Map metrics, @@ -371,6 +370,8 @@ void flush() { void close() { producer.close(); + transactionInFlight = false; + transactionInitialized = false; } // for testing only diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java index 9470a7b166e1..1787e6092f0f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java @@ -207,6 +207,34 @@ public void before() { // functional tests + @Test + public void shouldResetTransactionInFlightOnClose() { + // given: + eosBetaStreamsProducer.send( + new ProducerRecord<>("topic", new byte[1]), (metadata, error) -> { }); + assertThat(eosBetaStreamsProducer.transactionInFlight(), is(true)); + + // when: + eosBetaStreamsProducer.close(); + + // then: + assertThat(eosBetaStreamsProducer.transactionInFlight(), is(false)); + } + + @Test + public void shouldResetTransactionInFlightOnReset() { + // given: + eosBetaStreamsProducer.send( + new ProducerRecord<>("topic", new byte[1]), (metadata, error) -> { }); + assertThat(eosBetaStreamsProducer.transactionInFlight(), is(true)); + + // when: + eosBetaStreamsProducer.resetProducer(); + + // then: + assertThat(eosBetaStreamsProducer.transactionInFlight(), is(false)); + } + @Test public void shouldCreateProducer() { assertThat(mockClientSupplier.producers.size(), is(1));