-
Notifications
You must be signed in to change notification settings - Fork 214
/
ClusterSingletonSupervisorActor.java
154 lines (136 loc) · 6.95 KB
/
ClusterSingletonSupervisorActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.cluster;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.ConnectException;
import java.util.NoSuchElementException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorKilledException;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.InvalidActorNameException;
import org.apache.pekko.actor.OneForOneStrategy;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.event.DiagnosticLoggingAdapter;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.apache.pekko.pattern.AskTimeoutException;
/**
* Supervisor actor for cluster singletons which accepts a {@link SupervisorStrategy} (e.g. the one from the root
* actor).
*/
final class ClusterSingletonSupervisorActor extends AbstractActor {
private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
private final SupervisorStrategy supervisorStrategy;
private final ActorRef child;
@SuppressWarnings("unused")
ClusterSingletonSupervisorActor(final Props childProps) {
this.supervisorStrategy = buildDefaultSupervisorStrategy();
this.child = getContext().actorOf(childProps, "supervised-child");
}
@SuppressWarnings("unused")
ClusterSingletonSupervisorActor(final Props childProps, final SupervisorStrategy supervisorStrategy) {
this.supervisorStrategy = supervisorStrategy;
this.child = getContext().actorOf(childProps, "supervised-child");
}
private OneForOneStrategy buildDefaultSupervisorStrategy() {
return new OneForOneStrategy(true, DeciderBuilder
.match(NullPointerException.class, e -> {
log.error(e, "NullPointer in singleton actor: {}", e.getMessage());
return restartChild();
}).match(IllegalArgumentException.class, e -> {
log.warning("Illegal Argument in singleton actor: {}", e.getMessage());
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
log.warning("Illegal Argument in singleton actor: {}", sw.toString());
return SupervisorStrategy.resume();
}).match(IllegalStateException.class, e -> {
log.warning("Illegal State in singleton actor: {}", e.getMessage());
return SupervisorStrategy.resume();
}).match(IndexOutOfBoundsException.class, e -> {
log.warning("IndexOutOfBounds in singleton actor: {}", e.getMessage());
return SupervisorStrategy.resume();
}).match(NoSuchElementException.class, e -> {
log.warning("NoSuchElement in singleton actor: {}", e.getMessage());
return SupervisorStrategy.resume();
}).match(AskTimeoutException.class, e -> {
log.warning("AskTimeoutException in singleton actor: {}", e.getMessage());
return SupervisorStrategy.resume();
}).match(ConnectException.class, e -> {
log.warning("ConnectException in singleton actor: {}", e.getMessage());
return restartChild();
}).match(InvalidActorNameException.class, e -> {
log.warning("InvalidActorNameException in singleton actor: {}", e.getMessage());
return SupervisorStrategy.resume();
}).match(ActorKilledException.class, e -> {
log.error(e, "ActorKilledException in singleton actor: {}", e.message());
return restartChild();
}).match(DittoRuntimeException.class, e -> {
log.error(e,
"DittoRuntimeException '{}' should not be escalated to SupervisorActor. Simply resuming Actor.",
e.getErrorCode());
return SupervisorStrategy.resume();
}).match(UnsupportedOperationException.class, e -> {
log.error(e, "UnsupportedOperationException in singleton actor: {}",
e.getMessage());
terminateActorSystem();
return SupervisorStrategy.stop(); // only stopping as ActorSystem is terminated anyways
}).match(Throwable.class, e -> {
log.error(e, "Escalating above root actor!");
terminateActorSystem();
return SupervisorStrategy.stop(); // only stopping as ActorSystem is terminated anyways
}).matchAny(e -> {
log.error("Unknown message:'{}'! Escalating above root actor!", e);
terminateActorSystem();
return SupervisorStrategy.stop(); // only stopping as ActorSystem is terminated anyways
}).build());
}
private SupervisorStrategy.Directive restartChild() {
log.info("Restarting child ...");
return SupervisorStrategy.restart();
}
private void terminateActorSystem() {
log.error("Terminating ActorSystem as requested by ClusterSingletonSupervisorActor supervision strategy ...");
getContext().getSystem().terminate();
}
/**
* Creates Pekko configuration object Props for this Actor.
*
* @param childProps the Props of the child to create (the actual singleton).
* @return the Pekko configuration Props object.
*/
public static Props props(final Props childProps) {
return Props.create(ClusterSingletonSupervisorActor.class, childProps);
}
/**
* Creates Pekko configuration object Props for this Actor.
*
* @param childProps the Props of the child to create (the actual singleton).
* @param supervisorStrategy the {@link SupervisorStrategy} to apply.
* @return the Pekko configuration Props object.
*/
public static Props props(final Props childProps, final SupervisorStrategy supervisorStrategy) {
return Props.create(ClusterSingletonSupervisorActor.class, childProps, supervisorStrategy);
}
@Override
public SupervisorStrategy supervisorStrategy() {
return supervisorStrategy;
}
@Override
public Receive createReceive() {
return receiveBuilder().matchAny(msg -> child.forward(msg, getContext())).build();
}
}