diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index 15139d22c0..da9039166e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -740,6 +740,10 @@ private Producer doCreateProducer(@Nullable String txIdPrefix) { return getOrCreateThreadBoundProducer(); } synchronized (this) { + if (this.producer != null && this.producer.closed) { + this.producer.closeDelegate(this.physicalCloseTimeout, this.listeners); + this.producer = null; + } if (this.producer != null && expire(this.producer)) { this.producer = null; } @@ -781,15 +785,13 @@ protected Producer createKafkaProducer() { * Remove the single shared producer and a thread-bound instance if present. * @param producerToRemove the producer. * @param timeout the close timeout. - * @return always true. + * @return true if the producer was closed. * @since 2.2.13 */ - protected final synchronized boolean removeProducer(CloseSafeProducer producerToRemove, Duration timeout) { + protected final boolean removeProducer(CloseSafeProducer producerToRemove, Duration timeout) { if (producerToRemove.closed) { - if (producerToRemove.equals(this.producer)) { - this.producer = null; - producerToRemove.closeDelegate(timeout, this.listeners); - } + CloseSafeProducer tlProd = this.threadBoundProducers.get(); + tlProd.closeDelegate(timeout, this.listeners); this.threadBoundProducers.remove(); return true; } @@ -1139,7 +1141,12 @@ public void close(@Nullable Duration timeout) { } void closeDelegate(Duration timeout, List> listeners) { - this.delegate.close(timeout == null ? this.closeTimeout : timeout); + try { + this.delegate.close(timeout == null ? this.closeTimeout : timeout); + } + catch (Exception ex) { + LOGGER.warn(ex, () -> "Failed to close " + this.delegate); + } listeners.forEach(listener -> listener.producerRemoved(this.clientId, this)); this.closed = true; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java index a118480f12..1d479b6778 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -305,11 +305,12 @@ protected Producer createKafkaProducer() { }; final Producer aProducer = pf.createProducer(); assertThat(aProducer).isNotNull(); + Producer bProducer = pf.createProducer(); + assertThat(bProducer).isSameAs(aProducer); aProducer.send(null, (meta, ex) -> { }); aProducer.close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT); - assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull(); + bProducer = pf.createProducer(); verify(producer1).close(any(Duration.class)); - Producer bProducer = pf.createProducer(); assertThat(bProducer).isNotSameAs(aProducer); }