Skip to content

Commit

Permalink
Model InboundDispatchingActor as Sink
Browse files Browse the repository at this point in the history
* Allows to use backpressure from dispatching actor

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 21, 2021
1 parent 5ea5ca5 commit 38ab037
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private static final String MESSAGE_MAPPING_PROCESSOR_DISPATCHER = "message-mapping-processor-dispatcher";

private static final Set<String> NO_ADDRESS_REPORTING_CHILD_NAMES = Set.of(
InboundDispatchingActor.ACTOR_NAME,
OutboundMappingProcessorActor.ACTOR_NAME,
OutboundDispatchingActor.ACTOR_NAME
);
Expand Down Expand Up @@ -315,9 +314,9 @@ protected void init(final ConnectionContext connectionContext) {
outboundDispatchingActor = actorPair.first();
outboundMappingProcessorActor = actorPair.second();

final ActorRef inboundDispatcher =
startInboundDispatchingActor(connection, protocolAdapter, outboundMappingProcessorActor);
inboundMappingSink = getInboundMappingSink(connectionContext, protocolAdapter, inboundDispatcher);
final Sink<Object, NotUsed> inboundDispatchingSink =
startInboundDispatchingSink(connection, protocolAdapter, outboundMappingProcessorActor);
inboundMappingSink = getInboundMappingSink(connectionContext, protocolAdapter, inboundDispatchingSink);
subscriptionManager = startSubscriptionManager(proxyActorSelection,
connectionContext.getConnectivityConfig().getClientConfig());

Expand Down Expand Up @@ -1668,35 +1667,33 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final ConnectionContext con
}

/**
* Starts the {@link InboundDispatchingActor} responsible for signal de-multiplexing and acknowledgement
* Starts the {@link InboundDispatchingSink} responsible for signal de-multiplexing and acknowledgement
* aggregation.
*
* @return the ref to the started {@link InboundDispatchingActor}
* @return the ref to the started {@link InboundDispatchingSink}
* @throws DittoRuntimeException when mapping processor could not get started.
*/
private ActorRef startInboundDispatchingActor(final Connection connection,
private Sink<Object, NotUsed> startInboundDispatchingSink(final Connection connection,
final ProtocolAdapter protocolAdapter,
final ActorRef outboundMappingProcessorActor) {

final Props inboundDispatchingActorProps =
InboundDispatchingActor.props(connection, protocolAdapter.headerTranslator(), proxyActorSelection,
connectionActor, outboundMappingProcessorActor);

return getContext().actorOf(inboundDispatchingActorProps, InboundDispatchingActor.ACTOR_NAME);
return InboundDispatchingSink.createSink(connection, protocolAdapter.headerTranslator(), proxyActorSelection,
connectionActor, outboundMappingProcessorActor, getSelf(), getContext(),
getContext().system().settings().config());
}

/**
* Gets the {@link InboundMappingSink} responsible for payload transformation/mapping.
*
* @param connectionContext the connection.
* @param protocolAdapter the protocol adapter.
* @param inboundDispatchingActor the actor to hand mapping outcomes to.
* @param inboundDispatchingSink the sink to hand mapping outcomes to.
* @return the Sink.
* @throws DittoRuntimeException when mapping processor could not get started.
*/
private Sink<Object, NotUsed> getInboundMappingSink(final ConnectionContext connectionContext,
final ProtocolAdapter protocolAdapter,
final ActorRef inboundDispatchingActor) {
final Sink<Object, NotUsed> inboundDispatchingSink) {

final InboundMappingProcessor inboundMappingProcessor;
try {
Expand All @@ -1721,7 +1718,7 @@ private Sink<Object, NotUsed> getInboundMappingSink(final ConnectionContext conn
return InboundMappingSink.createSink(inboundMappingProcessor,
connectionContext.getConnection().getId(),
processorPoolSize,
inboundDispatchingActor,
inboundDispatchingSink,
mappingConfig,
messageMappingProcessorDispatcher);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.eclipse.ditto.connectivity.model.ConnectivityInternalErrorException;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityErrorResponse;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome;
Expand All @@ -67,8 +68,8 @@
import org.eclipse.ditto.internal.models.placeholders.ExpressionResolver;
import org.eclipse.ditto.internal.models.placeholders.PlaceholderFactory;
import org.eclipse.ditto.internal.models.placeholders.PlaceholderFilter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.messages.model.signals.commands.acks.MessageCommandAckRequestSetter;
Expand All @@ -85,22 +86,25 @@
import org.eclipse.ditto.thingsearch.model.signals.commands.WithSubscriptionId;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CreateSubscription;

import akka.actor.AbstractActor;
import com.typesafe.config.Config;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.japi.pf.PFBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import scala.PartialFunction;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

/**
* This Actor dispatches inbound messages after they are mapped.
* This Sink dispatches inbound messages after they are mapped.
*/
public final class InboundDispatchingActor extends AbstractActor
public final class InboundDispatchingSink
implements MappingOutcome.Visitor<MappedInboundExternalMessage, Optional<Signal<?>>> {

/**
Expand All @@ -110,7 +114,7 @@ public final class InboundDispatchingActor extends AbstractActor

private static final String UNKNOWN_MAPPER_ID = "?";

private final DittoDiagnosticLoggingAdapter logger;
private final ThreadSafeDittoLogger logger;

private final HeaderTranslator headerTranslator;
private final Connection connection;
Expand All @@ -123,40 +127,40 @@ public final class InboundDispatchingActor extends AbstractActor
private final ActorRef outboundMessageMappingProcessorActor;
private final ExpressionResolver connectionIdResolver;
private final AcknowledgementConfig acknowledgementConfig;
private final ActorRef clientActor;

@SuppressWarnings("unused")
private InboundDispatchingActor(
private InboundDispatchingSink(
final Connection connection,
final HeaderTranslator headerTranslator,
final ActorSelection proxyActor,
final ActorRef connectionActor,
final ActorRef outboundMessageMappingProcessorActor) {
final ActorRef outboundMessageMappingProcessorActor,
final ActorRef clientActor,
final ConnectivityConfig connectivityConfig,
final LimitsConfig limitsConfig,
final ActorRefFactory actorRefFactory) {

this.proxyActor = proxyActor;
this.outboundMessageMappingProcessorActor = outboundMessageMappingProcessorActor;
this.headerTranslator = headerTranslator;
this.connection = connection;
this.connectionActor = connectionActor;
this.clientActor = clientActor;

logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this)
logger = DittoLoggerFactory.getThreadSafeLogger(InboundDispatchingSink.class)
.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connection.getId());

connectionIdResolver = PlaceholderFactory.newExpressionResolver(
ConnectivityPlaceholders.newConnectionIdPlaceholder(),
connection.getId());

final DefaultScopedConfig dittoScoped =
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config());

final DittoConnectivityConfig connectivityConfig = DittoConnectivityConfig.of(dittoScoped);
final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
final LimitsConfig limitsConfig = DefaultLimitsConfig.of(dittoScoped);

connectionMonitorRegistry = DefaultConnectionMonitorRegistry.fromConfig(monitoringConfig);
responseMappedMonitor = connectionMonitorRegistry.forResponseMapped(connection);
toErrorResponseFunction = DittoRuntimeExceptionToErrorResponseFunction.of(limitsConfig.getHeadersMaxSize());
acknowledgementConfig = connectivityConfig.getConnectionConfig().getAcknowledgementConfig();
ackregatorStarter = AcknowledgementAggregatorActorStarter.of(getContext(),
ackregatorStarter = AcknowledgementAggregatorActorStarter.of(actorRefFactory,
acknowledgementConfig,
headerTranslator,
ThingModifyCommandAckRequestSetter.getInstance(),
Expand All @@ -165,7 +169,7 @@ private InboundDispatchingActor(
}

/**
* Creates Akka configuration object for this actor.
* Creates a Sink that dispatches inbound messages after they are mapped..
*
* @param connection the connection
* @param headerTranslator the headerTranslator to use.
Expand All @@ -174,41 +178,70 @@ private InboundDispatchingActor(
* @param outboundMessageMappingProcessorActor used to publish errors.
* @return the Akka configuration Props object.
*/
public static Props props(final Connection connection,
public static Sink<Object, NotUsed> createSink(final Connection connection,
final HeaderTranslator headerTranslator,
final ActorSelection proxyActor,
final ActorRef connectionActor,
final ActorRef outboundMessageMappingProcessorActor) {

return Props.create(InboundDispatchingActor.class,
final ActorRef outboundMessageMappingProcessorActor,
final ActorRef clientActor,
final ActorRefFactory actorRefFactory,
final Config config) {

final var dittoScoped = DefaultScopedConfig.dittoScoped(config);
final var connectivityConfig = DittoConnectivityConfig.of(dittoScoped);
final var limitsConfig = DefaultLimitsConfig.of(dittoScoped);
final var inboundDispatchingSink = new InboundDispatchingSink(
connection,
headerTranslator,
proxyActor,
connectionActor,
outboundMessageMappingProcessorActor
outboundMessageMappingProcessorActor,
clientActor,
connectivityConfig,
limitsConfig,
actorRefFactory
);
return inboundDispatchingSink.getSink();
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(InboundMappingOutcomes.class, InboundMappingOutcomes::hasError, this::dispatchError)
.match(InboundMappingOutcomes.class, this::dispatchMapped)
.match(DittoRuntimeException.class, this::onDittoRuntimeException)
.matchAny(message -> logger.warning("Received unknown message <{}>.", message))
.build();
private Sink<Object, NotUsed> getSink() {
return Flow.create()
.divertTo(dispatchError(), InboundDispatchingSink::isInboundMappingOutcomesWithError)
.divertTo(dispatchMapped(), InboundMappingOutcomes.class::isInstance)
.divertTo(onDittoRuntimeException(), DittoRuntimeException.class::isInstance)
.to(Sink.foreach(message -> logger.warn("Received unknown message <{}>.", message)));
}

private static boolean isInboundMappingOutcomesWithError(final Object streamingElement) {
return streamingElement instanceof InboundMappingOutcomes &&
((InboundMappingOutcomes) streamingElement).hasError();
}

private Sink<Object, NotUsed> onDittoRuntimeException() {
return Flow.fromFunction(DittoRuntimeException.class::cast)
.to(Sink.foreach(this::onDittoRuntimeException));
}

private void onDittoRuntimeException(final DittoRuntimeException dittoRuntimeException) {
onError(UNKNOWN_MAPPER_ID, dittoRuntimeException, null, null);
}

private Sink<Object, NotUsed> dispatchError() {
return Flow.fromFunction(InboundMappingOutcomes.class::cast)
.to(Sink.foreach(this::dispatchError));
}

private void dispatchError(final InboundMappingOutcomes outcomes) {
onError(UNKNOWN_MAPPER_ID, outcomes.getError(), null, outcomes.getExternalMessage());
}

private Sink<Object, NotUsed> dispatchMapped() {
return Flow.fromFunction(InboundMappingOutcomes.class::cast)
.to(Sink.foreach(this::dispatchMapped));
}

private void dispatchMapped(final InboundMappingOutcomes outcomes) {
final ActorRef sender = getSender();
final ActorRef sender = outcomes.getSender();
final PartialFunction<Signal<?>, Stream<IncomingSignal>> dispatchResponsesAndSearchCommands =
dispatchResponsesAndSearchCommands(sender, outcomes);
final int ackRequestingSignalCount = outcomes.getOutcomes()
Expand All @@ -219,7 +252,7 @@ private void dispatchMapped(final InboundMappingOutcomes outcomes) {
.mapToInt(this::dispatchIncomingSignal)
.sum();
logger.debug("OnMapped from <{}>: <{}>", sender, outcomes);
sender.tell(ResponseCollectorActor.setCount(ackRequestingSignalCount), getSelf());
sender.tell(ResponseCollectorActor.setCount(ackRequestingSignalCount), ActorRef.noSender());
}

private Set<AcknowledgementLabel> getDeclaredAckLabels(final InboundMappingOutcomes outcomes) {
Expand Down Expand Up @@ -283,8 +316,8 @@ public Optional<Signal<?>> onError(final String mapperId,
@Nullable final TopicPath topicPath,
@Nullable final ExternalMessage message) {

logger.debug("OnError mapperId=<{}> exception=<{}> topicPath=<{}> message=<{}> sender=<{}>",
mapperId, e, topicPath, message, getSender());
logger.debug("OnError mapperId=<{}> exception=<{}> topicPath=<{}> message=<{}>",
mapperId, e, topicPath, message);
if (e instanceof DittoRuntimeException) {
final DittoRuntimeException dittoRuntimeException = (DittoRuntimeException) e;
final ErrorResponse<?> errorResponse = toErrorResponseFunction.apply(dittoRuntimeException, topicPath);
Expand All @@ -301,7 +334,7 @@ public Optional<Signal<?>> onError(final String mapperId,
e.getMessage());
mappedHeaders = applyInboundHeaderMapping(errorResponse, message, authorizationContext,
message.getTopicPath().orElse(null), message.getInternalHeaders());
final DittoDiagnosticLoggingAdapter l = logger.withCorrelationId(mappedHeaders);
final ThreadSafeDittoLogger l = logger.withCorrelationId(mappedHeaders);
l.info("Got exception <{}> when processing external message with mapper <{}>: <{}>",
dittoRuntimeException.getErrorCode(),
mapperId,
Expand All @@ -319,7 +352,7 @@ public Optional<Signal<?>> onError(final String mapperId,
logger.withCorrelationId(Optional.ofNullable(message)
.map(ExternalMessage::getInternalHeaders)
.orElseGet(DittoHeaders::empty)
).warning("Got unknown exception <{}> when processing external message with mapper <{}>: <{}>",
).warn("Got unknown exception <{}> when processing external message with mapper <{}>: <{}>",
e.getClass().getSimpleName(), mapperId, e.getMessage());
}
return Optional.empty();
Expand Down Expand Up @@ -428,7 +461,7 @@ private void startAckregatorAndForwardSignal(final ThingId thingId, final DittoH
responseSignal -> {
// potentially publish response/aggregated acks to reply target
if (signal.getDittoHeaders().isResponseRequired()) {
outboundMessageMappingProcessorActor.tell(responseSignal, getSelf());
outboundMessageMappingProcessorActor.tell(responseSignal, ActorRef.noSender());
}

// forward acks to the original sender for consumer settlement
Expand All @@ -453,29 +486,29 @@ private void handleErrorDuringStartingOfAckregator(final DittoRuntimeException e
final ErrorResponse<?> errorResponse = toErrorResponseFunction.apply(e, null);
// tell sender the error response for consumer settlement
if (sender != null) {
sender.tell(errorResponse, getSelf());
sender.tell(errorResponse, ActorRef.noSender());
}
// publish error response
outboundMessageMappingProcessorActor.tell(errorResponse.setDittoHeaders(dittoHeaders), ActorRef.noSender());
}

/**
* Only special Signals must be forwarded to the {@code ConnectionPersistenceActor}:
* Only special Signals must be forwarded to the {@code ClientActor}:
* <ul>
* <li>{@code Acknowledgement}s which were received via an incoming connection source</li>
* <li>live {@code CommandResponse}s which were received via an incoming connection source</li>
* <li>{@code SearchCommand}s which were received via an incoming connection source</li>
* </ul>
*
* @param signal the Signal to forward to the connectionActor
* @param signal the Signal to forward to the clientActor
* @param sender the sender which shall receive the response
* @param <T> type of elements for the next step..
* @return an empty source of Signals
*/
private <T> Stream<T> forwardToClientActor(final Signal<?> signal,
@Nullable final ActorRef sender) {
// wrap response or search command for dispatching by entity ID
getContext().parent().tell(InboundSignal.of(signal), sender);
clientActor.tell(InboundSignal.of(signal), sender);
return Stream.empty();
}

Expand Down
Loading

0 comments on commit 38ab037

Please sign in to comment.