diff --git a/modules/dcache/src/main/java/org/dcache/kafka/LoggingProducerListener.java b/modules/dcache/src/main/java/org/dcache/kafka/LoggingProducerListener.java index 69a8b8bbe3d..edbb749bba4 100644 --- a/modules/dcache/src/main/java/org/dcache/kafka/LoggingProducerListener.java +++ b/modules/dcache/src/main/java/org/dcache/kafka/LoggingProducerListener.java @@ -17,8 +17,10 @@ */ package org.dcache.kafka; +import com.google.common.base.Throwables; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.support.ProducerListener; @@ -35,7 +37,13 @@ public void onSuccess(ProducerRecord record, RecordMetadata recordMetadata @Override public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) { - LOGGER.error("Producer exception occurred while publishing message : {}, exception : {}", - producerRecord, exception.toString()); + if (exception instanceof TimeoutException && exception.getCause() == null) { + LOGGER.error("Producer failed to send the message," + + " the broker is down or the connection was refused "); + } else { + LOGGER.error( + "Producer exception occurred while publishing message : {}, exception : {}", + producerRecord, Throwables.getRootCause(exception).getMessage()); + } } }