Skip to content

Commit

Permalink
Concierge: Add an actor to handle live thing query commands with cust…
Browse files Browse the repository at this point in the history
…om acknowledgement requests.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 15, 2021
1 parent 56bca4b commit 150da16
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.concierge.service.actors;

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.japi.pf.ReceiveBuilder;

/**
* An actor to deal with a live thing query command expecting a response and requesting custom acknowledgements.
* The sender of the live command is an actor in Concierge in order to apply policy enforcement on the response.
* As a result, custom acknowledgements are also sent to Concierge, which must forward them to the acknowledgement
* aggregator actor.
*/
public final class LiveResponseAndAcknowledgementForwarder extends AbstractActor {

private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);

private final DittoDiagnosticLoggingAdapter logger;
private final ActorRef messageReceiver;
private final ActorRef acknowledgementReceiver;
private final Set<AcknowledgementLabel> pendingAcknowledgements;
private boolean responseReceived = false;
private ActorRef messageSender;

private LiveResponseAndAcknowledgementForwarder(final Signal<?> liveSignal,
final ActorRef messageReceiver,
final ActorRef acknowledgementReceiver) {

logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
pendingAcknowledgements = new HashSet<>();
this.messageReceiver = messageReceiver;
this.acknowledgementReceiver = acknowledgementReceiver;
getContext().setReceiveTimeout(liveSignal.getDittoHeaders().getTimeout().orElse(DEFAULT_TIMEOUT));
for (final var ackRequest : liveSignal.getDittoHeaders().getAcknowledgementRequests()) {
pendingAcknowledgements.add(ackRequest.getLabel());
}
}

/**
* Create Props object for this actor.
*
* @param liveSignal The live signal whose acknowledgements and responses this actor listens for.
* @param messageReceiver Receiver of the message to publish.
* @param acknowledgementReceiver Receiver of acknowledgements.
* @return The Props object.
*/
public static Props props(final Signal<?> liveSignal,
final ActorRef messageReceiver,
final ActorRef acknowledgementReceiver) {
return Props.create(LiveResponseAndAcknowledgementForwarder.class, liveSignal, messageReceiver,
acknowledgementReceiver);
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(ThingQueryCommandResponse.class, this::onQueryCommandResponse)
.match(Acknowledgement.class, this::onAcknowledgement)
.match(Acknowledgements.class, this::onAcknowledgements)
.match(ReceiveTimeout.class, this::stopSelf)
.matchAny(this::sendMessage)
.build();
}

private void sendMessage(final Object message) {
logger.debug("Got message to send <{}>", message);
messageSender = getSender();
messageReceiver.tell(message, getSelf());
}

private void onAcknowledgement(final Acknowledgement ack) {
logger.debug("Got <{}>", ack);
pendingAcknowledgements.remove(ack.getLabel());
acknowledgementReceiver.forward(ack, getContext());
}

private void onAcknowledgements(final Acknowledgements acks) {
logger.debug("Got <{}>", acks);
for (final var ack : acks) {
pendingAcknowledgements.remove(ack.getLabel());
}
acknowledgementReceiver.forward(acks, getContext());
}

private void onQueryCommandResponse(final ThingQueryCommandResponse<?> response) {
logger.debug("Got <{}>", response);
responseReceived = true;
if (messageSender != null) {
messageSender.forward(response, getContext());
checkCompletion();
} else {
logger.error("Got response without receiving command");
stopSelf("Message sender not found");
}
}

private void checkCompletion() {
if (responseReceived && pendingAcknowledgements.isEmpty()) {
stopSelf("All responses and acknowledgements delivered");
}
}

private void stopSelf(final Object trigger) {
logger.debug("Stopping due to <{}>", trigger);
getContext().stop(getSelf());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand All @@ -29,6 +30,7 @@
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.concierge.service.actors.LiveResponseAndAcknowledgementForwarder;
import org.eclipse.ditto.internal.utils.cache.Cache;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
Expand Down Expand Up @@ -59,6 +61,7 @@
import org.eclipse.ditto.things.model.signals.events.ThingEvent;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.japi.Pair;

/**
Expand All @@ -76,17 +79,20 @@ public final class LiveSignalEnforcement extends AbstractEnforcementWithAsk<Sign

private final EnforcerRetriever<Enforcer> enforcerRetriever;
private final LiveSignalPub liveSignalPub;
private final ActorSystem actorSystem;

private LiveSignalEnforcement(final Contextual<SignalWithEntityId<?>> context,
final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache,
final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache,
final LiveSignalPub liveSignalPub) {
final LiveSignalPub liveSignalPub,
final ActorSystem actorSystem) {

super(context, ThingQueryCommandResponse.class);
requireNonNull(thingIdCache);
requireNonNull(policyEnforcerCache);
enforcerRetriever = PolicyEnforcerRetrieverFactory.create(thingIdCache, policyEnforcerCache);
this.liveSignalPub = liveSignalPub;
this.actorSystem = actorSystem;
}

@Override
Expand Down Expand Up @@ -116,6 +122,7 @@ public static final class Provider implements EnforcementProvider<SignalWithEnti
private final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache;
private final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache;
private final LiveSignalPub liveSignalPub;
private final ActorSystem actorSystem;

/**
* Constructor.
Expand All @@ -126,11 +133,13 @@ public static final class Provider implements EnforcementProvider<SignalWithEnti
*/
public Provider(final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache,
final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache,
final LiveSignalPub liveSignalPub) {
final LiveSignalPub liveSignalPub,
final ActorSystem actorSystem) {

this.thingIdCache = requireNonNull(thingIdCache);
this.policyEnforcerCache = requireNonNull(policyEnforcerCache);
this.liveSignalPub = liveSignalPub;
this.actorSystem = actorSystem;
}

@Override
Expand All @@ -147,7 +156,7 @@ public boolean isApplicable(final SignalWithEntityId<?> signal) {
@Override
public AbstractEnforcement<SignalWithEntityId<?>> createEnforcement(
final Contextual<SignalWithEntityId<?>> context) {
return new LiveSignalEnforcement(context, thingIdCache, policyEnforcerCache, liveSignalPub);
return new LiveSignalEnforcement(context, thingIdCache, policyEnforcerCache, liveSignalPub, actorSystem);
}

}
Expand Down Expand Up @@ -277,11 +286,18 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(final St
final ThingCommand<?> withReadSubjects =
addEffectedReadSubjectsToThingLiveSignal((ThingCommand<?>) liveSignal, enforcer);
log(withReadSubjects).info("Live Command was authorized: <{}>", withReadSubjects);
if (liveSignal instanceof ThingQueryCommand && liveSignal.getDittoHeaders().isResponseRequired()) {
final boolean isThingQueryCommandRequiringResponse =
liveSignal instanceof ThingQueryCommand && liveSignal.getDittoHeaders().isResponseRequired();
final boolean hasCustomAckRequests = hasCustomAcknowledgementRequests(withReadSubjects);
if (isThingQueryCommandRequiringResponse && !hasCustomAckRequests) {
return addToResponseReceiver(withReadSubjects).thenApply(newSignal ->
askAndBuildJsonView((ThingCommand<?>) newSignal, THING_COMMAND_ACK_EXTRACTOR,
liveSignalPub.command(), enforcer)
);
} else if (isThingQueryCommandRequiringResponse) {
return addToResponseReceiver(withReadSubjects).thenApply(newSignal ->
askAndBuildJsonViewWithAckForwarding((ThingCommand<?>) newSignal,
THING_COMMAND_ACK_EXTRACTOR, liveSignalPub.command(), enforcer));
} else {
return publishLiveSignal(withReadSubjects, THING_COMMAND_ACK_EXTRACTOR, liveSignalPub.command());
}
Expand All @@ -294,6 +310,13 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(final St
}
}

private static boolean hasCustomAcknowledgementRequests(final Signal<?> signal) {
return !signal.getDittoHeaders()
.getAcknowledgementRequests()
.stream()
.allMatch(request -> DittoAcknowledgementLabel.LIVE_RESPONSE.equals(request.getLabel()));
}

/**
* Extend a signal by subject headers given with granted and revoked READ access.
* The subjects are provided by the given enforcer for the resource type {@link org.eclipse.ditto.things.model.ThingConstants#ENTITY_TYPE}.
Expand Down Expand Up @@ -396,6 +419,21 @@ private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoH
);
}

private <T extends Signal<?>> Contextual<WithDittoHeaders> askAndBuildJsonViewWithAckForwarding(
final T signal,
final AckExtractor<T> ackExtractor,
final DistributedPub<T> pub,
final Enforcer enforcer) {

final var publish = pub.wrapForPublicationWithAcks(signal, ackExtractor);
final var castSignal = (SignalWithEntityId<?>) signal;
final var props = LiveResponseAndAcknowledgementForwarder.props(signal, pub.getPublisher(), sender());
final var liveResponseForwarder = actorSystem.actorOf(props);
return withMessageToReceiverViaAskFuture(signal, sender(), () ->
askAndBuildJsonView(liveResponseForwarder, castSignal, publish, enforcer, context.getScheduler(),
context.getExecutor()));
}

private <T extends Signal<?>> Contextual<WithDittoHeaders> askAndBuildJsonView(
final T signal,
final AckExtractor<T> ackExtractor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public ActorRef startEnforcerActor(final ActorContext context, final ConciergeCo
policiesShardRegionProxy, thingIdCache, projectedEnforcerCache, preEnforcer));
enforcementProviders.add(new PolicyCommandEnforcement.Provider(policiesShardRegionProxy, policyEnforcerCache));
enforcementProviders.add(
new LiveSignalEnforcement.Provider(thingIdCache, projectedEnforcerCache, liveSignalPub));
new LiveSignalEnforcement.Provider(thingIdCache, projectedEnforcerCache, liveSignalPub, actorSystem));

final ActorRef conciergeEnforcerRouter =
ConciergeEnforcerClusterRouterFactory.createConciergeEnforcerClusterRouter(context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public ActorRef build() {
enforcementProviders.add(new PolicyCommandEnforcement.Provider(policiesShardRegion, policyEnforcerCache));
enforcementProviders.add(
new LiveSignalEnforcement.Provider(thingIdCache, projectedEnforcerCache,
new DummyLiveSignalPub(puSubMediatorRef)));
new DummyLiveSignalPub(puSubMediatorRef), system));
final Props props = EnforcerActor.props(testActorRef, enforcementProviders, conciergeForwarder, preEnforcer,
null, null);

Expand Down

0 comments on commit 150da16

Please sign in to comment.