-
Notifications
You must be signed in to change notification settings - Fork 214
/
SubSupervisor.java
165 lines (147 loc) · 6.12 KB
/
SubSupervisor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.pubsub.actors;
import javax.annotation.Nullable;
import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.internal.utils.pubsub.extractors.PubSubTopicExtractor;
import org.eclipse.ditto.internal.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.Request;
import org.eclipse.ditto.internal.utils.pubsub.ddata.compressed.CompressedDData;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Terminated;
import org.apache.pekko.japi.pf.ReceiveBuilder;
/**
* Supervisor of actors dealing with subscriptions.
* <pre>
* {@code
* SubSupervisor
* +
* |
* |supervises one-for-many
* +---------------------------------+
* | |
* | |
* | |
* | |
* | |
* v v
* SubUpdater +-----------------> Subscriber
* + update (forwards
* | local published
* | subscriptions signals)
* |
* |
* |
* |
* |
* |
* |
* |write with highest requested consistency
* |
* +----------------->DDataReplicator
* }
* </pre>
*
* @param <T> type of messages subscribed for.
*/
public final class SubSupervisor<T> extends AbstractPubSubSupervisor {
private final Class<T> messageClass;
private final PubSubTopicExtractor<T> topicExtractor;
private final CompressedDData topicsDData;
private final AckExtractor<T> ackExtractor;
private final DistributedAcks distributedAcks;
@Nullable private ActorRef subscriber;
@Nullable private ActorRef updater;
@SuppressWarnings("unused")
private SubSupervisor(final Class<T> messageClass,
final PubSubTopicExtractor<T> topicExtractor,
final CompressedDData topicsDData,
final AckExtractor<T> ackExtractor,
final DistributedAcks distributedAcks) {
this.messageClass = messageClass;
this.topicExtractor = topicExtractor;
this.topicsDData = topicsDData;
this.ackExtractor = ackExtractor;
this.distributedAcks = distributedAcks;
}
/**
* Create Props object for this actor.
*
* @param <T> type of messages.
* @param messageClass class of messages.
* @param topicExtractor extractor of topics from messages.
* @param topicsDData access to the distributed data of topics.
* @param ackExtractor extractor of acknowledgement-related information from a message.
* @param distributedAcks access to the distributed data of declared acknowledgement labels.
* @return the Props object.
*/
public static <T> Props props(final Class<T> messageClass,
final PubSubTopicExtractor<T> topicExtractor,
final CompressedDData topicsDData,
final AckExtractor<T> ackExtractor,
final DistributedAcks distributedAcks) {
return Props.create(SubSupervisor.class, messageClass, topicExtractor, topicsDData, ackExtractor,
distributedAcks);
}
@Override
protected Receive createPubSubBehavior() {
return ReceiveBuilder.create()
.match(Request.class, this::isUpdaterAvailable, this::request)
.match(Request.class, this::updaterUnavailable)
.match(Terminated.class, this::childTerminated)
.matchEquals(ActorEvent.DEBUG_KILL_CHILDREN, this::debugKillChildren)
.build();
}
@Override
protected void onChildFailure(final ActorRef failingChild) {
if (updater != null && !failingChild.equals(updater)) {
// Updater survived. Ask it to inform known subscribers of local data loss.
updater.tell(ActorEvent.PUBSUB_TERMINATED, getSelf());
}
updater = null;
subscriber = null;
log.error("All local subscriptions lost.");
}
@Override
protected void startChildren() {
subscriber = startChild(Subscriber.props(messageClass, topicExtractor, ackExtractor, distributedAcks),
Subscriber.ACTOR_NAME_PREFIX);
updater = startChild(SubUpdater.props(config, subscriber, topicsDData), SubUpdater.ACTOR_NAME_PREFIX);
}
private void debugKillChildren(final ActorEvent debugKillChildren) {
log.warning("Killing children on request. DO NOT do this in production!");
getContext().getChildren().forEach(getContext()::stop);
}
private void childTerminated(final Terminated terminated) {
if (terminated.getActor().equals(subscriber) || terminated.getActor().equals(updater)) {
log.error("Child actor terminated. Removing subscriber from DData: <{}>", terminated.getActor());
topicsDData.getWriter().removeSubscriber(terminated.getActor(), ClusterMemberRemovedAware.writeLocal());
getContext().getChildren().forEach(getContext()::stop);
subscriber = null;
updater = null;
scheduleRestartChildren();
}
}
private boolean isUpdaterAvailable() {
return updater != null;
}
@SuppressWarnings("ConstantConditions")
private void request(final Request request) {
updater.tell(request, getSender());
}
private void updaterUnavailable(final Request request) {
log.error("SubUpdater unavailable. Dropping <{}>", request);
getSender().tell(new IllegalStateException("AcksUpdater not available"), getSelf());
}
}