Skip to content

Commit

Permalink
improvement OutboundMappingProcessorActor logging by adding the conne…
Browse files Browse the repository at this point in the history
…ctionId to the MDC

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Mar 7, 2022
1 parent afbac0d commit 6ebbe7e
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.concierge.service.enforcement;

import java.util.function.UnaryOperator;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
Expand Down Expand Up @@ -62,7 +64,7 @@ protected AbstractEnforcerActor(final ActorRef pubSubMediator,
@Nullable final Cache<CacheKey, Entry<CacheKey>> thingIdCache,
@Nullable final Cache<CacheKey, Entry<Enforcer>> policyEnforcerCache) {

super(WithDittoHeaders.class);
super(WithDittoHeaders.class, UnaryOperator.identity());

final var actorContext = getContext();
final var actorSystem = actorContext.getSystem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import javax.annotation.concurrent.Immutable;

Expand Down Expand Up @@ -78,7 +79,7 @@ private DispatcherActor(final ActorRef enforcerActor,
final ActorRef pubSubMediator,
final Flow<ImmutableDispatch, ImmutableDispatch, NotUsed> handler) {

super(WithDittoHeaders.class);
super(WithDittoHeaders.class, UnaryOperator.identity());

enforcementConfig = DittoConciergeConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.akka.controlflow.AbstractGraphActor;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.json.JsonField;
Expand Down Expand Up @@ -129,8 +128,6 @@ public final class OutboundMappingProcessorActor
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();

private final ThreadSafeDittoLoggingAdapter dittoLoggingAdapter;

private final ActorRef clientActor;
private final Connection connection;
private final MappingConfig mappingConfig;
Expand All @@ -150,15 +147,14 @@ private OutboundMappingProcessorActor(final ActorRef clientActor,
final ConnectivityConfig connectivityConfig,
final int processorPoolSize) {

super(OutboundSignal.class);
super(OutboundSignal.class, logger ->
logger.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connection.getId())
);

this.clientActor = clientActor;
this.outboundMappingProcessors = checkNotEmpty(outboundMappingProcessors, "outboundMappingProcessors");
this.connection = connection;

dittoLoggingAdapter = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, this.connection.getId());

final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
mappingConfig = connectivityConfig.getMappingConfig();
final LimitsConfig limitsConfig = connectivityConfig.getLimitsConfig();
Expand Down Expand Up @@ -232,7 +228,7 @@ private static void issueFailedAcknowledgements(final Signal<?> signal,

private int determinePoolSize(final int connectionPoolSize, final int maxPoolSize) {
if (connectionPoolSize > maxPoolSize) {
dittoLoggingAdapter.info("Configured pool size <{}> is greater than the configured max pool size <{}>." +
logger.info("Configured pool size <{}> is greater than the configured max pool size <{}>." +
" Will use max pool size <{}>.", connectionPoolSize, maxPoolSize, maxPoolSize);
return maxPoolSize;
}
Expand Down Expand Up @@ -276,7 +272,7 @@ public Receive createReceive() {
.match(Signal.class, signal -> handleSignal(signal, getSender()))
.match(DittoRuntimeException.class, this::mapDittoRuntimeException)
.match(Status.Failure.class, f -> {
dittoLoggingAdapter.warning("Got failure with cause {}: {}",
logger.warning("Got failure with cause {}: {}",
f.cause().getClass().getSimpleName(), f.cause().getMessage());
return Done.getInstance();
})
Expand All @@ -299,7 +295,7 @@ protected int getBufferSize() {

private Object handleNotExpectedAcknowledgement(final Acknowledgement acknowledgement) {
// acknowledgements are not published to targets or reply-targets. this one is mis-routed.
dittoLoggingAdapter.withCorrelationId(acknowledgement)
logger.withCorrelationId(acknowledgement)
.warning("Received Acknowledgement where non was expected, discarding it: {}", acknowledgement);
return Done.getInstance();
}
Expand Down Expand Up @@ -425,7 +421,7 @@ private CompletionStage<Collection<OutboundSignalWithSender>> enrichAndFilterSig
.orElse(CompletableFuture.completedStage(outboundSignal))
.thenApply(outboundSignalWithExtra -> applyFilter(outboundSignalWithExtra, filteredTopic))
.exceptionally(error -> {
dittoLoggingAdapter.withCorrelationId(outboundSignal.getSource())
logger.withCorrelationId(outboundSignal.getSource())
.warning("Could not retrieve extra data due to: {} {}", error.getClass().getSimpleName(),
error.getMessage());
// recover from all errors to keep message-mapping-stream running despite enrichment failures
Expand Down Expand Up @@ -454,13 +450,13 @@ private List<OutboundSignalWithSender> recoverFromEnrichmentError(final Outbound
// show enrichment failure in service logs according to severity
if (dittoRuntimeException instanceof ThingNotAccessibleException) {
// This error should be rare but possible due to user action; log on INFO level
dittoLoggingAdapter.withCorrelationId(outboundSignal.getSource())
logger.withCorrelationId(outboundSignal.getSource())
.info("Enrichment of <{}> failed due to <{}>.",
outboundSignal.getSource().getClass(), dittoRuntimeException);
} else {
// This error should not have happened during normal operation.
// There is a (possibly transient) problem with the Ditto cluster. Request parent to restart.
dittoLoggingAdapter.withCorrelationId(outboundSignal.getSource())
logger.withCorrelationId(outboundSignal.getSource())
.error(dittoRuntimeException, "Enrichment of <{}> failed due to <{}>.", outboundSignal,
dittoRuntimeException);
final ConnectionFailure connectionFailure =
Expand All @@ -484,7 +480,7 @@ private void logEnrichmentFailure(final OutboundSignal outboundSignal, final Dit
private Object handleErrorResponse(final DittoRuntimeException exception, final ErrorResponse<?> errorResponse,
final ActorRef sender) {

final ThreadSafeDittoLoggingAdapter l = dittoLoggingAdapter.withCorrelationId(exception);
final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(exception);

if (l.isInfoEnabled()) {
l.info("Got DittoRuntimeException '{}' when ExternalMessage was processed: {} - {}",
Expand All @@ -504,8 +500,8 @@ private Object handleCommandResponse(final CommandResponse<?> response,
@Nullable final DittoRuntimeException exception, final ActorRef sender) {

final ThreadSafeDittoLoggingAdapter l =
dittoLoggingAdapter.isDebugEnabled() ? dittoLoggingAdapter.withCorrelationId(response) :
dittoLoggingAdapter;
logger.isDebugEnabled() ? logger.withCorrelationId(response) :
logger;
recordResponse(response, exception);
if (!response.isOfExpectedResponseType()) {
l.debug("Requester did not require response (via DittoHeader '{}') - not mapping back to ExternalMessage.",
Expand Down Expand Up @@ -537,8 +533,8 @@ private void recordResponse(final CommandResponse<?> response, @Nullable final D
final OutboundMappingProcessor outboundMappingProcessor) {

final Signal<?> source = outbound.getSource();
if (dittoLoggingAdapter.isDebugEnabled()) {
dittoLoggingAdapter.withCorrelationId(source).debug("Handling outbound signal <{}>.", source);
if (logger.isDebugEnabled()) {
logger.withCorrelationId(source).debug("Handling outbound signal <{}>.", source);
}
return mapToExternalMessage(outbound, outboundMappingProcessor);
}
Expand All @@ -555,7 +551,7 @@ private void forwardToPublisherActor(final OutboundSignal.MultiMapped mappedEnve
*/
private Object handleSignal(final Signal<?> signal, final ActorRef sender) {
// map to outbound signal without authorized target (responses and errors are only sent to its origin)
dittoLoggingAdapter.withCorrelationId(signal).debug("Handling raw signal <{}>.", signal);
logger.withCorrelationId(signal).debug("Handling raw signal <{}>.", signal);
return OutboundSignalWithSender.of(signal, sender);
}

Expand Down Expand Up @@ -585,14 +581,14 @@ private Object handleSignal(final Signal<?> signal, final ActorRef sender) {
final DittoRuntimeException e = (DittoRuntimeException) exception;
monitorsForOther.forEach(monitor ->
monitor.getLogger().failure(infoProvider, e));
dittoLoggingAdapter.withCorrelationId(e)
logger.withCorrelationId(e)
.info("Got DittoRuntimeException during processing Signal: {} - {}",
e.getMessage(),
e.getDescription().orElse(""));
} else {
monitorsForOther.forEach(monitor ->
monitor.getLogger().exception(infoProvider, exception));
dittoLoggingAdapter.withCorrelationId(outbound.getSource())
logger.withCorrelationId(outbound.getSource())
.warning("Got unexpected exception during processing Signal <{}>.",
exception.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.function.UnaryOperator;

import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -63,10 +64,13 @@ public abstract class AbstractGraphActor<T, M> extends AbstractActor {
* Constructs a new AbstractGraphActor object.
*
* @param matchClass the type of the message to be streamed if matched in this actor's receive handler.
* @param loggerEnhancer a function which can enhance the {@code logger} of this instance, e.g. with additional MDC
* values.
* @throws NullPointerException if {@code matchClass} is {@code null}.
*/
@SuppressWarnings("unchecked")
protected AbstractGraphActor(final Class<?> matchClass) {
protected AbstractGraphActor(final Class<?> matchClass,
final UnaryOperator<ThreadSafeDittoLoggingAdapter> loggerEnhancer) {
this.matchClass = checkNotNull((Class<M>) matchClass, "matchClass");

final Map<String, String> tags = Collections.singletonMap("class", getClass().getSimpleName());
Expand All @@ -76,7 +80,7 @@ protected AbstractGraphActor(final Class<?> matchClass) {
enqueueFailureCounter = DittoMetrics.counter("graph_actor_enqueue_failure", tags);
dequeueCounter = DittoMetrics.counter("graph_actor_dequeue", tags);

logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
this.logger = loggerEnhancer.apply(DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this));
materializer = Materializer.createMaterializer(this::getContext);
}

Expand Down
1 change: 0 additions & 1 deletion rql/search-option-parser/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@
<groupId>com.github.siom79.japicmp</groupId>
<artifactId>japicmp-maven-plugin</artifactId>
<configuration>
<skip>true</skip> <!-- TODO TJ unskip once 2.3.0 was released -->
<parameter>
<excludes>
<!-- Always exclude the 'internal' package -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private Source<Map.Entry<ThingId, JsonObject>, NotUsed> retrieveThingFromCaching
.<Map.Entry<ThingId, JsonObject>>map(thing -> new AbstractMap.SimpleImmutableEntry<>(thingId, thing))
.recoverWithRetries(1, new PFBuilder<Throwable, Source<Map.Entry<ThingId, JsonObject>, NotUsed>>()
.match(Throwable.class, error -> {
log.error("Unexpected response for SudoRetrieveThing via cache" + thingId, error);
log.error("Unexpected response for SudoRetrieveThing via cache: <{}>", thingId, error);
return Source.empty();
})
.build());
Expand Down

0 comments on commit 6ebbe7e

Please sign in to comment.