Skip to content

Commit

Permalink
undid sending "live" CommandResponses in InboundDispatchingSink to pr…
Browse files Browse the repository at this point in the history
…oxyActor

* the responses must be correlated via clientActor and outboundDispatchingActor to original sender
* from there, the response must be sent via commandForwarder to e.g. "things" in order to get filtered correctly

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jul 14, 2022
1 parent 763a90e commit 62ed425
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 35 deletions.
Expand Up @@ -1733,7 +1733,8 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final ProtocolAdapter proto
final ActorRef processorActor =
getContext().actorOf(outboundMappingProcessorActorProps, OutboundMappingProcessorActor.ACTOR_NAME);

final Props outboundDispatchingProcessorActorProps = OutboundDispatchingActor.props(settings, processorActor);
final Props outboundDispatchingProcessorActorProps = OutboundDispatchingActor.props(
commandForwarderActorSelection, settings, processorActor);
final ActorRef dispatchingActor =
getContext().actorOf(outboundDispatchingProcessorActorProps, OutboundDispatchingActor.ACTOR_NAME);

Expand Down
Expand Up @@ -504,10 +504,9 @@ private PartialFunction<Signal<?>, Stream<IncomingSignal>> dispatchResponsesAndS
forwardAcknowledgement(ack, declaredAckLabels, outcomes))
.match(Acknowledgements.class, acks ->
forwardAcknowledgements(acks, declaredAckLabels, outcomes))
.match(CommandResponse.class, ProtocolAdapter::isLiveSignal, liveResponse -> {
proxyActor.tell(liveResponse, ActorRef.noSender());
return Stream.empty();
})
.match(CommandResponse.class, ProtocolAdapter::isLiveSignal, liveResponse ->
forwardToClientActor(liveResponse, ActorRef.noSender())
)
.match(CreateSubscription.class, cmd -> forwardToConnectionActor(cmd, sender))
.match(WithSubscriptionId.class, cmd -> forwardToClientActor(cmd, sender))
.matchAny(baseSignal -> ackregatorStarter.preprocess(baseSignal,
Expand Down Expand Up @@ -642,6 +641,7 @@ private void handleErrorDuringStartingOfAckregator(final DittoRuntimeException e
* Only special Signals must be forwarded to the {@code ClientActor}:
* <ul>
* <li>{@code Acknowledgement}s which were received via an incoming connection source</li>
* <li>live {@code CommandResponse}s which were received via an incoming connection source</li>
* <li>{@code SearchCommand}s which were received via an incoming connection source</li>
* </ul>
*
Expand Down
Expand Up @@ -59,16 +59,22 @@ final class OutboundDispatchingActor extends AbstractActor {

private final OutboundMappingSettings settings;
private final ActorRef outboundMappingProcessorActor;
private final ActorSelection commandForwarder;

@SuppressWarnings("unused")
private OutboundDispatchingActor(final OutboundMappingSettings settings,
private OutboundDispatchingActor(final ActorSelection commandForwarder,
final OutboundMappingSettings settings,
final ActorRef outboundMappingProcessorActor) {

this.commandForwarder = commandForwarder;
this.settings = settings;
this.outboundMappingProcessorActor = outboundMappingProcessorActor;
}

static Props props(final OutboundMappingSettings settings, final ActorRef outboundMappingProcessorActor) {
return Props.create(OutboundDispatchingActor.class, settings, outboundMappingProcessorActor);
static Props props(final ActorSelection commandForwarder, final OutboundMappingSettings settings,
final ActorRef outboundMappingProcessorActor) {

return Props.create(OutboundDispatchingActor.class, commandForwarder, settings, outboundMappingProcessorActor);
}

@Override
Expand Down Expand Up @@ -155,6 +161,7 @@ private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final
// start ackregator for source declared acks
return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(),
self(),
commandForwarder,
entityId,
signal,
settings.getAcknowledgementConfig(),
Expand Down
Expand Up @@ -106,7 +106,7 @@ final class StreamingSessionActor extends AbstractActorWithTimers {
private final String type;
private final DittoProtocolSub dittoProtocolSub;
private final SourceQueueWithComplete<SessionedJsonifiable> eventAndResponsePublisher;
private final ActorRef commandRouter;
private final ActorRef commandForwarder;
private final AcknowledgementConfig acknowledgementConfig;
private final ActorRef subscriptionManager;
private final Set<StreamingType> outstandingSubscriptionAcks;
Expand All @@ -122,7 +122,7 @@ final class StreamingSessionActor extends AbstractActorWithTimers {
@SuppressWarnings("unused")
private StreamingSessionActor(final Connect connect,
final DittoProtocolSub dittoProtocolSub,
final ActorRef commandRouter,
final ActorRef commandForwarder,
final AcknowledgementConfig acknowledgementConfig,
final HeaderTranslator headerTranslator,
final Props subscriptionManagerProps,
Expand All @@ -134,7 +134,7 @@ private StreamingSessionActor(final Connect connect,
type = connect.getType();
this.dittoProtocolSub = dittoProtocolSub;
eventAndResponsePublisher = connect.getEventAndResponsePublisher();
this.commandRouter = commandRouter;
this.commandForwarder = commandForwarder;
this.acknowledgementConfig = acknowledgementConfig;
this.jwtValidator = jwtValidator;
this.jwtAuthenticationResultProvider = jwtAuthenticationResultProvider;
Expand Down Expand Up @@ -166,7 +166,7 @@ private StreamingSessionActor(final Connect connect,
*
* @param connect the command to start a streaming session.
* @param dittoProtocolSub manager of subscriptions.
* @param commandRouter the actor who distributes incoming commands in the Ditto cluster.
* @param commandForwarder the actor who distributes incoming commands in the Ditto cluster.
* @param acknowledgementConfig the config to apply for Acknowledgements.
* @param headerTranslator translates headers from external sources or to external sources.
* @param subscriptionManagerProps Props of the subscription manager for search protocol.
Expand All @@ -176,7 +176,7 @@ private StreamingSessionActor(final Connect connect,
*/
static Props props(final Connect connect,
final DittoProtocolSub dittoProtocolSub,
final ActorRef commandRouter,
final ActorRef commandForwarder,
final AcknowledgementConfig acknowledgementConfig,
final HeaderTranslator headerTranslator,
final Props subscriptionManagerProps,
Expand All @@ -186,7 +186,7 @@ static Props props(final Connect connect,
return Props.create(StreamingSessionActor.class,
connect,
dittoProtocolSub,
commandRouter,
commandForwarder,
acknowledgementConfig,
headerTranslator,
subscriptionManagerProps,
Expand Down Expand Up @@ -231,12 +231,12 @@ private Receive createIncomingSignalBehavior() {
.match(Acknowledgement.class, this::hasUndeclaredAckLabel, this::ackLabelNotDeclared)
.match(Acknowledgement.class, this::forwardAcknowledgementOrLiveCommandResponse)
.match(CommandResponse.class, CommandResponse::isLiveCommandResponse, liveCommandResponse ->
commandRouter.forward(liveCommandResponse, getContext()))
commandForwarder.forward(liveCommandResponse, getContext()))
.match(CommandResponse.class, this::forwardAcknowledgementOrLiveCommandResponse)
.match(ThingSearchCommand.class, this::forwardSearchCommand)
.match(Signal.class, signal ->
// forward signals for which no reply is expected with self return address for downstream errors
commandRouter.tell(signal, getReturnAddress(signal)))
commandForwarder.tell(signal, getReturnAddress(signal)))
.matchEquals(Done.getInstance(), done -> {})
.build();

Expand Down Expand Up @@ -450,7 +450,7 @@ private Object startAckregatorAndForward(final Signal<?> signal) {
.orElseGet(() -> ackregatorStarter.doStart(entityIdOptional.get(),
s, null, this::publishResponseOrError,
(ackregator, adjustedSignal) -> {
commandRouter.tell(adjustedSignal, ackregator);
commandForwarder.tell(adjustedSignal, ackregator);
return Done.getInstance();
}
));
Expand All @@ -472,7 +472,8 @@ private Signal<?> startAckForwarder(final Signal<?> signal) {
if (entityIdOptional.isPresent()) {
final var entityIdWithType = entityIdOptional.get();
return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(),
self(),
getSelf(),
getContext().actorSelection(commandForwarder.path()),
entityIdWithType,
signal,
acknowledgementConfig,
Expand Down Expand Up @@ -514,7 +515,7 @@ private void forwardAcknowledgementOrLiveCommandResponse(final CommandResponse<?
} else {
logger.withCorrelationId(response).info(template, response.getType());
}
commandRouter.tell(response, ActorRef.noSender());
commandForwarder.tell(response, ActorRef.noSender());
}
);
} catch (final DittoRuntimeException e) {
Expand Down
Expand Up @@ -58,13 +58,16 @@ public final class AcknowledgementForwarderActor extends AbstractActor {
*/
static final String ACTOR_NAME_PREFIX = "ackForwarder-";

private final ActorSelection commandForwarder;
private final String correlationId;
@Nullable private final String ackgregatorAddressFallback;
private final DittoDiagnosticLoggingAdapter log;

@SuppressWarnings("unused")
private AcknowledgementForwarderActor(final DittoHeaders dittoHeaders, final Duration defaultTimeout) {
private AcknowledgementForwarderActor(final ActorSelection commandForwarder, final DittoHeaders dittoHeaders,
final Duration defaultTimeout) {

this.commandForwarder = commandForwarder;
correlationId = dittoHeaders.getCorrelationId()
.orElseGet(() ->
// fall back using the actor name which also contains the correlation-id
Expand All @@ -79,12 +82,14 @@ private AcknowledgementForwarderActor(final DittoHeaders dittoHeaders, final Dur
/**
* Creates Akka configuration object Props for this AcknowledgementForwarderActor.
*
* @param commandForwarder the actor ref of the edge command forwarder actor.
* @param dittoHeaders the DittoHeaders of the Signal which contained the request for Acknowledgements.
* @param defaultTimeout the default timeout to apply when {@code dittoHeaders} did not contain a specific timeout.
* @return the Akka configuration Props object.
*/
static Props props(final DittoHeaders dittoHeaders, final Duration defaultTimeout) {
return Props.create(AcknowledgementForwarderActor.class, dittoHeaders, defaultTimeout);
static Props props(final ActorSelection commandForwarder, final DittoHeaders dittoHeaders,
final Duration defaultTimeout) {
return Props.create(AcknowledgementForwarderActor.class, commandForwarder, dittoHeaders, defaultTimeout);
}

@Override
Expand All @@ -100,17 +105,17 @@ private void forwardCommandResponse(final WithDittoHeaders acknowledgementOrResp
final DittoHeaders dittoHeaders = acknowledgementOrResponse.getDittoHeaders();
final String ackregatorAddress = dittoHeaders.getOrDefault(
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), ackgregatorAddressFallback);
if (null != ackregatorAddress) {
if (null != ackregatorAddress && acknowledgementOrResponse instanceof Acknowledgement acknowledgement) {
final ActorSelection acknowledgementRequester = getContext().actorSelection(ackregatorAddress);
log.withCorrelationId(acknowledgementOrResponse)
log.withCorrelationId(acknowledgement)
.debug("Received Acknowledgement / live CommandResponse, forwarding to acknowledgement " +
"aggregator <{}>: " + "<{}>", acknowledgementRequester, acknowledgementOrResponse);
acknowledgementRequester.tell(acknowledgementOrResponse, getSender());
"aggregator <{}>: " + "<{}>", acknowledgementRequester, acknowledgement);
acknowledgementRequester.tell(acknowledgement, getSender());
} else {
log.withCorrelationId(acknowledgementOrResponse)
.error("Received Acknowledgement / live CommandResponse <{}> did not contain header of " +
"acknowledgement aggregator address: {}",
.debug("Received live CommandResponse <{}>, forwarding to command forwarder: {}",
acknowledgementOrResponse.getClass().getSimpleName(), dittoHeaders);
commandForwarder.tell(acknowledgementOrResponse, ActorRef.noSender());
}
}

Expand Down Expand Up @@ -141,12 +146,13 @@ public static String determineActorName(final DittoHeaders dittoHeaders) {

static Optional<ActorRef> startAcknowledgementForwarderForTest(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorSelection commandForwarder,
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig) {

final AcknowledgementForwarderActorStarter starter = AcknowledgementForwarderActorStarter
.getInstance(actorRefFactory, parent, entityId, signal, acknowledgementConfig,
.getInstance(actorRefFactory, parent, commandForwarder, entityId, signal, acknowledgementConfig,
label -> true);
return starter.get();
}
Expand All @@ -160,6 +166,7 @@ static Optional<ActorRef> startAcknowledgementForwarderForTest(final ActorRefFac
*
* @param actorRefFactory the factory to start the forwarder actor in.
* @param parent the parent of the forwarder actor.
* @param commandForwarder the actor ref of the edge command forwarder actor.
* @param entityId the entityId of the {@code Signal} which requested the Acknowledgements.
* @param signal the signal for which acknowledgements are expected.
* @param acknowledgementConfig the AcknowledgementConfig to use for looking up config values.
Expand All @@ -169,12 +176,13 @@ static Optional<ActorRef> startAcknowledgementForwarderForTest(final ActorRefFac
*/
public static Signal<?> startAcknowledgementForwarder(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorSelection commandForwarder,
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {
final AcknowledgementForwarderActorStarter starter =
AcknowledgementForwarderActorStarter.getInstance(actorRefFactory, parent, entityId,
AcknowledgementForwarderActorStarter.getInstance(actorRefFactory, parent, commandForwarder, entityId,
signal, acknowledgementConfig, isAckLabelAllowed);
final DittoHeadersBuilder<?, ?> dittoHeadersBuilder = signal.getDittoHeaders().toBuilder();
starter.getConflictFree().ifPresent(dittoHeadersBuilder::correlationId);
Expand Down
Expand Up @@ -62,6 +62,7 @@ final class AcknowledgementForwarderActorStarter implements Supplier<Optional<Ac

private final ActorRefFactory actorRefFactory;
private final ActorRef parent;
private final ActorSelection commandForwarder;
private final EntityId entityId;
private final Signal<?> signal;
private final DittoHeaders dittoHeaders;
Expand All @@ -70,13 +71,15 @@ final class AcknowledgementForwarderActorStarter implements Supplier<Optional<Ac

private AcknowledgementForwarderActorStarter(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorSelection commandForwarder,
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {

this.actorRefFactory = checkNotNull(actorRefFactory, "actorRefFactory");
this.parent = parent;
this.parent = checkNotNull(parent, "parent");
this.commandForwarder = checkNotNull(commandForwarder, "commandForwarder");
this.entityId = checkNotNull(entityId, "entityId");
this.signal = checkNotNull(signal, "signal");
dittoHeaders = signal.getDittoHeaders();
Expand All @@ -92,6 +95,7 @@ private AcknowledgementForwarderActorStarter(final ActorRefFactory actorRefFacto
*
* @param actorRefFactory the factory to start the forwarder actor in.
* @param parent the parent of the forwarder actor.
* @param commandForwarder the actor ref of the edge command forwarder actor.
* @param entityId is used for the NACKs if the forwarder actor cannot be started.
* @param signal the signal for which the forwarder actor is to start.
* @param acknowledgementConfig provides configuration setting regarding acknowledgement handling.
Expand All @@ -102,12 +106,13 @@ private AcknowledgementForwarderActorStarter(final ActorRefFactory actorRefFacto
*/
static AcknowledgementForwarderActorStarter getInstance(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorSelection commandForwarder,
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {

return new AcknowledgementForwarderActorStarter(actorRefFactory, parent, entityId, signal,
return new AcknowledgementForwarderActorStarter(actorRefFactory, parent, commandForwarder, entityId, signal,
acknowledgementConfig,
// live-response is always allowed
isAckLabelAllowed.or(DittoAcknowledgementLabel.LIVE_RESPONSE::equals));
Expand Down Expand Up @@ -192,7 +197,7 @@ private Pair<String, Integer> parseCorrelationId(final DittoHeaders dittoHeaders
}

private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders) {
final Props props = AcknowledgementForwarderActor.props(dittoHeaders,
final Props props = AcknowledgementForwarderActor.props(commandForwarder, dittoHeaders,
acknowledgementConfig.getForwarderFallbackTimeout());
return actorRefFactory.actorOf(props, AcknowledgementForwarderActor.determineActorName(dittoHeaders));
}
Expand Down
Expand Up @@ -39,6 +39,7 @@

import com.typesafe.config.ConfigFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
Expand Down Expand Up @@ -138,7 +139,8 @@ public void startForwarderActorWithDuplicateCorrelationId() {
}

private AcknowledgementForwarderActorStarter getActorStarter(final DittoHeaders dittoHeaders) {
return AcknowledgementForwarderActorStarter.getInstance(actorSystem, TestProbe.apply(actorSystem).ref(),
final ActorRef ref = TestProbe.apply(actorSystem).ref();
return AcknowledgementForwarderActorStarter.getInstance(actorSystem, ref, actorSystem.actorSelection(ref.path()),
KNOWN_ENTITY_ID,
ThingDeleted.of(KNOWN_ENTITY_ID, 1L, Instant.EPOCH, dittoHeaders, null),
acknowledgementConfig, label -> true);
Expand Down

0 comments on commit 62ed425

Please sign in to comment.