diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index c561fc23608b..00c0334b977c 100644 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -349,8 +349,14 @@ protected Producer getKafkaProducer(Properties props) { protected void append(LoggingEvent event) { String message = subAppend(event); LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message); - Future response = producer.send( - new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8))); + Future response; + try { + response = producer.send(new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8))); + } catch (IllegalStateException e) { + // The producer has been closed + LogLog.debug("Exception while sending to Kafka", e); + return; + } if (syncSend) { try { response.get(); @@ -370,7 +376,9 @@ private String subAppend(LoggingEvent event) { public void close() { if (!this.closed) { this.closed = true; - producer.close(); + if (producer != null) { + producer.close(); + } } } diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java index 90a791f7f30c..eb64a85bd886 100644 --- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java +++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java @@ -24,9 +24,11 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.log4j.Appender; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.helpers.LogLog; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -50,6 +52,17 @@ public void setup() { LogLog.setInternalDebugging(true); } + @AfterEach + public void cleanup() { + Logger rootLogger = Logger.getRootLogger(); + Appender appender = rootLogger.getAppender("KAFKA"); + if (appender != null) { + // Tests which do not call PropertyConfigurator.configure don't create an appender to remove. + rootLogger.removeAppender(appender); + appender.close(); + } + } + @Test public void testKafkaLog4jConfigs() { Properties hostMissingProps = new Properties();