/
AckUpdater.java
372 lines (326 loc) · 15.7 KB
/
AckUpdater.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.pubsub.actors;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.base.model.acks.PubSubTerminatedException;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.internal.utils.pubsub.api.AcksDeclared;
import org.eclipse.ditto.internal.utils.pubsub.api.DeclareAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.LocalAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.api.ReceiveLocalAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.ReceiveRemoteAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoteAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoveSubscriberAcks;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DData;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DDataWriter;
import org.eclipse.ditto.internal.utils.pubsub.ddata.ack.Grouped;
import org.eclipse.ditto.internal.utils.pubsub.ddata.ack.GroupedRelation;
import org.eclipse.ditto.internal.utils.pubsub.ddata.literal.LiteralUpdate;
import org.eclipse.ditto.json.JsonValue;

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ORMultiMap;
import akka.cluster.ddata.Replicator;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
/**
 * Manage the local and remote ternary relation between actors, group names and declared acknowledgement labels.
 * In case of conflict, prefer data from the cluster member with a smaller address.
 * <p>
 * On every {@code Clock.TICK} the actor writes the difference between the current and the previously written local
 * declarations into the distributed data under its own address and notifies the registered local-change
 * recipients. On every {@code Replicator.Changed} it rebuilds its view of the remote declarations from all cached
 * shards, evicts local subscribers that conflict with declarations of members having a smaller address, and
 * notifies the registered distributed-data-change recipients.
 */
public final class AckUpdater extends AbstractActorWithTimers implements ClusterMemberRemovedAware {

    /**
     * Prefix of this actor's name.
     */
    public static final String ACTOR_NAME_PREFIX = "ackUpdater";

    protected final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);

    // Local declarations: subscriber -> (optional group, ack labels).
    private final GroupedRelation<ActorRef, String> localAckLabels;
    private final Address ownAddress;
    private final DData<Address, String, LiteralUpdate> ackDData;
    // Actors that asked (via ReceiveRemoteAcks) to be told about RemoteAcksChanged.
    private final Set<ActorRef> ddataChangeRecipients;
    // Actors that asked (via ReceiveLocalAcks) to be told about LocalAcksChanged.
    private final Set<ActorRef> localChangeRecipients;
    private final Gauge ackSizeMetric;
    // Last deserialized content of each distributed data key (shard) for which a change was received.
    private final Map<Key<?>, Map<Address, List<Grouped<String>>>> cachedRemoteAcks;
    // Remote view: ack label -> names of the groups declaring it ("" stands for a declaration without group).
    private Map<String, Set<String>> remoteAckLabels = Map.of();
    // Remote view: group name -> ack labels declared for that group.
    private Map<String, Set<String>> remoteGroups = Map.of();
    // The state written to the distributed data on the previous tick; used to compute the next diff.
    private LiteralUpdate previousUpdate = LiteralUpdate.empty();

    /**
     * Create the actor, subscribe for distributed data changes and start the periodic update timer.
     *
     * @param config the pub-sub config providing the update interval.
     * @param ownAddress address of the cluster member on which this actor lives.
     * @param ackDData access to the distributed data of declared acknowledgement labels.
     */
    protected AckUpdater(final PubSubConfig config,
            final Address ownAddress,
            final DData<Address, String, LiteralUpdate> ackDData) {
        this.ownAddress = ownAddress;
        this.ackDData = ackDData;
        localAckLabels = GroupedRelation.create();
        ddataChangeRecipients = new HashSet<>();
        localChangeRecipients = new HashSet<>();
        ackSizeMetric = DittoMetrics.gauge("pubsub-ack-size-bytes");
        cachedRemoteAcks = new HashMap<>();
        subscribeForClusterMemberRemovedAware();
        ackDData.getReader().receiveChanges(getSelf());
        getTimers().startTimerAtFixedRate(Clock.TICK, Clock.TICK, config.getUpdateInterval());
    }

    /**
     * Create Props object for this actor.
     *
     * @param config the pub-sub config.
     * @param ownAddress address of the cluster member on which this actor lives.
     * @param ackDData access to the distributed data of declared acknowledgement labels.
     * @return the Props object.
     */
    public static Props props(final PubSubConfig config, final Address ownAddress,
            final DData<Address, String, LiteralUpdate> ackDData) {
        return Props.create(AckUpdater.class, config, ownAddress, ackDData);
    }

    /**
     * Route incoming messages to their handlers; anything not handled here or by the cluster-member-removed
     * behavior is logged as unhandled.
     */
    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .match(DeclareAcks.class, this::declare)
                .match(Terminated.class, this::terminated)
                .match(RemoveSubscriberAcks.class, this::removeSubscriber)
                .match(ReceiveRemoteAcks.class, this::onReceiveDDataChanges)
                .match(ReceiveLocalAcks.class, this::onReceiveLocalChanges)
                .matchEquals(Clock.TICK, this::tick)
                .match(Replicator.Changed.class, this::onChanged)
                .build()
                .orElse(receiveClusterMemberRemoved())
                .orElse(ReceiveBuilder.create().matchAny(this::logUnhandled).build());
    }

    @Override
    public LoggingAdapter log() {
        return log;
    }

    @Override
    public DDataWriter<?, ?> getDDataWriter() {
        return ackDData.getWriter();
    }

    /**
     * Handle a declaration request: accept it if it conflicts neither with local nor with known remote
     * declarations, otherwise reply with {@code AcknowledgementLabelNotUniqueException}.
     *
     * @param request the declaration request.
     */
    private void declare(final DeclareAcks request) {
        final ActorRef sender = getSender();
        final ActorRef subscriber = request.getSubscriber();
        final String group = request.getGroup().orElse(null);
        final Set<String> ackLabels = request.getAckLabels();
        if (isAllowedLocally(group, ackLabels) && isAllowedRemotely(group, ackLabels)) {
            localAckLabels.put(subscriber, group, ackLabels);
            // watch the subscriber so that its declarations are removed when it terminates
            getContext().watch(subscriber);
            sender.tell(AcksDeclared.of(request, sender), getSelf());
        } else {
            failSubscribe(sender);
        }
    }

    /**
     * Check a declaration against the local declarations. If the group is already known locally, the labels must
     * be equal to the labels of that group; otherwise none of the labels may be declared locally.
     *
     * @param group the group of the declaration, or null if there is none.
     * @param ackLabels the labels to declare.
     * @return whether the declaration is compatible with the local declarations.
     */
    private boolean isAllowedLocally(@Nullable final String group, final Set<String> ackLabels) {
        if (group != null) {
            final Optional<Set<String>> groupLabels = localAckLabels.getValuesOfGroup(group);
            if (groupLabels.isPresent()) {
                return groupLabels.get().equals(ackLabels);
            }
        }
        return noDeclaredLabelMatches(ackLabels, localAckLabels::containsValue);
    }

    /**
     * Check a declaration against the current view of all remote declarations.
     *
     * @param group the group of the declaration, or null if there is none.
     * @param ackLabels the labels to declare.
     * @return whether the declaration is compatible with the known remote declarations.
     */
    private boolean isAllowedRemotely(@Nullable final String group, final Set<String> ackLabels) {
        return isAllowedRemotelyBy(group, ackLabels, remoteGroups,
                conflictWithOtherGroups(group, remoteAckLabels));
    }

    /**
     * Variant of {@link #isAllowedRemotelyBy(String, Set, Map, Predicate)} taking group and labels from a
     * {@code Grouped}.
     */
    private boolean isAllowedRemotelyBy(final Grouped<String> groupedLabels,
            final Map<String, Set<String>> remoteGroups,
            final Predicate<String> isTakenRemotely) {
        return isAllowedRemotelyBy(groupedLabels.getGroup().orElse(null), groupedLabels.getValues(),
                remoteGroups, isTakenRemotely);
    }

    /**
     * Check a declaration against a given set of remote declarations.
     *
     * @param group the group of the declaration, or null if there is none.
     * @param ackLabels the labels to declare.
     * @param remoteGroups remote group names mapped to their declared labels.
     * @param isTakenRemotely predicate telling whether a label conflicts with a remote declaration.
     * @return true if no label is taken remotely and, in case the group is declared remotely, the remote labels
     * of the group are equal to {@code ackLabels}.
     */
    private boolean isAllowedRemotelyBy(@Nullable final String group, final Set<String> ackLabels,
            final Map<String, Set<String>> remoteGroups,
            final Predicate<String> isTakenRemotely) {
        final boolean noConflict = noDeclaredLabelMatches(ackLabels, isTakenRemotely);
        if (noConflict && group != null) {
            final Set<String> remoteGroup = remoteGroups.get(group);
            if (remoteGroup != null) {
                return remoteGroup.equals(ackLabels);
            }
        }
        return noConflict;
    }

    /**
     * @return whether none of the labels satisfies the predicate.
     */
    private boolean noDeclaredLabelMatches(final Set<String> ackLabels, final Predicate<String> contains) {
        return ackLabels.stream().noneMatch(contains);
    }

    /**
     * Periodic update: write the local declarations into the distributed data, send a snapshot of them to the
     * local-change recipients and report the estimated snapshot size as metric.
     */
    private void tick(final Clock tick) {
        writeLocalDData();
        final var changed = LocalAcksChanged.of(localAckLabels.export());
        localChangeRecipients.forEach(recipient -> recipient.tell(changed, getSelf()));
        ackSizeMetric.set(changed.getSnapshot().estimateSize());
    }

    /**
     * Handle a change of one distributed data key: cache its content, rebuild the remote view from all cached
     * keys, evict local subscribers losing against members of smaller address and notify the
     * distributed-data-change recipients.
     *
     * @param event the change notification of the replicator.
     */
    private void onChanged(final Replicator.Changed<?> event) {
        @SuppressWarnings("unchecked")
        final ORMultiMap<Address, String> changedShard = (ORMultiMap<Address, String>) event.dataValue();
        final Map<Address, List<Grouped<String>>> mmap =
                Grouped.deserializeORMultiMap(changedShard, JsonValue::asString);
        cachedRemoteAcks.put(event.key(), mmap);

        // calculate all sharded maps:
        final Map<Address, List<Grouped<String>>> completeMmap = mergeCachedShards();

        final List<Grouped<String>> remoteGroupedAckLabels = getRemoteGroupedAckLabelsOrderByAddress(completeMmap);
        remoteGroups = getRemoteGroups(remoteGroupedAckLabels);
        remoteAckLabels = getRemoteAckLabels(remoteGroupedAckLabels);

        for (final ActorRef localLoser : getLocalLosers(completeMmap)) {
            doRemoveSubscriber(localLoser);
            failSubscribe(localLoser);
        }

        final var ddataChanged = RemoteAcksChanged.of(completeMmap);
        ddataChangeRecipients.forEach(recipient -> recipient.tell(ddataChanged, getSelf()));
    }

    /**
     * Combine the cached content of all distributed data keys into one map per address.
     * The lists of the result are fresh copies: the cached lists are never modified, so that entries of one key
     * can not leak into the cache of another key and accumulate or outlive their removal.
     *
     * @return declarations of all cached keys, indexed by address.
     */
    private Map<Address, List<Grouped<String>>> mergeCachedShards() {
        final Map<Address, List<Grouped<String>>> merged = new HashMap<>();
        for (final Map<Address, List<Grouped<String>>> shard : cachedRemoteAcks.values()) {
            shard.forEach((address, groups) ->
                    merged.computeIfAbsent(address, key -> new ArrayList<>()).addAll(groups));
        }
        return merged;
    }

    /**
     * Collect the declarations of all addresses except the own address, ordered by address ascending.
     *
     * @param mmap declarations indexed by address.
     * @return the flattened declarations of the other addresses.
     */
    private List<Grouped<String>> getRemoteGroupedAckLabelsOrderByAddress(
            final Map<Address, List<Grouped<String>>> mmap) {
        return mmap.entrySet()
                .stream()
                .filter(this::isNotOwnAddress)
                .sorted(entryKeyAddressComparator())
                .map(Map.Entry::getValue)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    private boolean isNotOwnAddress(final Map.Entry<Address, ?> entry) {
        return !ownAddress.equals(entry.getKey());
    }

    /**
     * Index declared labels by group name. The first declaration of a group in the list wins.
     *
     * @param remoteGroupedAckLabels remote declarations ordered by address.
     * @return unmodifiable map from group name to its labels.
     */
    private Map<String, Set<String>> getRemoteGroups(final List<Grouped<String>> remoteGroupedAckLabels) {
        final Map<String, Set<String>> result = new HashMap<>();
        remoteGroupedAckLabels.stream()
                .flatMap(Grouped<String>::streamAsGroupedPair)
                // do not set a group of ack labels if already set by a member of smaller address
                .forEach(pair -> result.computeIfAbsent(pair.first(), group -> pair.second()));
        return Collections.unmodifiableMap(result);
    }

    /**
     * Index the names of declaring groups by label. Declarations without group are recorded under the empty
     * group name.
     *
     * @param remoteGroupedAckLabels remote declarations.
     * @return unmodifiable map from label to the names of the groups declaring it.
     */
    private Map<String, Set<String>> getRemoteAckLabels(final List<Grouped<String>> remoteGroupedAckLabels) {
        final Map<String, Set<String>> remoteAckLabelsToGroup = new HashMap<>();
        for (final Grouped<String> grouped : remoteGroupedAckLabels) {
            final String groupKey = grouped.getGroup().orElse("");
            for (final String label : grouped.getValues()) {
                remoteAckLabelsToGroup.computeIfAbsent(label, key -> new HashSet<>()).add(groupKey);
            }
        }
        return Collections.unmodifiableMap(remoteAckLabelsToGroup);
    }

    private void logUnhandled(final Object message) {
        log.warning("Unhandled: <{}>", message);
    }

    /**
     * Handle termination of a watched actor: forget its declarations and its registrations as recipient.
     * Termination of a local-change recipient is treated as loss of local data.
     */
    private void terminated(final Terminated terminated) {
        final ActorRef terminatedActor = terminated.actor();
        doRemoveSubscriber(terminatedActor);
        ddataChangeRecipients.remove(terminatedActor);
        if (localChangeRecipients.remove(terminatedActor)) {
            reportLocalDataLoss();
        }
    }

    /**
     * Send {@code PubSubTerminatedException} to every local subscriber and drop all local declarations.
     */
    private void reportLocalDataLoss() {
        // local SubUpdater terminated. Request all known subscribers to terminate.
        localAckLabels.entrySet()
                .forEach(entry -> entry.getKey().tell(PubSubTerminatedException.getInstance(), getSelf()));
        localAckLabels.clear();
    }

    private void removeSubscriber(final RemoveSubscriberAcks request) {
        doRemoveSubscriber(request.getSubscriber());
    }

    /**
     * Remove the declarations of a subscriber and stop watching it.
     */
    private void doRemoveSubscriber(final ActorRef subscriber) {
        localAckLabels.removeKey(subscriber);
        getContext().unwatch(subscriber);
    }

    /**
     * Write the changes of the local declarations since the previous write into the distributed data under the
     * own address with local write consistency. Failures are logged.
     */
    // NOT thread-safe
    private void writeLocalDData() {
        final LiteralUpdate diff = createAndSetDDataUpdate();
        ackDData.getWriter()
                .put(ownAddress, diff, (Replicator.WriteConsistency) Replicator.writeLocal())
                .whenComplete((unused, error) -> {
                    if (error != null) {
                        log.error(error, "Failed to update local DData");
                    }
                });
    }

    /**
     * Compute the difference between the current local declarations and {@code previousUpdate}, and remember
     * the current declarations as the new {@code previousUpdate}.
     *
     * @return the difference to write into the distributed data.
     */
    // NOT thread-safe
    private LiteralUpdate createAndSetDDataUpdate() {
        final Set<String> groupedAckLabels = localAckLabels.exportValuesByGroup()
                .stream()
                .map(Grouped::toJsonString)
                .collect(Collectors.toSet());
        final var nextUpdate = LiteralUpdate.withInserts(groupedAckLabels);
        final LiteralUpdate diff = nextUpdate.diff(previousUpdate);
        previousUpdate = nextUpdate;
        return diff;
    }

    /**
     * Tell an actor that its acknowledgement labels are not unique.
     */
    private void failSubscribe(final ActorRef sender) {
        final Throwable error = AcknowledgementLabelNotUniqueException.getInstance();
        sender.tell(error, getSelf());
    }

    /**
     * Find the local subscribers whose declarations conflict with declarations of cluster members having a
     * smaller address than the own address.
     *
     * @param mmap declarations of all cluster members indexed by address.
     * @return the local subscribers to evict.
     */
    private List<ActorRef> getLocalLosers(final Map<Address, List<Grouped<String>>> mmap) {
        final Map<Address, List<Grouped<String>>> moreImportantEntries = mmap.entrySet()
                .stream()
                .filter(entry -> Address.addressOrdering().compare(entry.getKey(), ownAddress) < 0)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        final List<Grouped<String>> moreImportantGroupedAckLabels =
                getRemoteGroupedAckLabelsOrderByAddress(moreImportantEntries);
        final Map<String, Set<String>> moreImportantRemoteGroups = getRemoteGroups(moreImportantGroupedAckLabels);
        final Map<String, Set<String>> moreImportantAckLabels = getRemoteAckLabels(moreImportantGroupedAckLabels);
        return localAckLabels.entrySet()
                .stream()
                .filter(entry -> !isAllowedRemotelyBy(entry.getValue(), moreImportantRemoteGroups,
                        conflictWithOtherGroups(entry.getValue().getGroup().orElse(null),
                                moreImportantAckLabels)))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /**
     * Register and watch a recipient of {@code RemoteAcksChanged} notifications.
     */
    private void onReceiveDDataChanges(final ReceiveRemoteAcks request) {
        ddataChangeRecipients.add(request.getReceiver());
        getContext().watch(request.getReceiver());
    }

    /**
     * Register and watch a recipient of {@code LocalAcksChanged} notifications.
     */
    private void onReceiveLocalChanges(final ReceiveLocalAcks request) {
        localChangeRecipients.add(request.getReceiver());
        getContext().watch(request.getReceiver());
    }

    /**
     * @return comparator of map entries by their address key according to {@code Address.addressOrdering()}.
     */
    private static <T> Comparator<Map.Entry<Address, T>> entryKeyAddressComparator() {
        return (left, right) -> Address.addressOrdering().compare(left.getKey(), right.getKey());
    }

    /**
     * Create a predicate telling whether a label conflicts with the given declarations.
     * Without group, every label present in the map conflicts. With group, a label conflicts unless it is
     * absent from the map or declared by exactly this group and no other.
     *
     * @param group the group of the declaration to check, or null if there is none.
     * @param topicToGroup declared labels mapped to the names of the groups declaring them.
     * @return the conflict predicate.
     */
    private static Predicate<String> conflictWithOtherGroups(@Nullable final String group,
            final Map<String, Set<String>> topicToGroup) {
        if (group == null) {
            return topicToGroup::containsKey;
        } else {
            return topic -> {
                final Set<String> groups = topicToGroup.getOrDefault(topic, Set.of());
                return !(groups.isEmpty() || groups.size() == 1 && groups.contains(group));
            };
        }
    }

    /**
     * Timer key and message of the periodic update.
     */
    private enum Clock {
        TICK
    }
}