diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java index 1dc5c513d0..a9cd5efae2 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java @@ -35,8 +35,8 @@ import org.eclipse.ditto.connectivity.model.EnforcementFilterFactory; import org.eclipse.ditto.connectivity.model.Source; import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor; -import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger; import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; +import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger; import org.eclipse.ditto.internal.utils.tracing.DittoTracing; import org.eclipse.ditto.internal.utils.tracing.instruments.trace.StartedTrace; @@ -48,7 +48,8 @@ @Immutable final class KafkaMessageTransformer { - private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(KafkaMessageTransformer.class); + private static final ThreadSafeDittoLogger LOGGER = + DittoLoggerFactory.getThreadSafeLogger(KafkaMessageTransformer.class); private final Source source; private final String sourceAddress; @@ -110,7 +111,7 @@ public TransformationResult transform(final ConsumerRecord c try { final String key = consumerRecord.key(); final ByteBuffer value = consumerRecord.value(); - final DittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(correlationId); + final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(correlationId); correlationIdScopedLogger.debug( "Transforming incoming kafka message <{}> with headers <{}> and key <{}>.", value, messageHeaders, key