-
Notifications
You must be signed in to change notification settings - Fork 215
/
ProxyActor.java
154 lines (135 loc) · 6.73 KB
/
ProxyActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.gateway.service.proxy.actors;
import org.eclipse.ditto.base.api.devops.signals.commands.DevOpsCommand;
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveStatistics;
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveStatisticsDetails;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.json.JsonRuntimeException;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThings;
import akka.actor.AbstractActor;
import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
/**
* Abstract base implementation for a command proxy.
*/
public final class ProxyActor extends AbstractActor {
/**
* The name of this Actor in the ActorSystem.
*/
public static final String ACTOR_NAME = "proxy";
private final ActorSelection devOpsCommandsActor;
private final ActorRef edgeCommandForwarder;
private final ActorRef pubSubMediator;
private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
private final ActorRef statisticsActor;
ProxyActor(final ActorRef pubSubMediator,
final ActorSelection devOpsCommandsActor,
final ActorRef edgeCommandForwarder) {
this.pubSubMediator = pubSubMediator;
this.devOpsCommandsActor = devOpsCommandsActor;
this.edgeCommandForwarder = edgeCommandForwarder;
statisticsActor = getContext().actorOf(StatisticsActor.props(pubSubMediator), StatisticsActor.ACTOR_NAME);
}
/**
* Creates Akka configuration object Props for this ProxyActor.
*
* @param pubSubMediator the Pub/Sub mediator to use for subscribing for events.
* @param devOpsCommandsActor the Actor ref to the local DevOpsCommandsActor.
* @param edgeCommandForwarder the Actor ref to the {@code EdgeCommandForwarderActor}.
* @return the Akka configuration Props object.
*/
public static Props props(final ActorRef pubSubMediator,
final ActorSelection devOpsCommandsActor,
final ActorRef edgeCommandForwarder) {
return Props.create(ProxyActor.class, pubSubMediator, devOpsCommandsActor, edgeCommandForwarder);
}
static boolean isLiveCommandOrEvent(final Signal<?> signal) {
return StreamingType.isLiveSignal(signal);
}
@Override
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(true, DeciderBuilder
.match(NullPointerException.class, e -> {
log.error(e, "NullPointer in child actor - restarting it...", e.getMessage());
log.info("Restarting child...");
return SupervisorStrategy.restart();
})
.match(ActorKilledException.class, e -> {
log.error(e.getCause(), "ActorKilledException in child actor - stopping it...");
return SupervisorStrategy.stop();
})
.matchAny(e -> (SupervisorStrategy.Directive) SupervisorStrategy.escalate())
.build());
}
@Override
public Receive createReceive() {
final ReceiveBuilder receiveBuilder = ReceiveBuilder.create();
// common commands
receiveBuilder
.match(RetrieveStatistics.class, retrieveStatistics -> {
log.debug("Got 'RetrieveStatistics' message");
statisticsActor.forward(retrieveStatistics, getContext());
})
.match(RetrieveStatisticsDetails.class, retrieveStatisticsDetails -> {
log.debug("Got 'RetrieveStatisticsDetails' message");
statisticsActor.forward(retrieveStatisticsDetails, getContext());
}).match(DevOpsCommand.class, command -> {
log.withCorrelationId(command)
.debug("Got 'DevOpsCommand' message <{}>, forwarding to local devOpsCommandsActor",
command.getType());
devOpsCommandsActor.forward(command, getContext());
})
.match(QueryThings.class, qt -> {
final ActorRef responseActor = getContext().actorOf(
QueryThingsPerRequestActor.props(qt, edgeCommandForwarder, getSender(), pubSubMediator)
);
edgeCommandForwarder.tell(qt, responseActor);
})
/* send all other Commands to command forwarder */
.match(Command.class, this::forwardToCommandForwarder)
/* Live Signals */
.match(Signal.class, ProxyActor::isLiveCommandOrEvent, this::forwardToCommandForwarder)
.match(Status.Failure.class, failure -> {
Throwable cause = failure.cause();
if (cause instanceof JsonRuntimeException) {
cause = new DittoJsonException((RuntimeException) cause);
}
getSender().tell(cause, getSelf());
})
.match(DittoRuntimeException.class, cre -> getSender().tell(cre, getSelf()))
.match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck ->
log.debug("Successfully subscribed to distributed pub/sub on topic '{}'",
subscribeAck.subscribe().topic())
)
.matchAny(m -> log.warning("Got unknown message, expected a 'Command': {}", m));
return receiveBuilder.build();
}
private void forwardToCommandForwarder(final Signal<?> signal) {
edgeCommandForwarder.forward(signal, getContext());
}
}