Skip to content

Commit

Permalink
Merge proxy actors together to make more clear what it actually does
Browse files Browse the repository at this point in the history
* This should not change any behaviour, but just should reduce code

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed Jun 22, 2022
1 parent b88e7d2 commit 332b483
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 222 deletions.

This file was deleted.

This file was deleted.

117 changes: 111 additions & 6 deletions gateway/service/src/main/java/org/eclipse/ditto/gateway/service/proxy/actors/ProxyActor.java
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,56 @@
*/
package org.eclipse.ditto.gateway.service.proxy.actors;

import org.eclipse.ditto.base.api.devops.signals.commands.DevOpsCommand;
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveStatistics;
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveStatisticsDetails;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.json.JsonRuntimeException;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThings;

import akka.actor.AbstractActor;
import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;

/**
* Actor which delegates {@link Command}s to the appropriate receivers in the cluster.
* Abstract base implementation for a command proxy.
*/
public final class ProxyActor extends AbstractThingProxyActor {
public final class ProxyActor extends AbstractActor {

@SuppressWarnings("unused")
private ProxyActor(final ActorRef pubSubMediator,
/**
* The name of this Actor in the ActorSystem.
*/
public static final String ACTOR_NAME = "proxy";

private final ActorSelection devOpsCommandsActor;
private final ActorRef edgeCommandForwarder;
private final ActorRef pubSubMediator;

private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

private final ActorRef statisticsActor;

ProxyActor(final ActorRef pubSubMediator,
final ActorSelection devOpsCommandsActor,
final ActorRef edgeCommandForwarder) {

super(pubSubMediator, devOpsCommandsActor, edgeCommandForwarder);
this.pubSubMediator = pubSubMediator;
this.devOpsCommandsActor = devOpsCommandsActor;
this.edgeCommandForwarder = edgeCommandForwarder;
statisticsActor = getContext().actorOf(StatisticsActor.props(pubSubMediator), StatisticsActor.ACTOR_NAME);
}

/**
Expand All @@ -46,4 +79,76 @@ public static Props props(final ActorRef pubSubMediator,
return Props.create(ProxyActor.class, pubSubMediator, devOpsCommandsActor, edgeCommandForwarder);
}

static boolean isLiveCommandOrEvent(final Signal<?> signal) {
return StreamingType.isLiveSignal(signal);
}

@Override
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(true, DeciderBuilder
.match(NullPointerException.class, e -> {
log.error(e, "NullPointer in child actor - restarting it...", e.getMessage());
log.info("Restarting child...");
return SupervisorStrategy.restart();
})
.match(ActorKilledException.class, e -> {
log.error(e.getCause(), "ActorKilledException in child actor - stopping it...");
return SupervisorStrategy.stop();
})
.matchAny(e -> (SupervisorStrategy.Directive) SupervisorStrategy.escalate())
.build());
}

@Override
public Receive createReceive() {
final ReceiveBuilder receiveBuilder = ReceiveBuilder.create();

// common commands
receiveBuilder
.match(RetrieveStatistics.class, retrieveStatistics -> {
log.debug("Got 'RetrieveStatistics' message");
statisticsActor.forward(retrieveStatistics, getContext());
})
.match(RetrieveStatisticsDetails.class, retrieveStatisticsDetails -> {
log.debug("Got 'RetrieveStatisticsDetails' message");
statisticsActor.forward(retrieveStatisticsDetails, getContext());
}).match(DevOpsCommand.class, command -> {
log.withCorrelationId(command)
.debug("Got 'DevOpsCommand' message <{}>, forwarding to local devOpsCommandsActor",
command.getType());
devOpsCommandsActor.forward(command, getContext());
})
.match(QueryThings.class, qt -> {
final ActorRef responseActor = getContext().actorOf(
QueryThingsPerRequestActor.props(qt, edgeCommandForwarder, getSender(), pubSubMediator)
);
edgeCommandForwarder.tell(qt, responseActor);
})

/* send all other Commands to command forwarder */
.match(Command.class, this::forwardToCommandForwarder)

/* Live Signals */
.match(Signal.class, ProxyActor::isLiveCommandOrEvent, this::forwardToCommandForwarder)
.match(Status.Failure.class, failure -> {
Throwable cause = failure.cause();
if (cause instanceof JsonRuntimeException) {
cause = new DittoJsonException((RuntimeException) cause);
}
getSender().tell(cause, getSelf());
})
.match(DittoRuntimeException.class, cre -> getSender().tell(cre, getSelf()))
.match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck ->
log.debug("Successfully subscribed to distributed pub/sub on topic '{}'",
subscribeAck.subscribe().topic())
)
.matchAny(m -> log.warning("Got unknown message, expected a 'Command': {}", m));

return receiveBuilder.build();
}

private void forwardToCommandForwarder(final Signal<?> signal) {
edgeCommandForwarder.forward(signal, getContext());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.eclipse.ditto.gateway.service.health.DittoStatusAndHealthProviderFactory;
import org.eclipse.ditto.gateway.service.health.GatewayHttpReadinessCheck;
import org.eclipse.ditto.gateway.service.health.StatusAndHealthProvider;
import org.eclipse.ditto.gateway.service.proxy.actors.AbstractProxyActor;
import org.eclipse.ditto.gateway.service.proxy.actors.ProxyActor;
import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationFactory;
import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationResultProvider;
Expand Down Expand Up @@ -312,7 +311,7 @@ private ActorRef startProxyActor(final ActorRefFactory actorSystem, final ActorR
final ActorSelection devOpsCommandsActor =
actorSystem.actorSelection(DevOpsRoute.DEVOPS_COMMANDS_ACTOR_SELECTION);

return startChildActor(AbstractProxyActor.ACTOR_NAME,
return startChildActor(ProxyActor.ACTOR_NAME,
ProxyActor.props(pubSubMediator, devOpsCommandsActor, edgeCommandForwarder));

}
Expand Down

0 comments on commit 332b483

Please sign in to comment.