diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index f72fdb990a..9f8f40bc2d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -43,6 +43,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.Serializer; @@ -122,6 +123,8 @@ public class DefaultKafkaProducerFactory implements ProducerFactory, private final Map> consumerProducers = new HashMap<>(); + private final ThreadLocal> threadBoundProducers = new ThreadLocal<>(); + private final AtomicInteger clientIdCounter = new AtomicInteger(); private Supplier> keySerializerSupplier; @@ -138,8 +141,6 @@ public class DefaultKafkaProducerFactory implements ProducerFactory, private boolean producerPerThread; - private ThreadLocal> threadBoundProducers; - private String clientIdPrefix; private volatile CloseSafeProducer producer; @@ -252,7 +253,6 @@ protected String getTransactionIdPrefix() { */ public void setProducerPerThread(boolean producerPerThread) { this.producerPerThread = producerPerThread; - this.threadBoundProducers = new ThreadLocal<>(); } /** @@ -316,8 +316,11 @@ public boolean transactionCapable() { @SuppressWarnings("resource") @Override public void destroy() { - CloseSafeProducer producerToClose = this.producer; - this.producer = null; + CloseSafeProducer producerToClose; + synchronized (this) { + producerToClose = this.producer; + this.producer = null; + } if (producerToClose != null) { producerToClose.getDelegate().close(this.physicalCloseTimeout); } @@ -391,19 +394,19 @@ private Producer doCreateProducer(@Nullable String txIdPrefix) { if (this.producerPerThread) { CloseSafeProducer tlProducer = this.threadBoundProducers.get(); if (tlProducer == null) { - tlProducer = new CloseSafeProducer<>(createKafkaProducer()); + tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer, + this.physicalCloseTimeout); this.threadBoundProducers.set(tlProducer); } return tlProducer; } - if (this.producer == null) { - synchronized (this) { - if (this.producer == null) { - this.producer = new CloseSafeProducer<>(createKafkaProducer()); - } + synchronized (this) { + if (this.producer == null) { + this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer, + this.physicalCloseTimeout); } + return this.producer; } - return this.producer; } /** @@ -458,6 +461,20 @@ private void removeConsumerProducer(CloseSafeProducer producerToRemove) { } } + /** + * Remove the single shared producer and a thread-bound instance if present. + * @param producerToRemove the producer; + * @since 2.2.13 + */ + protected final synchronized void removeProducer( + @SuppressWarnings("unused") CloseSafeProducer producerToRemove) { + + if (producerToRemove.equals(this.producer)) { + this.producer = null; + } + this.threadBoundProducers.remove(); + } + /** * Subclasses must return a producer from the {@link #getCache()} or a * new raw producer wrapped in a {@link CloseSafeProducer}. @@ -493,7 +510,7 @@ private CloseSafeProducer doCreateTxProducer(String prefix, String suffix, newProducer = createRawProducer(newProducerConfigs); newProducer.initTransactions(); return new CloseSafeProducer<>(newProducer, getCache(prefix), remover, - (String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + (String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), this.physicalCloseTimeout); } protected Producer createRawProducer(Map configs) { @@ -556,34 +573,43 @@ protected static class CloseSafeProducer implements Producer { private final BlockingQueue> cache; - private final Consumer> removeConsumerProducer; + private final Consumer> removeProducer; private final String txId; - private volatile Exception txFailed; + private final Duration closeTimeout; + + private volatile Exception producerFailed; + + private volatile boolean closed; + + CloseSafeProducer(Producer delegate, Consumer> removeProducer, + Duration closeTimeout) { - CloseSafeProducer(Producer delegate) { - this(delegate, null, null); + this(delegate, null, removeProducer, null, closeTimeout); Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer"); } - CloseSafeProducer(Producer delegate, BlockingQueue> cache) { - this(delegate, cache, null); + CloseSafeProducer(Producer delegate, BlockingQueue> cache, + Duration closeTimeout) { + this(delegate, cache, null, closeTimeout); } - CloseSafeProducer(Producer delegate, @Nullable BlockingQueue> cache, - @Nullable Consumer> removeConsumerProducer) { + CloseSafeProducer(Producer delegate, BlockingQueue> cache, + @Nullable Consumer> removeConsumerProducer, Duration closeTimeout) { - this(delegate, cache, removeConsumerProducer, null); + this(delegate, cache, removeConsumerProducer, null, closeTimeout); } - CloseSafeProducer(Producer delegate, @Nullable BlockingQueue> cache, - @Nullable Consumer> removeConsumerProducer, @Nullable String txId) { + CloseSafeProducer(Producer delegate, BlockingQueue> cache, + @Nullable Consumer> removeProducer, @Nullable String txId, + Duration closeTimeout) { this.delegate = delegate; this.cache = cache; - this.removeConsumerProducer = removeConsumerProducer; + this.removeProducer = removeProducer; this.txId = txId; + this.closeTimeout = closeTimeout; LOGGER.debug(() -> "Created new Producer: " + this); } @@ -600,7 +626,18 @@ public Future send(ProducerRecord record) { @Override public Future send(ProducerRecord record, Callback callback) { LOGGER.trace(() -> toString() + " send(" + record + ")"); - return this.delegate.send(record, callback); + return this.delegate.send(record, new Callback() { + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception instanceof OutOfOrderSequenceException) { + CloseSafeProducer.this.producerFailed = exception; + close(CloseSafeProducer.this.closeTimeout); + } + callback.onCompletion(metadata, exception); + } + + }); } @Override @@ -632,7 +669,7 @@ public void beginTransaction() throws ProducerFencedException { } catch (RuntimeException e) { LOGGER.error(e, () -> "beginTransaction failed: " + this); - this.txFailed = e; + this.producerFailed = e; throw e; } } @@ -653,7 +690,7 @@ public void commitTransaction() throws ProducerFencedException { } catch (RuntimeException e) { LOGGER.error(e, () -> "commitTransaction failed: " + this); - this.txFailed = e; + this.producerFailed = e; throw e; } } @@ -661,8 +698,8 @@ public void commitTransaction() throws ProducerFencedException { @Override public void abortTransaction() throws ProducerFencedException { LOGGER.debug(() -> toString() + " abortTransaction()"); - if (this.txFailed != null) { - LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + this.txFailed.getMessage() + if (this.producerFailed != null) { + LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + this.producerFailed.getMessage() + ": " + this); } else { @@ -671,7 +708,7 @@ public void abortTransaction() throws ProducerFencedException { } catch (RuntimeException e) { LOGGER.error(e, () -> "Abort failed: " + this); - this.txFailed = e; + this.producerFailed = e; throw e; } } @@ -685,25 +722,26 @@ public void close() { @Override public void close(@Nullable Duration timeout) { LOGGER.trace(() -> toString() + " close(" + (timeout == null ? "null" : timeout) + ")"); - if (this.cache != null) { - Duration closeTimeout = this.txFailed instanceof TimeoutException - ? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT - : timeout; - if (this.txFailed != null) { - LOGGER.warn(() -> "Error during transactional operation; producer removed from cache; " - + "possible cause: " - + "broker restarted during transaction: " + this); - this.delegate.close(closeTimeout); - if (this.removeConsumerProducer != null) { - this.removeConsumerProducer.accept(this); + if (!this.closed) { + if (this.producerFailed != null) { + LOGGER.warn(() -> "Error during some operation; producer removed from cache: " + this); + this.closed = true; + this.delegate.close(this.producerFailed instanceof TimeoutException + ? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT + : timeout); + if (this.removeProducer != null) { + this.removeProducer.accept(this); } } else { - if (this.removeConsumerProducer == null) { // dedicated consumer producers are not cached + if (this.cache != null && this.removeProducer == null) { // dedicated consumer producers are not cached synchronized (this) { if (!this.cache.contains(this) && !this.cache.offer(this)) { - this.delegate.close(closeTimeout); + this.closed = true; + this.delegate.close(this.producerFailed instanceof TimeoutException + ? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT + : timeout); } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java index bd1b09170c..1bee94efdf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willThrow; @@ -31,11 +32,14 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.UnknownProducerIdException; import org.junit.jupiter.api.Test; import org.mockito.InOrder; @@ -67,7 +71,8 @@ protected Producer createTransactionalProducer(String txIdPrefix) { producer.initTransactions(); BlockingQueue cache = getCache(txIdPrefix); Producer cached = cache.poll(); - return cached == null ? new CloseSafeProducer(producer, cache) : cached; + return cached == null ? new CloseSafeProducer(producer, cache, + Duration.ofSeconds(1)) : cached; } }; @@ -148,7 +153,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) { producer.initTransactions(); BlockingQueue cache = getCache(txIdPrefix); Producer cached = cache.poll(); - return cached == null ? new CloseSafeProducer(producer, cache) : cached; + return cached == null ? new CloseSafeProducer(producer, cache, Duration.ofSeconds(1)) : cached; } }; @@ -221,4 +226,33 @@ protected Producer createRawProducer(Map configs) { assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).hasSize(0); } + @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + void testUnknownProducerIdException() { + final Producer producer1 = mock(Producer.class); + willAnswer(inv -> { + ((Callback) inv.getArgument(1)).onCompletion(null, new UnknownProducerIdException("test")); + return null; + }).given(producer1).send(isNull(), any()); + final Producer producer2 = mock(Producer.class); + ProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) { + + private final AtomicBoolean first = new AtomicBoolean(true); + + @Override + protected Producer createKafkaProducer() { + return this.first.getAndSet(false) ? producer1 : producer2; + } + + }; + final Producer aProducer = pf.createProducer(); + assertThat(aProducer).isNotNull(); + aProducer.send(null, (meta, ex) -> { }); + aProducer.close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT); + assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull(); + verify(producer1).close(any(Duration.class)); + Producer bProducer = pf.createProducer(); + assertThat(bProducer).isNotSameAs(aProducer); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index f9db7a0355..36e6fd68d8 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -381,7 +381,8 @@ public void testQuickCloseAfterCommitTimeout() { @Override public Producer createProducer(String txIdPrefixArg) { - CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(producer, getCache()); + CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(producer, getCache(), + Duration.ofSeconds(1)); return closeSafeProducer; } @@ -406,19 +407,22 @@ public void testNormalCloseAfterCommitCacheFull() { @SuppressWarnings("unchecked") Producer producer = mock(Producer.class); - DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(Collections.emptyMap()) { + DefaultKafkaProducerFactory pf = + new DefaultKafkaProducerFactory(Collections.emptyMap()) { @SuppressWarnings("unchecked") @Override public Producer createProducer(String txIdPrefixArg) { BlockingQueue> cache = new LinkedBlockingDeque<>(1); try { - cache.put(new CloseSafeProducer<>(mock(Producer.class))); + cache.put(new CloseSafeProducer<>(mock(Producer.class), this::removeProducer, + Duration.ofSeconds(1))); } catch (@SuppressWarnings("unused") InterruptedException e) { Thread.currentThread().interrupt(); } - CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(producer, cache); + CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(producer, cache, + Duration.ofSeconds(1)); return closeSafeProducer; }