/
ThingSupervisorActor.java
executable file
·275 lines (230 loc) · 10 KB
/
ThingSupervisorActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*/
package org.eclipse.ditto.services.things.persistence.actors;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.commands.things.exceptions.ThingUnavailableException;
import akka.actor.AbstractActor;
import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.DeciderBuilder;
import scala.concurrent.duration.FiniteDuration;
/**
 * Supervisor for {@link ThingPersistenceActor} which means it will create, start and watch it as child actor.
 * <p>
 * If the child terminates, it will wait for the calculated exponential back off time and restart it afterwards.
 * The child has to send {@link ManualReset} after it started successfully.
 * Between the termination of the child and the restart, this actor answers all requests with a
 * {@link ThingUnavailableException} as fail fast strategy.
 * </p>
 */
public final class ThingSupervisorActor extends AbstractActor {

    /**
     * Restart count from which on the back-off is no longer calculated but {@code maxBackOff} is used directly.
     * Guards the exponential calculation ({@code minBackOff * 2^restartCount}) against Duration overflow.
     */
    private static final long MAX_BACK_OFF_EXPONENT = 30L;

    private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);

    /**
     * ID of the supervised Thing, obtained by URL-decoding this actor's name.
     */
    private final String thingId;

    /**
     * Props of the child persistence actor, created once for {@link #thingId}.
     */
    private final Props persistenceActorProps;
    private final Duration minBackOff;
    private final Duration maxBackOff;
    private final double randomFactor;
    private final SupervisorStrategy supervisorStrategy;

    /**
     * The watched child persistence actor, or {@code null} between its termination and its restart.
     */
    private ActorRef child;

    /**
     * Number of child restarts since the last {@link ManualReset}; used as exponent of the back-off.
     */
    private long restartCount;

    private ThingSupervisorActor(final Duration minBackOff,
            final Duration maxBackOff,
            final double randomFactor,
            final Function<String, Props> thingPersistenceActorPropsFactory,
            final SupervisorStrategy supervisorStrategy) {

        try {
            // this actor's name carries the Thing ID in URL-encoded form
            thingId = URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8.name());
        } catch (final UnsupportedEncodingException e) {
            // UTF-8 is a standard charset, so this is not expected to happen
            throw new IllegalStateException("Unsupported encoding!", e);
        }
        persistenceActorProps = thingPersistenceActorPropsFactory.apply(thingId);
        this.minBackOff = minBackOff;
        this.maxBackOff = maxBackOff;
        this.randomFactor = randomFactor;
        this.supervisorStrategy = supervisorStrategy;
        child = null;
    }

    /**
     * Props for creating a {@code ThingSupervisorActor}.
     * <p>
     * Exceptions in the child are handled with a supervision strategy that restarts the child on
     * {@link NullPointerException}s, stops it for {@link ActorKilledException}s and escalates all others.
     * </p>
     *
     * @param minBackOff minimum (initial) duration until the child actor will be started again, if it is terminated.
     * @param maxBackOff the exponential back-off is capped to this duration.
     * @param randomFactor after calculation of the exponential back-off an additional random delay based on this factor
     * is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`.
     * @param thingPersistenceActorPropsFactory creates the {@link Props} of the child persistence actor from the Thing
     * ID, which the supervisor obtains by URL-decoding its own actor name.
     * @return the {@link Props} to create this actor.
     */
    public static Props props(final Duration minBackOff,
            final Duration maxBackOff,
            final double randomFactor,
            final Function<String, Props> thingPersistenceActorPropsFactory) {

        return Props.create(ThingSupervisorActor.class, new Creator<ThingSupervisorActor>() {
            private static final long serialVersionUID = 1L;

            @Override
            public ThingSupervisorActor create() {
                final OneForOneStrategy oneForOneStrategy = new OneForOneStrategy(true, DeciderBuilder
                        .match(NullPointerException.class, e -> SupervisorStrategy.restart())
                        .match(ActorKilledException.class, e -> SupervisorStrategy.stop())
                        .matchAny(e -> SupervisorStrategy.escalate())
                        .build());

                return new ThingSupervisorActor(minBackOff, maxBackOff, randomFactor,
                        thingPersistenceActorPropsFactory, oneForOneStrategy);
            }
        });
    }

    /**
     * Creates the strategies for the explicitly handled message types (start, child termination, reset).
     * Everything else is handled by {@link MatchAnyStrategy}, which is registered separately.
     */
    private Collection<ReceiveStrategy<?>> initReceiveStrategies() {
        final Collection<ReceiveStrategy<?>> result = new ArrayList<>();
        result.add(new StartChildStrategy());
        result.add(new ChildTerminatedStrategy());
        result.add(new ManualResetStrategy());
        return result;
    }

    @Override
    public SupervisorStrategy supervisorStrategy() {
        return supervisorStrategy;
    }

    /**
     * Starts the child persistence actor as soon as this supervisor starts.
     */
    @Override
    public void preStart() throws Exception {
        super.preStart();
        startChild();
    }

    @Override
    public Receive createReceive() {
        final Collection<ReceiveStrategy<?>> receiveStrategies = initReceiveStrategies();
        final StrategyAwareReceiveBuilder strategyAwareReceiveBuilder = new StrategyAwareReceiveBuilder();
        receiveStrategies.forEach(strategyAwareReceiveBuilder::match);
        // registered last so that it only catches messages not matched by the strategies above
        strategyAwareReceiveBuilder.matchAny(new MatchAnyStrategy());
        return strategyAwareReceiveBuilder.build();
    }

    /**
     * Creates and watches the child persistence actor (named {@code "pa"}) unless one is already running.
     */
    private void startChild() {
        if (null == child) {
            log.debug("Starting persistence actor for Thing with ID <{}>.", thingId);
            final ActorRef childRef = getContext().actorOf(persistenceActorProps, "pa");
            child = getContext().watch(childRef);
        }
    }

    /**
     * Message that should be sent to this actor to indicate a working child and reset the exponential back off
     * mechanism.
     */
    static final class ManualReset {
    }

    /**
     * This strategy handles the Termination of the child actor by restarting it after an exponential back off.
     */
    @NotThreadSafe
    private final class ChildTerminatedStrategy extends AbstractReceiveStrategy<Terminated> {

        ChildTerminatedStrategy() {
            super(Terminated.class, log);
        }

        @Override
        public void doApply(final Terminated message) {
            // the child is the only actor this supervisor watches, so the Terminated refers to it
            log.info("Persistence actor for Thing with ID <{}> terminated abnormally.", thingId);
            child = null;
            final Duration restartDelay = calculateRestartDelay();
            getContext().system()
                    .scheduler()
                    .scheduleOnce(new FiniteDuration(restartDelay.toNanos(), TimeUnit.NANOSECONDS), getSelf(),
                            new StartChild(), getContext().dispatcher(), null);
            restartCount += 1;
        }

        /**
         * Calculates {@code min(maxBackOff, minBackOff * 2^restartCount * (1 + random * randomFactor))}, where
         * {@code random} is in {@code [0, 1)}. From {@link #MAX_BACK_OFF_EXPONENT} restarts on, {@code maxBackOff}
         * is returned without calculation.
         */
        private Duration calculateRestartDelay() {
            if (restartCount >= MAX_BACK_OFF_EXPONENT) {
                // Duration overflow protection (> 100 years)
                return maxBackOff;
            }
            final double rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor;
            final double backOff = minBackOff.toNanos() * Math.pow(2, restartCount) * rnd;
            // the cast saturates at Long.MAX_VALUE for very large values, the min caps it to maxBackOff
            return Duration.ofNanos(Math.min(maxBackOff.toNanos(), (long) backOff));
        }
    }

    /**
     * Message that is sent to the actor by itself to restart the child.
     */
    private static final class StartChild {
    }

    /**
     * This strategy handles a {@link StartChild} message by starting the child actor immediately.
     */
    @NotThreadSafe
    private final class StartChildStrategy extends AbstractReceiveStrategy<StartChild> {

        StartChildStrategy() {
            super(StartChild.class, log);
        }

        @Override
        public void doApply(final StartChild message) {
            startChild();
        }
    }

    /**
     * This strategy handles a {@link ManualReset} message by resetting the exponential back off restart count.
     */
    @NotThreadSafe
    private final class ManualResetStrategy extends AbstractReceiveStrategy<ManualReset> {

        ManualResetStrategy() {
            super(ManualReset.class, log);
        }

        @Override
        public void doApply(final ManualReset message) {
            restartCount = 0;
        }
    }

    /**
     * This strategy handles all other messages by forwarding all messages to the child if it is active or by replying
     * immediately with a {@link ThingUnavailableException} if the child has terminated (fail fast).
     * Messages sent by the child itself are not forwarded back to it but reported as unhandled.
     */
    @NotThreadSafe
    private final class MatchAnyStrategy extends AbstractReceiveStrategy<Object> {

        MatchAnyStrategy() {
            super(Object.class, log);
        }

        @Override
        public void doApply(final Object message) {
            if (null != child) {
                if (child.equals(getSender())) {
                    log.warning("Received unhandled message from child actor '{}': {}", thingId, message);
                    unhandled(message);
                } else {
                    child.forward(message, getContext());
                }
            } else {
                log.warning("Received message during downtime of child actor for Thing with ID <{}>.", thingId);
                final ThingUnavailableException.Builder builder = ThingUnavailableException.newBuilder(thingId);
                if (message instanceof WithDittoHeaders) {
                    // keep correlation and other headers of the request in the error response
                    builder.dittoHeaders(((WithDittoHeaders) message).getDittoHeaders());
                }
                getSender().tell(builder.build(), getSelf());
            }
        }
    }
}