-
Notifications
You must be signed in to change notification settings - Fork 214
/
DittoProtocolSub.java
146 lines (130 loc) · 5.88 KB
/
DittoProtocolSub.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.pubsubthings;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Extension;
/**
* Subscriptions for Ditto protocol channels.
*/
public interface DittoProtocolSub extends Extension {

    /**
     * Subscribes the given subscriber to the same topics on every given streaming type.
     * <p>
     * Delegates to {@link #subscribe(Collection, Collection, ActorRef, String, boolean)} with no group and
     * {@code resubscribe = false}, and discards the boolean result of that call.
     *
     * @param types the streaming types.
     * @param topics the topics.
     * @param subscriber who is subscribing.
     * @return future that completes or fails according to the acknowledgement.
     */
    default CompletionStage<Void> subscribe(Collection<StreamingType> types, Collection<String> topics,
            ActorRef subscriber) {

        final CompletionStage<Boolean> consistencyStage = subscribe(types, topics, subscriber, null, false);
        // Callers of this overload are not interested in the consistency flag; map it away.
        return consistencyStage.thenApply(ignoredConsistency -> null);
    }

    /**
     * Subscribes the given subscriber, optionally as a member of a group, to the same topics on every given
     * streaming type.
     *
     * @param types the streaming types.
     * @param topics the topics.
     * @param subscriber who is subscribing.
     * @param group the group the subscriber belongs to, or null.
     * @param resubscribe presumably whether this call renews an already existing subscription; exact semantics are
     * defined by the implementation.
     * @return future that completes or fails according to the acknowledgement, containing the result of consistency
     * check for resubscriptions.
     */
    CompletionStage<Boolean> subscribe(Collection<StreamingType> types, Collection<String> topics, ActorRef subscriber,
            @Nullable String group, final boolean resubscribe);

    /**
     * Removes a subscriber.
     *
     * @param subscriber who is unsubscribing.
     */
    void removeSubscriber(ActorRef subscriber);

    /**
     * Updates the streaming types of a subscriber.
     *
     * @param types the currently active streaming types.
     * @param topics the topics to unsubscribe from.
     * @param subscriber the subscriber.
     * @return future that completes or fails according to the acknowledgement.
     */
    CompletionStage<Void> updateLiveSubscriptions(Collection<StreamingType> types, Collection<String> topics,
            ActorRef subscriber);

    /**
     * Removes a subscriber from the twin events channel only.
     *
     * @param subscriber whom to remove.
     * @param topics what were the subscribed topics.
     * @return future that completes or fails according to the acknowledgement.
     */
    CompletionStage<Void> removeTwinSubscriber(ActorRef subscriber, Collection<String> topics);

    /**
     * Removes a subscriber from the policy announcements only.
     *
     * @param subscriber whom to remove.
     * @param topics what were the subscribed topics.
     * @return future that completes or fails according to the acknowledgement.
     */
    CompletionStage<Void> removePolicyAnnouncementSubscriber(ActorRef subscriber, Collection<String> topics);

    /**
     * Declares acknowledgement labels for a subscriber.
     * Declared acknowledgement labels are globally unique for each subscriber.
     * When racing against another subscriber on another node, the future may still complete successfully,
     * but the subscriber losing the race will receive an {@code AcknowledgementLabelNotUniqueException} later.
     * If no distributed data for declared labels is provided, the returned future always fails.
     *
     * @param acknowledgementLabels the acknowledgement labels to declare.
     * @param subscriber the subscriber making the declaration.
     * @param group any group the subscriber belongs to, or null.
     * @return a future that completes successfully when the initial declaration succeeds and fails if duplicate labels
     * are known. Subscribers losing a race against remote subscribers may receive an
     * {@code AcknowledgementLabelNotUniqueException} later.
     */
    CompletionStage<Void> declareAcknowledgementLabels(Collection<AcknowledgementLabel> acknowledgementLabels,
            ActorRef subscriber, @Nullable String group);

    /**
     * Relinquishes any acknowledgement labels declared by a subscriber.
     *
     * @param subscriber the subscriber.
     */
    void removeAcknowledgementLabelDeclaration(ActorRef subscriber);

    /**
     * Removes a subscriber from all distributed data and waits for the acknowledgements.
     * <p>
     * The acknowledgement label declaration is relinquished first; that call returns no future and is therefore not
     * reflected in the returned stage. Afterwards the live subscriptions are updated with an empty collection of
     * streaming types, and the subscriber is removed from the twin events channel and the policy announcements.
     *
     * @param subscriber the subscriber to be removed.
     * @param topics the topics of the subscriber to be removed.
     * @return future that completes once all three removals have completed, and fails if any of them fails.
     */
    default CompletionStage<Void> removeSubscriber(ActorRef subscriber, Collection<String> topics) {
        removeAcknowledgementLabelDeclaration(subscriber);

        // Trigger all three removals before combining them, so that none waits for another to start.
        final CompletionStage<Void> liveRemoval = updateLiveSubscriptions(List.of(), topics, subscriber);
        final CompletionStage<Void> twinRemoval = removeTwinSubscriber(subscriber, topics);
        final CompletionStage<Void> policyRemoval = removePolicyAnnouncementSubscriber(subscriber, topics);

        final CompletionStage<Void> liveAndTwinRemoval =
                liveRemoval.thenCombine(twinRemoval, (liveDone, twinDone) -> null);
        return liveAndTwinRemoval.thenCombine(policyRemoval, (liveAndTwinDone, policyDone) -> null);
    }

    /**
     * Looks up the {@code DittoProtocolSub} extension of an actor system.
     *
     * @param system the actor system.
     * @return the {@code DittoProtocolSub} extension.
     */
    static DittoProtocolSub get(final ActorSystem system) {
        return DittoProtocolSubImpl.ExtensionId.INSTANCE.get(system);
    }

}