Skip to content

Commit

Permalink
Refactored DittoTracing to slim down its API and get rid of direct …
Browse files Browse the repository at this point in the history
…dependencies to Kamon.

* `DittoTracing` now also reflects its actual state.
* Wrapped Kamon HTTP context propagation in `KamonHttpContextPropagation` for passing it around.
* When creating a prepared span, some well-known tags like for example correlation ID are automatically put if they can be found in the specified headers.
* Introduced new simple wrapper types to leverage the benefits of strong and static typing.
* Empty traces now have an operation name.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Oct 14, 2022
1 parent 8a1635b commit 28c019c
Show file tree
Hide file tree
Showing 106 changed files with 3,018 additions and 1,265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private void startKamon() {
startPrometheusReporter();
}

DittoTracing.initialize(tracingConfig);
DittoTracing.init(tracingConfig);
}

private void startPrometheusReporter() {
Expand Down
7 changes: 7 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-tracing</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-connectivity-common</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions connectivity/api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-tracing</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.internal.utils.tracing.DittoTracingInitResource;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.events.ThingDeleted;
import org.junit.ClassRule;
import org.junit.Test;
import org.mutabilitydetector.unittesting.MutabilityAssert;
import org.mutabilitydetector.unittesting.MutabilityMatchers;
Expand All @@ -39,6 +41,10 @@
*/
public final class InboundSignalTest {

@ClassRule
public static final DittoTracingInitResource DITTO_TRACING_INIT_RESOURCE =
DittoTracingInitResource.disableDittoTracing();

@Test
public void assertImmutability() {
MutabilityAssert.assertInstancesOf(InboundSignal.class, MutabilityMatchers.areImmutable(),
Expand Down
7 changes: 6 additions & 1 deletion connectivity/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,18 @@ jmh-generator-annprocess). jmh-generator-annprocess overwrites the whole META-IN
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-test</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-tracing</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-persistence</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import akka.pattern.Patterns;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import kamon.context.Context;

/**
* Base class for consumer actors that holds common fields and handles the address status.
Expand Down Expand Up @@ -144,8 +143,9 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
.tag(TracingTags.CONNECTION_ID, connectionId.toString())
.tag(TracingTags.CONNECTION_TYPE, connectionType.toString())
.increment();
final Context traceContext = DittoTracing.extractTraceContext(acknowledgeableMessage.getMessage().getHeaders());
DittoTracing.wrapTimer(traceContext, ackTimer);

final var acknowledgeableExternalMessage = acknowledgeableMessage.getMessage();
DittoTracing.newStartedTraceByTimer(acknowledgeableExternalMessage.getHeaders(), ackTimer);

final Duration askTimeout = acknowledgementConfig.getCollectorFallbackAskTimeout();
// Ask response collector actor to get the collected responses in a future
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.TraceOperationName;
import org.eclipse.ditto.internal.utils.tracing.instruments.trace.StartedTrace;
import org.eclipse.ditto.internal.utils.tracing.TracingTags;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
Expand All @@ -90,7 +90,6 @@
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.japi.pf.ReceiveBuilder;
import kamon.context.Context;

/**
* Base class for publisher actors. Holds the map of configured targets.
Expand Down Expand Up @@ -463,21 +462,24 @@ private SendingOrDropped publishToGenericTarget(final ExpressionResolver resolve
final Object entityId = outboundSource instanceof WithEntityId wEntityId ? wEntityId.getEntityId() : "?";
l.info("Publishing mapped message of type <{}> for id <{}> to PublishTarget <{}>",
outboundSource.getType(), entityId, publishTarget);
l.debug("Publishing mapped message of type <{}> for id <{}> to PublishTarget <{}>: {}", outboundSource.getType(),
entityId, publishTarget, sendingContext.getExternalMessage());
l.debug("Publishing mapped message of type <{}> for id <{}> to PublishTarget <{}>: {}",
outboundSource.getType(),
entityId,
publishTarget,
sendingContext.getExternalMessage());
@Nullable final Target autoAckTarget = sendingContext.getAutoAckTarget().orElse(null);

final HeaderMapping headerMapping = genericTarget.getHeaderMapping();
final ExternalMessage mappedMessage = applyHeaderMapping(resolver, outbound, headerMapping);
final Context context = DittoTracing.extractTraceContext(mappedMessage.getHeaders());
final StartedTrace trace = DittoTracing
.trace(context, TraceOperationName.of(connection.getConnectionType() + "_publish"))
final var trace = DittoTracing.newPreparedTrace(
mappedMessage.getHeaders(),
TraceOperationName.of(connection.getConnectionType() + "_publish")
)
.connectionId(connection.getId())
.connectionType(connection.getConnectionType())
.tag(TracingTags.CONNECTION_TYPE, connection.getConnectionType().toString())
.start();
final ExternalMessage mappedMessageWithTraceContext =
DittoTracing.propagateContext(trace.getContext(), mappedMessage,
(msg, entry) -> msg.withHeader(entry.getKey(), entry.getValue()));
final var mappedMessageWithTraceContext =
mappedMessage.withHeaders(trace.propagateContext(mappedMessage.getHeaders()));

final CompletionStage<SendResult> responsesFuture = publishMessage(outboundSource,
autoAckTarget,
Expand Down Expand Up @@ -655,4 +657,5 @@ private static ResourceStatus getTargetResourceStatus(final Target target) {
Instant.now());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.things.model.ThingConstants;

import akka.actor.ActorSystem;
import kamon.context.Context;

/**
* Processes incoming {@link ExternalMessage}s to {@link Signal}s.
Expand Down Expand Up @@ -115,7 +113,12 @@ static InboundMappingProcessor of(final Connection connection,
final ProtocolAdapter adapter, final DittoHeadersValidator dittoHeadersValidator) {
final var connectionId = connection.getId();
final var connectionType = connection.getConnectionType();
return new InboundMappingProcessor(connectionId, connectionType, registry, logger, adapter, dittoHeadersValidator);
return new InboundMappingProcessor(connectionId,
connectionType,
registry,
logger,
adapter,
dittoHeadersValidator);
}

/**
Expand All @@ -126,16 +129,20 @@ static InboundMappingProcessor of(final Connection connection,
*/
@Override
List<MappingOutcome<MappedInboundExternalMessage>> process(final ExternalMessage message) {
final List<MessageMapper> mappers = getMappers(message.getPayloadMapping().orElse(null));
final var mappers = getMappers(message.getPayloadMapping().orElse(null));
logger.withCorrelationId(message.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
.debug("Mappers resolved for message: {}", mappers);
final Context context = DittoTracing.extractTraceContext(message.getHeaders());
final MappingTimer mappingTimer = MappingTimer.inbound(connectionId, connectionType, context);
final ExternalMessage externalMessageWithTraceContext =
DittoTracing.propagateContext(mappingTimer.getContext(), message,
(msg, header) -> msg.withHeader(header.getKey(), header.getValue()));
final var mappingTimer = MappingTimer.inbound(connectionId, connectionType, message.getHeaders());
return mappingTimer.overall(() -> mappers.stream()
.flatMap(mapper -> runMapper(mapper, externalMessageWithTraceContext, mappingTimer))
.flatMap(mapper -> {
final var mappingTimerTrace = mappingTimer.getTrace();
return runMapper(
mapper,
message.withHeaders(mappingTimerTrace.propagateContext(message.getHeaders())),
mappingTimer
);
}
)
.toList()
);
}
Expand Down Expand Up @@ -170,7 +177,8 @@ private Stream<MappingOutcome<MappedInboundExternalMessage>> runMapper(final Mes
signalWithMapperHeader);
mappedMessages.add(mappedMessage);
} catch (final Exception e) {
logger.withCorrelationId(e instanceof WithDittoHeaders wdh ? wdh.getDittoHeaders() : adaptable.getDittoHeaders())
logger.withCorrelationId(e instanceof WithDittoHeaders wdh ? wdh.getDittoHeaders() :
adaptable.getDittoHeaders())
.info("Exception during inbound adaptable conversion to Signal: <{}: {}>",
e.getClass().getSimpleName(), e.getMessage());
return Stream.of(MappingOutcome.error(mapper.getId(),
Expand Down
Loading

0 comments on commit 28c019c

Please sign in to comment.