-
Notifications
You must be signed in to change notification settings - Fork 215
/
DittoProtocolSubImpl.java
194 lines (169 loc) · 8.4 KB
/
DittoProtocolSubImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.pubsub;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import akka.actor.AbstractExtensionId;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
/**
 * Package-private implementation of {@link DittoProtocolSub}.
 * <p>
 * It delegates to three {@link DistributedSub} instances (live signals, twin events and policy announcements)
 * and to one {@link DistributedAcks} instance.
 */
final class DittoProtocolSubImpl implements DittoProtocolSub {

    private final DistributedSub liveSignalSub;
    private final DistributedSub twinEventSub;
    private final DistributedSub policyAnnouncementSub;
    private final DistributedAcks distributedAcks;

    private DittoProtocolSubImpl(final DistributedSub liveSignalSub,
            final DistributedSub twinEventSub,
            final DistributedSub policyAnnouncementSub,
            final DistributedAcks distributedAcks) {

        this.distributedAcks = distributedAcks;
        this.policyAnnouncementSub = policyAnnouncementSub;
        this.twinEventSub = twinEventSub;
        this.liveSignalSub = liveSignalSub;
    }

    /**
     * Starts one distributed sub per pub-sub factory and bundles them in a new instance.
     *
     * @param system the actor system handed to the pub-sub factories.
     * @param distributedAcks the distributed acks handed to the live-signal and thing-event factories and kept
     * by the created instance.
     * @return the new instance.
     */
    static DittoProtocolSubImpl of(final ActorSystem system, final DistributedAcks distributedAcks) {
        // The subs are started in this fixed order: live signals, twin events, policy announcements.
        final DistributedSub startedLiveSub =
                LiveSignalPubSubFactory.of(system, distributedAcks).startDistributedSub();
        final DistributedSub startedTwinSub =
                ThingEventPubSubFactory.readSubjectsOnly(system, distributedAcks).startDistributedSub();
        final DistributedSub startedAnnouncementSub =
                PolicyAnnouncementPubSubFactory.of(system, system).startDistributedSub();

        return new DittoProtocolSubImpl(startedLiveSub, startedTwinSub, startedAnnouncementSub, distributedAcks);
    }

    /**
     * Subscribes {@code subscriber} for {@code topics} on every sub selected by {@code types}:
     * {@code EVENTS} selects the twin event sub, {@code POLICY_ANNOUNCEMENTS} selects the policy announcement
     * sub and any other type selects the live signal sub, filtered by the topics of those remaining types.
     * A sub that is not selected contributes an already completed stage.
     *
     * @return a stage that completes once the stages of all three subs have completed.
     */
    @Override
    public CompletionStage<Void> subscribe(final Collection<StreamingType> types,
            final Collection<String> topics,
            final ActorRef subscriber,
            @Nullable final String group,
            final boolean resubscribe) {

        final CompletionStage<?> alreadyDone = CompletableFuture.completedFuture(null);

        return partitionByStreamingTypes(types,
                liveTypes -> {
                    if (liveTypes.isEmpty()) {
                        return alreadyDone;
                    }
                    return liveSignalSub.subscribeWithFilterAndGroup(topics, subscriber, toFilter(liveTypes),
                            group, resubscribe);
                },
                hasTwinEvents -> {
                    if (hasTwinEvents) {
                        return twinEventSub.subscribeWithFilterAndGroup(topics, subscriber, null, group,
                                resubscribe);
                    }
                    return alreadyDone;
                },
                hasPolicyAnnouncements -> {
                    if (hasPolicyAnnouncements) {
                        return policyAnnouncementSub.subscribeWithFilterAndGroup(topics, subscriber, null, group,
                                resubscribe);
                    }
                    return alreadyDone;
                });
    }

    /**
     * Removes {@code subscriber} from all three subs and from the distributed acks.
     */
    @Override
    public void removeSubscriber(final ActorRef subscriber) {
        liveSignalSub.removeSubscriber(subscriber);
        twinEventSub.removeSubscriber(subscriber);
        policyAnnouncementSub.removeSubscriber(subscriber);
        distributedAcks.removeSubscriber(subscriber);
    }

    /**
     * Updates only the live signal subscription of {@code subscriber}: if {@code types} contains at least one
     * type other than {@code EVENTS} and {@code POLICY_ANNOUNCEMENTS}, the subscriber is subscribed (without
     * group and without resubscribe flag) using a filter for those types; otherwise it is unsubscribed from
     * {@code topics}. The twin event sub and the policy announcement sub are not touched.
     *
     * @return a stage that completes once the live signal stage has completed.
     */
    @Override
    public CompletionStage<Void> updateLiveSubscriptions(final Collection<StreamingType> types,
            final Collection<String> topics,
            final ActorRef subscriber) {

        // Each invocation yields a fresh, already completed stage.
        final Function<Boolean, CompletionStage<?>> nothingToDo = ignored -> CompletableFuture.completedStage(null);

        return partitionByStreamingTypes(types,
                liveTypes -> {
                    if (liveTypes.isEmpty()) {
                        return liveSignalSub.unsubscribeWithAck(topics, subscriber);
                    }
                    return liveSignalSub.subscribeWithFilterAndGroup(topics, subscriber, toFilter(liveTypes),
                            null, false);
                },
                nothingToDo,
                nothingToDo);
    }

    /**
     * Unsubscribes {@code subscriber} from {@code topics} on the twin event sub.
     *
     * @return a stage that yields {@code null} after the unsubscribe stage has completed.
     */
    @Override
    public CompletionStage<Void> removeTwinSubscriber(final ActorRef subscriber, final Collection<String> topics) {
        return twinEventSub.unsubscribeWithAck(topics, subscriber)
                .thenApply(ignoredAck -> null);
    }

    /**
     * Unsubscribes {@code subscriber} from {@code topics} on the policy announcement sub.
     *
     * @return a stage that yields {@code null} after the unsubscribe stage has completed.
     */
    @Override
    public CompletionStage<Void> removePolicyAnnouncementSubscriber(final ActorRef subscriber,
            final Collection<String> topics) {

        return policyAnnouncementSub.unsubscribeWithAck(topics, subscriber)
                .thenApply(ignoredAck -> null);
    }

    /**
     * Declares {@code acknowledgementLabels} for {@code subscriber} via the distributed acks.
     * An empty collection is answered with an already completed future without any declaration.
     *
     * @return a stage that yields {@code null} after the declaration stage has completed.
     * @throws IllegalArgumentException synchronously, if one of the labels is not fully resolved.
     */
    @Override
    public CompletionStage<Void> declareAcknowledgementLabels(
            final Collection<AcknowledgementLabel> acknowledgementLabels,
            final ActorRef subscriber,
            @Nullable final String group) {

        if (acknowledgementLabels.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }

        // Deliberately thrown here instead of failing the returned stage: an unresolved label is a bug in Ditto
        // which has to be escalated via the actor supervision strategy.
        ensureAcknowledgementLabelsAreFullyResolved(acknowledgementLabels);

        final CompletionStage<Void> declared =
                distributedAcks.declareAcknowledgementLabels(acknowledgementLabels, subscriber, group)
                        .thenApply(ignoredAck -> null);
        return declared;
    }

    /**
     * Fails on the first label of {@code ackLabels} which is not fully resolved.
     *
     * @throws IllegalArgumentException if such a label is found.
     */
    private static void ensureAcknowledgementLabelsAreFullyResolved(final Collection<AcknowledgementLabel> ackLabels) {
        for (final AcknowledgementLabel ackLabel : ackLabels) {
            if (!ackLabel.isFullyResolved()) {
                // Reaching this point means a bug in the Ditto codebase: labels must be resolved by now.
                throw new IllegalArgumentException("AcknowledgementLabel was not fully resolved while " +
                        "trying to declare it: " + ackLabel);
            }
        }
    }

    /**
     * Removes the acknowledgement label declaration of {@code subscriber} via the distributed acks.
     */
    @Override
    public void removeAcknowledgementLabelDeclaration(final ActorRef subscriber) {
        distributedAcks.removeAcknowledgementLabelDeclaration(subscriber);
    }

    /**
     * Splits {@code types} into "twin events requested", "policy announcements requested" and the set of all
     * remaining types, then invokes the three callbacks in the order live signals, twin events, policy
     * announcements.
     *
     * @param types the requested streaming types.
     * @param onLiveSignals receives the types left after removing {@code EVENTS} and
     * {@code POLICY_ANNOUNCEMENTS}.
     * @param onTwinEvents receives whether {@code EVENTS} was among the types.
     * @param onPolicyAnnouncement receives whether {@code POLICY_ANNOUNCEMENTS} was among the types.
     * @return a stage that completes once the stages returned by all three callbacks have completed.
     */
    private CompletionStage<Void> partitionByStreamingTypes(final Collection<StreamingType> types,
            final Function<Set<StreamingType>, CompletionStage<?>> onLiveSignals,
            final Function<Boolean, CompletionStage<?>> onTwinEvents,
            final Function<Boolean, CompletionStage<?>> onPolicyAnnouncement) {

        // EnumSet.copyOf is only called for a non-empty collection; for an empty one nothing is copied or removed.
        final boolean nothingRequested = types.isEmpty();
        final Set<StreamingType> liveTypes =
                nothingRequested ? Collections.<StreamingType>emptySet() : EnumSet.copyOf(types);
        final boolean hasTwinEvents = !nothingRequested && liveTypes.remove(StreamingType.EVENTS);
        final boolean hasPolicyAnnouncements =
                !nothingRequested && liveTypes.remove(StreamingType.POLICY_ANNOUNCEMENTS);

        // Arguments are evaluated left to right, so the callbacks run in the documented order.
        return CompletableFuture.allOf(
                onLiveSignals.apply(liveTypes).toCompletableFuture(),
                onTwinEvents.apply(hasTwinEvents).toCompletableFuture(),
                onPolicyAnnouncement.apply(hasPolicyAnnouncements).toCompletableFuture());
    }

    /**
     * Creates a predicate which accepts a collection of topics if at least one of them equals the distributed
     * pub-sub topic of one of the given streaming types.
     */
    private static Predicate<Collection<String>> toFilter(final Collection<StreamingType> streamingTypes) {
        final Set<String> acceptedTopics = streamingTypes.stream()
                .map(StreamingType::getDistributedPubSubTopic)
                .collect(Collectors.toSet());

        return topics -> {
            for (final String topic : topics) {
                if (acceptedTopics.contains(topic)) {
                    return true;
                }
            }
            return false;
        };
    }

    /**
     * Extension ID which creates the {@link DittoProtocolSub} for an actor system.
     */
    static final class ExtensionId extends AbstractExtensionId<DittoProtocolSub> {

        static final ExtensionId INSTANCE = new ExtensionId();

        private ExtensionId() {}

        /**
         * Creates the distributed acks for {@code system} and builds the implementation from them.
         */
        @Override
        public DittoProtocolSub createExtension(final ExtendedActorSystem system) {
            return DittoProtocolSubImpl.of(system, DistributedAcks.create(system));
        }
    }
}