Skip to content

Commit

Permalink
route liveQueryCommandResponses from StreamingSessionActor and Outbou…
Browse files Browse the repository at this point in the history
…ndDispatchingActor to concierge for filtering the response based on the policy;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute authored and jbartelh committed Oct 8, 2021
1 parent 4c57c00 commit 85cc494
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta

private final Connection connection;
private final ActorRef connectionActor;
private final ActorRef proxyActor;
private final ActorSelection proxyActorSelection;
private final Gauge clientGauge;
private final Gauge clientConnectingGauge;
Expand Down Expand Up @@ -213,6 +214,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private int childActorCount = 0;

protected BaseClientActor(final Connection connection,
//TODO check if @Nullable can be removed
@Nullable final ActorRef proxyActor,
final ActorRef connectionActor,
final DittoHeaders dittoHeaders) {
Expand All @@ -221,6 +223,7 @@ protected BaseClientActor(final Connection connection,
materializer = Materializer.createMaterializer(system);
this.connection = checkNotNull(connection, "connection");
this.connectionActor = connectionActor;
this.proxyActor = proxyActor;
// this is retrieve via the extension for each baseClientActor in order to not pass it as constructor arg
// as all constructor arguments need to be serializable as the BaseClientActor is started behind a cluster
// router
Expand Down Expand Up @@ -1750,7 +1753,8 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final ConnectionContext con
final ActorRef processorActor =
getContext().actorOf(outboundMappingProcessorActorProps, OutboundMappingProcessorActor.ACTOR_NAME);

final Props outboundDispatchingProcessorActorProps = OutboundDispatchingActor.props(settings, processorActor);
final Props outboundDispatchingProcessorActorProps =
OutboundDispatchingActor.props(settings, processorActor, proxyActor);
final ActorRef dispatchingActor =
getContext().actorOf(outboundDispatchingProcessorActorProps, OutboundDispatchingActor.ACTOR_NAME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.eclipse.ditto.internal.models.acks.AcknowledgementForwarderActor;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionEvent;

import akka.actor.AbstractActor;
Expand All @@ -58,16 +60,19 @@ final class OutboundDispatchingActor extends AbstractActor {

private final OutboundMappingSettings settings;
private final ActorRef outboundMappingProcessorActor;
private final ActorRef proxyActor;

@SuppressWarnings("unused")
private OutboundDispatchingActor(final OutboundMappingSettings settings,
final ActorRef outboundMappingProcessorActor) {
final ActorRef outboundMappingProcessorActor, final ActorRef proxyActor) {
this.settings = settings;
this.outboundMappingProcessorActor = outboundMappingProcessorActor;
this.proxyActor = proxyActor;
}

static Props props(final OutboundMappingSettings settings, final ActorRef outboundMappingProcessorActor) {
return Props.create(OutboundDispatchingActor.class, settings, outboundMappingProcessorActor);
static Props props(final OutboundMappingSettings settings, final ActorRef outboundMappingProcessorActor,
final ActorRef proxyActor) {
return Props.create(OutboundDispatchingActor.class, settings, outboundMappingProcessorActor, proxyActor);
}

@Override
Expand Down Expand Up @@ -182,7 +187,14 @@ private void handleInboundResponseOrAcknowledgement(final WithDittoHeaders respo
}

final ActorContext context = getContext();
final Consumer<ActorRef> action = forwarder -> forwarder.forward(responseOrAck, context);
final Consumer<ActorRef> action = forwarder -> {
if (responseOrAck instanceof ThingQueryCommandResponse && isLiveResponse(responseOrAck)) {
// forward live command responses to concierge to filter response
proxyActor.tell(responseOrAck, getSender());
} else {
forwarder.forward(responseOrAck, context);
}
};
final Runnable emptyAction = () -> {
final String template = "No AcknowledgementForwarderActor found, forwarding to concierge: <{}>";
if (logger.isDebugEnabled()) {
Expand All @@ -197,4 +209,8 @@ private void handleInboundResponseOrAcknowledgement(final WithDittoHeaders respo
.ifPresentOrElse(action, emptyAction);
}

private boolean isLiveResponse(final WithDittoHeaders response) {
return response.getDittoHeaders().getChannel().filter(TopicPath.Channel.LIVE.getName()::equals).isPresent();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.things.model.signals.events.AttributeModified;
import org.junit.After;
import org.junit.Before;
Expand All @@ -55,10 +57,12 @@
public final class OutboundDispatchingActorTest {

private ActorSystem actorSystem;
private TestProbe proxyActor;

@Before
public void init() {
actorSystem = ActorSystem.create(getClass().getSimpleName(), TestConstants.CONFIG);
proxyActor = TestProbe.apply("proxy", actorSystem);
}

@After
Expand Down Expand Up @@ -287,6 +291,40 @@ public void testLiveMessageWithAuthorizedSubjectExpectIsNotForwarded() {
TestConstants.Targets.TWIN_TARGET);
}

@Test
public void testHandleLiveResponse() {
new TestKit(actorSystem) {{
final ConnectionId connectionId = ConnectionId.of("testHandleSignalWithAcknowledgementRequest");
final Connection connectionWithTestAck = TestConstants.createConnection()
.toBuilder()
.id(connectionId)
.sources(TestConstants.createConnection().getSources().stream()
.map(source -> ConnectivityModelFactory.newSourceBuilder(source)
.declaredAcknowledgementLabels(Set.of(getTestAck(connectionId)))
.build())
.collect(Collectors.toList()))
.build();
final TestProbe probe = TestProbe.apply("probe", actorSystem);
final ActorRef underTest = getOutboundDispatchingActor(connectionWithTestAck, probe.ref());

final DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
.channel(TopicPath.Channel.LIVE.getName())
.readGrantedSubjects(Collections.singleton(TestConstants.Authorization.SUBJECT))
.timeout("2s")
.randomCorrelationId()
.build();

final RetrieveThingResponse retrieveThingResponse =
RetrieveThingResponse.of(TestConstants.Things.THING_ID, TestConstants.Things.THING, null,
null, dittoHeaders);

underTest.tell(InboundSignal.of(retrieveThingResponse), getRef());

final RetrieveThingResponse forwardedResponse = proxyActor.expectMsgClass(RetrieveThingResponse.class);
assertThat(forwardedResponse).isEqualTo(retrieveThingResponse);
}};
}

private void testForwardThingEvent(final Connection connection, final boolean isForwarded, final Signal<?> signal,
final Target expectedTarget) {

Expand All @@ -295,12 +333,14 @@ private void testForwardThingEvent(final Connection connection, final boolean is
final ActorSelection proxyActorSelection = ActorSelection.apply(proxyActor.ref(), "");
final TestProbe mappingActor = TestProbe.apply("mapping", actorSystem);

final var connectionContext = DittoConnectionContext.of(connection, TestConstants.CONNECTIVITY_CONFIG);
final var connectionContext =
DittoConnectionContext.of(connection, TestConstants.CONNECTIVITY_CONFIG);
final OutboundMappingSettings settings =
OutboundMappingSettings.of(connectionContext, actorSystem, proxyActorSelection,
DittoProtocolAdapter.newInstance(),
MockActor.getThreadSafeDittoLoggingAdapter(actorSystem));
final ActorRef underTest = childActorOf(OutboundDispatchingActor.props(settings, mappingActor.ref()));
final ActorRef underTest =
childActorOf(OutboundDispatchingActor.props(settings, mappingActor.ref(), proxyActor.ref()));

underTest.tell(signal, getRef());

Expand All @@ -319,14 +359,14 @@ private void testForwardThingEvent(final Connection connection, final boolean is
}

private ActorRef getOutboundDispatchingActor(final Connection connection, final ActorRef mappingActor) {
final TestProbe proxyActor = TestProbe.apply("proxy", actorSystem);
final ActorSelection proxyActorSelection = ActorSelection.apply(proxyActor.ref(), "");

final var connectionContext = DittoConnectionContext.of(connection, TestConstants.CONNECTIVITY_CONFIG);
final var connectionContext =
DittoConnectionContext.of(connection, TestConstants.CONNECTIVITY_CONFIG);
final OutboundMappingSettings settings =
OutboundMappingSettings.of(connectionContext, actorSystem, proxyActorSelection,
DittoProtocolAdapter.newInstance(), MockActor.getThreadSafeDittoLoggingAdapter(actorSystem));
return actorSystem.actorOf(OutboundDispatchingActor.props(settings, mappingActor));
return actorSystem.actorOf(OutboundDispatchingActor.props(settings, mappingActor, proxyActor.ref()));
}

private static AcknowledgementLabel getTestAck(final ConnectionId connectionId) {
Expand All @@ -336,4 +376,5 @@ private static AcknowledgementLabel getTestAck(final ConnectionId connectionId)
private static AcknowledgementLabel getPlaceholderAck() {
return AcknowledgementLabel.of("{{connection:id}}:test-ack");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.things.model.signals.commands.acks.ThingLiveCommandAckRequestSetter;
import org.eclipse.ditto.things.model.signals.commands.acks.ThingModifyCommandAckRequestSetter;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionEvent;

Expand Down Expand Up @@ -484,7 +485,14 @@ private void forwardAcknowledgementOrLiveCommandResponse(final CommandResponse<?
try {
getContext().findChild(AcknowledgementForwarderActor.determineActorName(response.getDittoHeaders()))
.ifPresentOrElse(
forwarder -> forwarder.tell(response, sender),
forwarder -> {
if (response instanceof ThingQueryCommandResponse && isLiveResponse(response)) {
// forward live command responses to concierge to filter response
commandRouter.tell(response, sender);
} else {
forwarder.tell(response, sender);
}
},
() -> {
// the Acknowledgement / LiveCommandResponse is meant for someone else:
final var template =
Expand All @@ -503,6 +511,10 @@ private void forwardAcknowledgementOrLiveCommandResponse(final CommandResponse<?
}
}

private boolean isLiveResponse(final CommandResponse<?> response) {
return response.getDittoHeaders().getChannel().filter(TopicPath.Channel.LIVE.getName()::equals).isPresent();
}

private void forwardSearchCommand(final ThingSearchCommand<?> searchCommand) {
subscriptionManager.tell(searchCommand, getSelf());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@
import org.eclipse.ditto.internal.models.acks.config.DefaultAcknowledgementConfig;
import org.eclipse.ditto.internal.utils.pubsub.DittoProtocolSub;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.jwt.model.JsonWebToken;
import org.eclipse.ditto.jwt.model.JwtInvalidException;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.things.model.signals.events.ThingDeleted;
import org.junit.After;
import org.junit.Rule;
Expand Down Expand Up @@ -122,7 +125,8 @@ public StreamingSessionActorTest() {
TestSink.probe(actorSystem);
final Source<SessionedJsonifiable, SourceQueueWithComplete<SessionedJsonifiable>> source =
Source.queue(100, OverflowStrategy.fail());
final var pair = source.viaMat(KillSwitches.single(), Keep.both()).toMat(sink, Keep.both()).run(actorSystem);
final var pair =
source.viaMat(KillSwitches.single(), Keep.both()).toMat(sink, Keep.both()).run(actorSystem);
sourceQueue = pair.first().first();
sinkProbe = pair.second();
killSwitch = pair.first().second();
Expand Down Expand Up @@ -337,6 +341,24 @@ public void jwtExpirationTimeClosesStream() {
}};
}


@Test
public void sendLiveCommandResponseAndEnsureForwarding() {
new TestKit(actorSystem) {{
final var underTest = watch(actorSystem.actorOf(getProps()));
final DittoHeaders dittoHeaders =
DittoHeaders.newBuilder()
.channel(TopicPath.Channel.LIVE.getName())
.correlationId("corr:" + testName.getMethodName()).build();

final RetrieveThingResponse retrieveThingResponse =
RetrieveThingResponse.of(ThingId.generateRandom(), JsonObject.empty(), dittoHeaders);

underTest.tell(IncomingSignal.of(retrieveThingResponse), ActorRef.noSender());
commandRouterProbe.expectMsg(retrieveThingResponse);
}};
}

private static String getTokenString() {
return getTokenString(Instant.now().plusSeconds(60L));
}
Expand Down

0 comments on commit 85cc494

Please sign in to comment.