Skip to content

Commit

Permalink
Issue #792: remove self messaging of OutboudMappingProcessorActor; re…
Browse files Browse the repository at this point in the history
…move unnecessary parts of AbstractGraphActor; fix logger thread safety.

- Converted OutboundMappingProcessorActor's Receive into 2 stages
  to avoid self-messaging.

- Removed AbstractGraphActor.processMessageFlow.
  Renamed processedMessageSink to createSink.

- Use thread-safe loggers in AbstractGraphActor. The logger is used
  asynchronously because it is available inside a stream.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 10, 2020
1 parent 0e20a94 commit b7b42b6
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 99 deletions.
Expand Up @@ -30,15 +30,13 @@
import org.eclipse.ditto.model.policies.ResourceKey;
import org.eclipse.ditto.model.things.ThingConstants;
import org.eclipse.ditto.services.models.policies.Permission;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.cache.EntityIdWithResourceType;
import org.eclipse.ditto.services.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;

import akka.actor.ActorRef;
import akka.event.DiagnosticLoggingAdapter;
import akka.pattern.AskTimeoutException;

/**
Expand Down Expand Up @@ -224,23 +222,21 @@ protected EntityIdWithResourceType entityId() {
* {@code correlation-id} can be extracted in order to enhance the returned DiagnosticLoggingAdapter
* @return the diagnostic logging adapter.
*/
protected DiagnosticLoggingAdapter log(final Object withPotentialDittoHeaders) {
protected ThreadSafeDittoLoggingAdapter log(final Object withPotentialDittoHeaders) {
if (withPotentialDittoHeaders instanceof WithDittoHeaders) {
return log(((WithDittoHeaders<?>) withPotentialDittoHeaders).getDittoHeaders());
return context.getLog().withCorrelationId((WithDittoHeaders<?>) withPotentialDittoHeaders);
}
if (withPotentialDittoHeaders instanceof DittoHeaders) {
LogUtil.enhanceLogWithCorrelationId(context.getLog(), (DittoHeaders) withPotentialDittoHeaders);
return context.getLog().withCorrelationId((DittoHeaders) withPotentialDittoHeaders);
}
return context.getLog();
}

/**
* @return the diagnostic logging adapter.
*/
protected DiagnosticLoggingAdapter log() {
final DittoDiagnosticLoggingAdapter logger = context.getLog();
LogUtil.enhanceLogWithCorrelationId(logger, dittoHeaders());
return logger;
protected ThreadSafeDittoLoggingAdapter log() {
return context.getLog().withCorrelationId(dittoHeaders());
}

/**
Expand Down Expand Up @@ -361,8 +357,8 @@ protected Contextual<WithDittoHeaders> handleExceptionally(final Throwable throw
final DittoRuntimeException dittoRuntimeException =
DittoRuntimeException.asDittoRuntimeException(throwable,
cause -> {
LogUtil.enhanceLogWithCorrelationId(log(), context.getDittoHeaders());
log().error(cause, "Unexpected non-DittoRuntimeException");
log().withCorrelationId(context.getDittoHeaders())
.error(cause, "Unexpected non-DittoRuntimeException");
return GatewayInternalErrorException.newBuilder()
.cause(cause)
.dittoHeaders(context.getDittoHeaders())
Expand Down
Expand Up @@ -37,10 +37,8 @@

import com.github.benmanes.caffeine.cache.Caffeine;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;

/**
Expand Down Expand Up @@ -146,10 +144,7 @@ private StartedTimer createTimer(final WithDittoHeaders withDittoHeaders) {
}

@Override
protected abstract Flow<Contextual<WithDittoHeaders>, Contextual<WithDittoHeaders>, NotUsed> processMessageFlow();

@Override
protected abstract Sink<Contextual<WithDittoHeaders>, ?> processedMessageSink();
protected abstract Sink<Contextual<WithDittoHeaders>, ?> createSink();

@Override
protected int getBufferSize() {
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.controlflow.WithSender;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.cache.Cache;
import org.eclipse.ditto.services.utils.cache.EntityIdWithResourceType;
import org.eclipse.ditto.services.utils.metrics.instruments.timer.StartedTimer;
Expand Down Expand Up @@ -55,7 +56,7 @@ public final class Contextual<T extends WithDittoHeaders> implements WithSender<

private final Duration askTimeout;

private final DittoDiagnosticLoggingAdapter log;
private final ThreadSafeDittoLoggingAdapter log;

@Nullable
private final EntityIdWithResourceType entityId;
Expand All @@ -80,7 +81,7 @@ public final class Contextual<T extends WithDittoHeaders> implements WithSender<

private Contextual(@Nullable final T message, final ActorRef self, final ActorRef sender,
final ActorRef pubSubMediator, final ActorRef conciergeForwarder,
final Duration askTimeout, final DittoDiagnosticLoggingAdapter log,
final Duration askTimeout, final ThreadSafeDittoLoggingAdapter log,
@Nullable final EntityIdWithResourceType entityId,
@Nullable final StartedTimer startedTimer,
@Nullable final ActorRef receiver,
Expand Down Expand Up @@ -109,7 +110,7 @@ static <T extends WithDittoHeaders<T>> Contextual<T> forActor(final ActorRef sel
final ActorRef pubSubMediator,
final ActorRef conciergeForwarder,
final Duration askTimeout,
final DittoDiagnosticLoggingAdapter log,
final ThreadSafeDittoLoggingAdapter log,
@Nullable final Cache<String, ActorRef> responseReceivers) {

return new Contextual<>(null, self, deadLetters, pubSubMediator, conciergeForwarder, askTimeout, log, null,
Expand Down Expand Up @@ -206,7 +207,7 @@ Duration getAskTimeout() {
return askTimeout;
}

DittoDiagnosticLoggingAdapter getLog() {
ThreadSafeDittoLoggingAdapter getLog() {
return log;
}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;

Expand Down Expand Up @@ -98,11 +99,10 @@ private void futureComplete(final FutureComplete futureComplete) {
}

private Void dispatchEnforcedMessage(final Contextual<?> enforcementResult) {
final DittoDiagnosticLoggingAdapter logger = enforcementResult.getLog();
final Optional<? extends WithDittoHeaders> messageOpt = enforcementResult.getMessageOptional();
if (messageOpt.isPresent()) {
final WithDittoHeaders<?> message = messageOpt.get();
logger.setCorrelationId(message);
final ThreadSafeDittoLoggingAdapter logger = enforcementResult.getLog().withCorrelationId(message);
final Optional<ActorRef> receiverOpt = enforcementResult.getReceiver();
final Optional<Supplier<CompletionStage<Object>>> askFutureOpt = enforcementResult.getAskFuture();
if (askFutureOpt.isPresent() && receiverOpt.isPresent()) {
Expand All @@ -125,7 +125,7 @@ private Void dispatchEnforcedMessage(final Contextual<?> enforcementResult) {
logger.discardCorrelationId();
} else {
// message does not exist; nothing to dispatch
logger.debug("Not dispatching due to lack of message: {}", enforcementResult);
enforcementResult.getLog().debug("Not dispatching due to lack of message: {}", enforcementResult);
}
return null;
}
Expand Down
Expand Up @@ -113,12 +113,7 @@ public static Props props(final ActorRef pubSubMediator,
}

@Override
protected Flow<Contextual<WithDittoHeaders>, Contextual<WithDittoHeaders>, NotUsed> processMessageFlow() {
return Flow.create();
}

@Override
protected Sink<Contextual<WithDittoHeaders>, ?> processedMessageSink() {
protected Sink<Contextual<WithDittoHeaders>, ?> createSink() {
return sink;
}

Expand Down
Expand Up @@ -12,7 +12,9 @@
*/
package org.eclipse.ditto.services.concierge.enforcement;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
Expand All @@ -27,7 +29,7 @@
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.policies.PolicyId;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.signals.commands.things.modify.ModifyPolicyId;
import org.eclipse.ditto.signals.commands.things.query.RetrieveThing;
import org.junit.AfterClass;
Expand Down Expand Up @@ -74,9 +76,13 @@ public void testOrdering() {
final TestProbe pubSubProbe = TestProbe.apply(actorSystem);
final TestProbe conciergeForwarderProbe = TestProbe.apply(actorSystem);
final TestProbe receiverProbe = TestProbe.apply(actorSystem);
final ThreadSafeDittoLoggingAdapter mockLogger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
doAnswer(invocation -> mockLogger).when(mockLogger).withCorrelationId(any(DittoHeaders.class));
doAnswer(invocation -> mockLogger).when(mockLogger).withCorrelationId(any(WithDittoHeaders.class));
doAnswer(invocation -> mockLogger).when(mockLogger).withCorrelationId(any(CharSequence.class));
final Contextual<WithDittoHeaders> baseContextual = Contextual.forActor(getRef(), deadLetterProbe.ref(),
pubSubProbe.ref(), conciergeForwarderProbe.ref(),
Duration.ofSeconds(10), Mockito.mock(DittoDiagnosticLoggingAdapter.class),
Duration.ofSeconds(10), mockLogger,
null
);
final ThingId thingId = ThingId.of("busy", "thing");
Expand Down
Expand Up @@ -88,26 +88,18 @@ protected DispatcherActor.ImmutableDispatch mapMessage(final WithDittoHeaders me
}

@Override
protected Flow<ImmutableDispatch, ImmutableDispatch, NotUsed> processMessageFlow() {
return handler;
}

@Override
protected Sink<ImmutableDispatch, ?> processedMessageSink() {
return Sink.foreach(dispatch -> logger.withCorrelationId(dispatch.getMessage())
.warning("Unhandled Message in DispatcherActor: <{}>", dispatch));
protected Sink<ImmutableDispatch, ?> createSink() {
return handler.to(
Sink.foreach(dispatch -> logger.withCorrelationId(dispatch.getMessage())
.warning("Unhandled Message in DispatcherActor: <{}>", dispatch))
);
}

@Override
protected int getBufferSize() {
return enforcementConfig.getBufferSize();
}

@Override
protected void preEnhancement(final ReceiveBuilder receiveBuilder) {
// no-op
}

/**
* Create Akka actor configuration Props object without pre-enforcer.
*
Expand Down
Expand Up @@ -230,12 +230,7 @@ protected ExternalMessageWithSender mapMessage(final ExternalMessage message) {
}

@Override
protected Flow<ExternalMessageWithSender, ExternalMessageWithSender, NotUsed> processMessageFlow() {
return Flow.create();
}

@Override
protected Sink<ExternalMessageWithSender, ?> processedMessageSink() {
protected Sink<ExternalMessageWithSender, ?> createSink() {

return Flow.<ExternalMessageWithSender>create()
// parallelize potentially CPU-intensive payload mapping on this actor's dispatcher
Expand Down
Expand Up @@ -85,15 +85,18 @@
import org.eclipse.ditto.signals.commands.things.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.signals.events.things.ThingEventToThingConverter;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.japi.pf.PFBuilder;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;

/**
* This Actor processes {@link OutboundSignal outbound signals} and dispatches them.
Expand Down Expand Up @@ -190,31 +193,44 @@ public static Props props(final ActorRef clientActor,
}

@Override
protected int getBufferSize() {
return mappingConfig.getBufferSize();
}

@Override
protected void preEnhancement(final ReceiveBuilder receiveBuilder) {
receiveBuilder
// Outgoing responses and signals go through the signal enrichment stream
public Receive createReceive() {
final PartialFunction<Object, Object> wrapAsOutboundSignal = new PFBuilder<>()
.match(Acknowledgement.class, this::handleNotExpectedAcknowledgement)
.match(CommandResponse.class, response -> handleCommandResponse(response, null, getSender()))
.match(Signal.class, signal -> handleSignal(signal, getSender()))
.match(Status.Failure.class, f -> logger.warning("Got failure with cause {}: {}",
f.cause().getClass().getSimpleName(), f.cause().getMessage()));
.match(DittoRuntimeException.class, this::mapDittoRuntimeException)
.match(Status.Failure.class, f -> {
logger.warning("Got failure with cause {}: {}",
f.cause().getClass().getSimpleName(), f.cause().getMessage());
return Done.getInstance();
})
.matchAny(x -> x)
.build();

final PartialFunction<Object, BoxedUnit> doNothingIfDone = new PFBuilder<Object, BoxedUnit>()
.matchEquals(Done.getInstance(), done -> BoxedUnit.UNIT)
.build();

final Receive addToSourceQueue = super.createReceive();

return new Receive(wrapAsOutboundSignal.andThen(doNothingIfDone.orElse(addToSourceQueue.onMessage())));
}

private void handleNotExpectedAcknowledgement(final Acknowledgement acknowledgement) {
@Override
protected int getBufferSize() {
return mappingConfig.getBufferSize();
}

private Object handleNotExpectedAcknowledgement(final Acknowledgement acknowledgement) {
// acknowledgements are not published to targets or reply-targets. this one is mis-routed.
logger.withCorrelationId(acknowledgement)
.warning("Received Acknowledgement where non was expected, discarding it: {}", acknowledgement);
return Done.getInstance();
}

@Override
protected void handleDittoRuntimeException(final DittoRuntimeException exception) {
private Object mapDittoRuntimeException(final DittoRuntimeException exception) {
final ErrorResponse<?> errorResponse = toErrorResponseFunction.apply(exception, null);
handleErrorResponse(exception, errorResponse, getSender());
return handleErrorResponse(exception, errorResponse, getSender());
}

@Override
Expand All @@ -228,12 +244,7 @@ protected OutboundSignalWithId mapMessage(final OutboundSignal message) {
}

@Override
protected Flow<OutboundSignalWithId, OutboundSignalWithId, NotUsed> processMessageFlow() {
return Flow.create();
}

@Override
protected Sink<OutboundSignalWithId, ?> processedMessageSink() {
protected Sink<OutboundSignalWithId, ?> createSink() {
// Enrich outbound signals by extra fields if necessary.
final Flow<OutboundSignalWithId, OutboundSignal.MultiMapped, ?> flow = Flow.<OutboundSignalWithId>create()
.mapAsync(processorPoolSize, outbound -> toMultiMappedOutboundSignal(
Expand Down Expand Up @@ -367,7 +378,7 @@ private void logEnrichmentFailure(final OutboundSignal outboundSignal, final Con
.forEach(monitor -> monitor.failure(outboundSignal.getSource(), errorToLog));
}

private void handleErrorResponse(final DittoRuntimeException exception, final ErrorResponse<?> errorResponse,
private Object handleErrorResponse(final DittoRuntimeException exception, final ErrorResponse<?> errorResponse,
final ActorRef sender) {

final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(exception);
Expand All @@ -383,10 +394,10 @@ private void handleErrorResponse(final DittoRuntimeException exception, final Er
stackTrace);
}

handleCommandResponse(errorResponse, exception, sender);
return handleCommandResponse(errorResponse, exception, sender);
}

private void handleCommandResponse(final CommandResponse<?> response,
private Object handleCommandResponse(final CommandResponse<?> response,
@Nullable final DittoRuntimeException exception, final ActorRef sender) {

final ThreadSafeDittoLoggingAdapter l = logger.isDebugEnabled() ? logger.withCorrelationId(response) : logger;
Expand All @@ -397,14 +408,15 @@ private void handleCommandResponse(final CommandResponse<?> response,
responseDroppedMonitor.success(response,
"Dropped response since requester did not require response via Header {0}.",
DittoHeaderDefinition.EXPECTED_RESPONSE_TYPES);
return Done.getInstance();
} else {
if (isSuccessResponse(response)) {
l.debug("Received response <{}>.", response);
} else if (l.isDebugEnabled()) {
l.debug("Received error response <{}>.", response.toJsonString());
}

handleSignal(response, sender);
return handleSignal(response, sender);
}
}

Expand Down Expand Up @@ -433,10 +445,10 @@ private void forwardToPublisherActor(final OutboundSignal.MultiMapped mappedEnve
*
* @param signal the response/error
*/
private void handleSignal(final Signal<?> signal, final ActorRef sender) {
private Object handleSignal(final Signal<?> signal, final ActorRef sender) {
// map to outbound signal without authorized target (responses and errors are only sent to its origin)
logger.withCorrelationId(signal).debug("Handling raw signal <{}>.", signal);
getSelf().tell(OutboundSignalWithId.of(signal, sender), sender);
return OutboundSignalWithId.of(signal, sender);
}

private Source<OutboundSignalWithId, ?> mapToExternalMessage(final OutboundSignalWithId outbound) {
Expand Down

0 comments on commit b7b42b6

Please sign in to comment.