Skip to content

Commit

Permalink
#1739 provide the traceparent header as MDC value in logs
Browse files Browse the repository at this point in the history
* generify the fields to provide to the MDC in CommonMdcEntryKey enum
* rename "x-correlation-id" in logs to just "correlation-id"
* exchange some places where only correlationId was extracted from a map of headers with parsing all the headers for MDC worthy fields

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Sep 18, 2023
1 parent 8bbd417 commit 88d7fa8
Show file tree
Hide file tree
Showing 58 changed files with 360 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,7 @@ private DevOpsCommandResponseCorrelationActor(final ActorRef devOpsCommandSender
final var dittoHeaders = devOpsCommand.getDittoHeaders();
aggregateResults = isAggregateResults(dittoHeaders);
this.expectedResponses = expectedResponses;
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
logger.setCorrelationId(dittoHeaders);
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this).withCorrelationId(dittoHeaders);
}

private static boolean isAggregateResults(final DittoHeaders dittoHeaders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -314,11 +313,10 @@ private Stream<SendingOrDropped> sendMappedOutboundSignal(final OutboundSignal.M
final int maxPayloadBytesForSignal) {

final var message = outbound.getExternalMessage();
final String correlationId = message.getHeaders().get(CORRELATION_ID.getKey());
final Signal<?> outboundSource = outbound.getSource();
final List<Target> outboundTargets = outbound.getTargets();

final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(correlationId);
final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(message.getHeaders());
final Optional<SendingContext> replyTargetSendingContext = getSendingContext(outbound);

final List<SendingContext> sendingContexts = replyTargetSendingContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,8 @@ public Optional<Signal<?>> onMapped(final String mapperId,

@Override
public Optional<Signal<?>> onDropped(final String mapperId, @Nullable final ExternalMessage incomingMessage) {
logger.withCorrelationId(Optional.ofNullable(incomingMessage)
.map(ExternalMessage::getHeaders)
.map(h -> h.get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
.orElse(null)
).debug("Message mapping returned null, message is dropped.");
logger.withCorrelationId(incomingMessage != null ? incomingMessage.getHeaders() : null)
.debug("Message mapping returned null, message is dropped.");
if (incomingMessage != null) {
final String source = getAddress(incomingMessage);
final ConnectionMonitor.InfoProvider infoProvider = InfoProviderFactory.forExternalMessage(incomingMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ static InboundMappingProcessor of(final Connection connection,
@Override
List<MappingOutcome<MappedInboundExternalMessage>> process(final ExternalMessage message) {
final var mappers = getMappers(message.getPayloadMapping().orElse(null));
logger.withCorrelationId(message.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
logger.withCorrelationId(message.getHeaders())
.debug("Mappers resolved for message: {}", mappers);
final var mappingTimer = MappingTimer.inbound(connectionId, connectionType, message.getHeaders());
return mappingTimer.overall(() -> mappers.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
Expand Down Expand Up @@ -169,14 +168,12 @@ private InboundMappingOutcomes mapInboundMessage(final ExternalMessageWithSender
final InboundMappingProcessor inboundMappingProcessor) {

final var externalMessage = withSender.externalMessage();
@Nullable final var correlationId =
externalMessage.findHeaderIgnoreCase(DittoHeaderDefinition.CORRELATION_ID.getKey()).orElse(null);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(externalMessage.getHeaders())
.debug("Handling ExternalMessage: {}", externalMessage);
try {
return mapExternalMessageToSignal(withSender, inboundMappingProcessor);
} catch (final Exception e) {
logger.withCorrelationId(correlationId)
logger.withCorrelationId(externalMessage.getHeaders())
.error("Handling exception when mapping external message: {}", e.getMessage());
return InboundMappingOutcomes.of(withSender.externalMessage(), e, withSender.sender());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
Expand All @@ -29,7 +30,6 @@
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
Expand Down Expand Up @@ -217,10 +217,9 @@ private void updateSendMonitor(@Nullable final Exception exception) {
dittoRuntimeException.getClass().getSimpleName(), dittoRuntimeException.getMessage());
} else {
publishedMonitor.exception(message, exception);
final DittoHeaders internalHeaders = message.getInternalHeaders();
@Nullable final String correlationId = internalHeaders.getCorrelationId()
.orElseGet(() -> message.findHeaderIgnoreCase(CORRELATION_ID.getKey()).orElse(null));
logger.withCorrelationId(correlationId)
final Map<String, String> combinedHeaders = new HashMap<>(message.getHeaders());
combinedHeaders.putAll(message.getInternalHeaders());
logger.withCorrelationId(combinedHeaders)
.info("Unexpected failure when publishing signal - {}: {}",
exception.getClass().getSimpleName(), exception.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ private void handleJmsMessage(final JmsMessage message) {
.start();
headers = startedSpan.propagateContext(headers);
final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers);
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder, correlationId)
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder)
.withAuthorizationContext(source.getAuthorizationContext())
.withEnforcement(headerEnforcementFilterFactory.getFilter(headers))
.withHeaderMapping(source.getHeaderMapping())
Expand All @@ -374,10 +374,10 @@ private void handleJmsMessage(final JmsMessage message) {
.build();
inboundMonitor.success(externalMessage);
final Map<String, String> externalMessageHeaders = externalMessage.getHeaders();
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headers)
.info("Received message from AMQP 1.0 with externalMessageHeaders: {}", externalMessageHeaders);
if (logger.isDebugEnabled()) {
logger.withCorrelationId(correlationId).debug("Received message from AMQP 1.0 with payload: {}",
logger.withCorrelationId(headers).debug("Received message from AMQP 1.0 with payload: {}",
externalMessage.getTextPayload().orElse("binary"));
}
forwardToMapping(externalMessage,
Expand All @@ -404,7 +404,7 @@ private void handleJmsMessage(final JmsMessage message) {
inboundMonitor.exception(e);
}
startedSpan.tagAsFailed(e);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headers)
.error(e, "Unexpected {}: {}", e.getClass().getName(), e.getMessage());
} finally {
startedSpan.finish();
Expand All @@ -422,8 +422,6 @@ private void handleJmsMessage(final JmsMessage message) {
private void acknowledge(final JmsMessage message, final boolean isSuccess, final boolean redeliver,
final Map<String, String> externalMessageHeaders) {

final Optional<String> correlationId = Optional.ofNullable(
externalMessageHeaders.get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
try {
final String messageId = message.getJMSMessageID();
recordAckForRateLimit(messageId, isSuccess, redeliver);
Expand All @@ -437,8 +435,7 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
ackType = redeliver ? MODIFIED_FAILED : REJECTED;
ackTypeName = redeliver ? "modified[delivery-failed]" : "rejected";
}
final String jmsCorrelationID = message.getJMSCorrelationID();
logger.withCorrelationId(correlationId.orElse(jmsCorrelationID))
logger.withCorrelationId(externalMessageHeaders)
.info(MessageFormat.format(
"Acking <{0}> with original external message headers=<{1}>, isSuccess=<{2}>, ackType=<{3} {4}>",
messageId,
Expand All @@ -457,15 +454,15 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
"Sending negative acknowledgement: <{0}>", ackTypeName);
}
} catch (final IllegalStateException e) {
logger.withCorrelationId(correlationId.orElse(null))
logger.withCorrelationId(externalMessageHeaders)
.warning(e, "Failed to ack an AMQP message because of server side issues");
} catch (final Exception e) {
logger.withCorrelationId(correlationId.orElse(null)).error(e, "Failed to ack an AMQP message");
logger.withCorrelationId(externalMessageHeaders).error(e, "Failed to ack an AMQP message");
}
}

private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage message,
final ExternalMessageBuilder builder, @Nullable final String correlationId) throws JMSException {
final ExternalMessageBuilder builder) throws JMSException {
if (message instanceof TextMessage textMessage) {
final String payload = textMessage.getText();
if (payload == null) {
Expand All @@ -487,7 +484,7 @@ private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage messag
if (logger.isDebugEnabled()) {
final Destination destination = message.getJMSDestination();
final Map<String, String> headersMapFromJmsMessage = extractHeadersMapFromJmsMessage(message);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headersMapFromJmsMessage)
.debug("Received message at '{}' of unsupported type ({}) with headers: {}",
destination, message.getClass().getName(), headersMapFromJmsMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
try {
final String key = consumerRecord.key();
final ByteBuffer value = consumerRecord.value();
final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(correlationId);
final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(messageHeaders);
correlationIdScopedLogger.debug(
"Transforming incoming kafka message <{}> with headers <{}> and key <{}>.",
value, messageHeaders, key
Expand Down Expand Up @@ -148,7 +148,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
return TransformationResult.failed(e.setDittoHeaders(DittoHeaders.of(messageHeaders)));
} catch (final Exception e) {
inboundMonitor.exception(messageHeaders, e);
LOGGER.withCorrelationId(correlationId)
LOGGER.withCorrelationId(messageHeaders)
.error(String.format("Unexpected {%s}: {%s}", e.getClass().getName(), e.getMessage()), e);
startedSpan.tagAsFailed(e);
return null; // Drop message
Expand Down
4 changes: 2 additions & 2 deletions connectivity/service/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
</encoder>
</appender>

<appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
<target>System.err</target>
<encoder>
<pattern>%date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ protected Sink<Object, NotUsed> setupInboundMappingSink(final ActorRef clientAct
.thenReturn(logger);
when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class)))
.thenReturn(logger);
when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger);
final ProtocolAdapter protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null);
final var connection = CONNECTION.toBuilder().payloadMappingDefinition(payloadMappingDefinition).build();
final InboundMappingProcessor inboundMappingProcessor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ ActorRef createInboundMappingProcessorActor(final ActorRef proxyActor,
final var logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
Mockito.when(logger.withCorrelationId(Mockito.any(DittoHeaders.class)))
.thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.nullable(CharSequence.class)))
.thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ private static InboundMappingProcessor createThrowingProcessor() {
final ThreadSafeDittoLoggingAdapter logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<CharSequence>any());
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<DittoHeaders>any());
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<Map>any());
final Connection connection =
TestConstants.createConnection(ConnectionId.of("connectionId"), ConnectionType.MQTT);
return InboundMappingProcessor.of(connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static void setUp() {
when(logger.withCorrelationId(Mockito.nullable(String.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(WithDittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(DittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(Map.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger);

protocolAdapterProvider = new DittoProtocolAdapterProvider(Mockito.mock(ProtocolConfig.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.withSettings;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -92,8 +93,8 @@ public void setUp() {
.build();

Mockito.when(externalMessage.getInternalHeaders()).thenReturn(dittoHeaders);
Mockito.when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class))).thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger);

exceptionConverter = DefaultExceptionToAcknowledgementConverter.getInstance();
}
Expand Down Expand Up @@ -313,7 +314,7 @@ public void monitorAndAcknowledgeWhenFutureResponseTerminatedExceptionallyAndNoA

Mockito.verifyNoInteractions(acknowledgedMonitor);
Mockito.verify(publishedMonitor).exception(eq(externalMessage), eq(rootCause));
Mockito.verify(logger).withCorrelationId(testName.getMethodName());
Mockito.verify(logger).withCorrelationId(Map.of("correlation-id", testName.getMethodName()));
assertThat(result).hasValueSatisfying(resultFuture -> assertThat(resultFuture).isCompletedWithValue(null));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ public static ThreadSafeDittoLoggingAdapter mockThreadSafeDittoLoggingAdapter()
when(logger.withCorrelationId(Mockito.nullable(String.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(WithDittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(DittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(Map.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger);
return logger;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ private Sink<Object, NotUsed> setupMappingSink(final ActorRef testRef,
final ThreadSafeDittoLoggingAdapter logger = mock(ThreadSafeDittoLoggingAdapter.class);
when(logger.withCorrelationId(any(DittoHeaders.class)))
.thenReturn(logger);
when(logger.withCorrelationId(any(Map.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(CharSequence.class)))
.thenReturn(logger);
when(logger.withCorrelationId(any(WithDittoHeaders.class)))
Expand Down
2 changes: 1 addition & 1 deletion deployment/helm/ditto/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ description: |
A digital twin is a virtual, cloud based, representation of his real world counterpart
(real world “Things”, e.g. devices like sensors, smart heating, connected cars, smart grids, EV charging stations etc).
type: application
version: 3.3.7 # chart version is effectively set by release-job
version: 3.3.8 # chart version is effectively set by release-job
appVersion: 3.3.7
keywords:
- iot-chart
Expand Down
1 change: 0 additions & 1 deletion deployment/helm/ditto/logback-config/connectivity.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
<excludeMdcKeyName>sourceActorSystem</excludeMdcKeyName>
<excludeMdcKeyName>pekkoUid</excludeMdcKeyName>
<excludeMdcKeyName>pekkoTimestamp</excludeMdcKeyName>
<mdcKeyFieldName>x-correlation-id=correlation-id</mdcKeyFieldName>
<mdcKeyFieldName>connection-id=ditto-connection-id</mdcKeyFieldName>
<mdcKeyFieldName>connection-type=ditto-connection-type</mdcKeyFieldName>
</encoder>
Expand Down
1 change: 0 additions & 1 deletion deployment/helm/ditto/logback-config/gateway.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
<excludeMdcKeyName>sourceActorSystem</excludeMdcKeyName>
<excludeMdcKeyName>pekkoUid</excludeMdcKeyName>
<excludeMdcKeyName>pekkoTimestamp</excludeMdcKeyName>
<mdcKeyFieldName>x-correlation-id=correlation-id</mdcKeyFieldName>
</encoder>
</appender>

Expand Down

0 comments on commit 88d7fa8

Please sign in to comment.