Skip to content

Commit

Permalink
KAFKA-15429: reset transactionInFlight on StreamsProducer close (#14326)
Browse files Browse the repository at this point in the history
Resets the value of transactionInFlight to false when closing the
StreamsProducer. This ensures we don't try to commit against a
closed producer

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
  • Loading branch information
rodesai authored and ableegoldman committed Sep 3, 2023
1 parent ac4bdb2 commit 3f1a459
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Expand Up @@ -192,12 +192,11 @@ public void resetProducer() {

oldProducerTotalBlockedTime += totalBlockedTime(producer);
final long start = time.nanoseconds();
producer.close();
close();
final long closeTime = time.nanoseconds() - start;
oldProducerTotalBlockedTime += closeTime;

producer = clientSupplier.getProducer(eosV2ProducerConfigs);
transactionInitialized = false;
}

private double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
Expand Down Expand Up @@ -371,6 +370,8 @@ void flush() {

void close() {
producer.close();
transactionInFlight = false;
transactionInitialized = false;
}

// for testing only
Expand Down
Expand Up @@ -207,6 +207,34 @@ public void before() {

// functional tests

@Test
public void shouldResetTransactionInFlightOnClose() {
// given:
eosBetaStreamsProducer.send(
new ProducerRecord<>("topic", new byte[1]), (metadata, error) -> { });
assertThat(eosBetaStreamsProducer.transactionInFlight(), is(true));

// when:
eosBetaStreamsProducer.close();

// then:
assertThat(eosBetaStreamsProducer.transactionInFlight(), is(false));
}

@Test
public void shouldResetTransactionInFlightOnReset() {
// given:
eosBetaStreamsProducer.send(
new ProducerRecord<>("topic", new byte[1]), (metadata, error) -> { });
assertThat(eosBetaStreamsProducer.transactionInFlight(), is(true));

// when:
eosBetaStreamsProducer.resetProducer();

// then:
assertThat(eosBetaStreamsProducer.transactionInFlight(), is(false));
}

@Test
public void shouldCreateProducer() {
assertThat(mockClientSupplier.producers.size(), is(1));
Expand Down

0 comments on commit 3f1a459

Please sign in to comment.