-
Notifications
You must be signed in to change notification settings - Fork 215
/
DistributedSubImpl.java
122 lines (105 loc) · 4.83 KB
/
DistributedSubImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.pubsub;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotEmpty;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.eclipse.ditto.internal.utils.ddata.DistributedDataConfig;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoveSubscriber;
import org.eclipse.ditto.internal.utils.pubsub.api.Request;
import org.eclipse.ditto.internal.utils.pubsub.api.SubAck;
import org.eclipse.ditto.internal.utils.pubsub.api.Subscribe;
import org.eclipse.ditto.internal.utils.pubsub.api.Unsubscribe;
import akka.actor.ActorRef;
import akka.cluster.ddata.Replicator;
import akka.pattern.Patterns;
/**
 * Package-private implementation of {@link DistributedSub}.
 * <p>
 * Each operation is turned into a request message for the sub-supervisor actor: the acknowledged variants
 * ask the actor and convert its reply into a {@link SubAck} stage, the unacknowledged variants only tell it.
 */
final class DistributedSubImpl implements DistributedSub {

    // package-private for unit tests
    final ActorRef subSupervisor;

    private final DistributedDataConfig config;
    private final Replicator.WriteConsistency writeConsistency;
    private final long ddataDelayInMillis;

    /**
     * Creates the implementation, reading the subscription write consistency and the subscription delay
     * (converted to milliseconds) from the given config once.
     *
     * @param config the distributed data config.
     * @param subSupervisor the actor to which all requests are sent.
     */
    DistributedSubImpl(final DistributedDataConfig config, final ActorRef subSupervisor) {
        this.config = config;
        this.subSupervisor = subSupervisor;
        this.writeConsistency = config.getSubscriptionWriteConsistency();
        this.ddataDelayInMillis = config.getSubscriptionDelay().toMillis();
    }

    @Override
    public CompletionStage<SubAck> subscribeWithFilterAndGroup(final Collection<String> topics,
            final ActorRef subscriber,
            @Nullable final Predicate<Collection<String>> filter,
            @Nullable final String group,
            final boolean resubscribe) {

        // a group is optional, but if one is given it must not be empty
        if (group != null) {
            checkNotEmpty(group, "group");
        }
        final CompletionStage<SubAck> acknowledged = askSubSupervisor(
                Subscribe.of(topics, subscriber, writeConsistency, true, filter, group, resubscribe));

        final boolean delayConfigured = ddataDelayInMillis > 0;
        return delayConfigured
                ? acknowledged.thenCompose(this::completeAfterDdataDelay)
                : acknowledged;
    }

    @Override
    public CompletionStage<SubAck> unsubscribeWithAck(final Collection<String> topics,
            final ActorRef subscriber) {
        final Request unsubscribe = Unsubscribe.of(topics, subscriber, writeConsistency, true);
        return askSubSupervisor(unsubscribe);
    }

    @Override
    public void subscribeWithoutAck(final Collection<String> topics, final ActorRef subscriber) {
        subSupervisor.tell(Subscribe.of(topics, subscriber, localWriteConsistency(), false, null), subscriber);
    }

    @Override
    public void unsubscribeWithoutAck(final Collection<String> topics, final ActorRef subscriber) {
        subSupervisor.tell(Unsubscribe.of(topics, subscriber, localWriteConsistency(), false), subscriber);
    }

    @Override
    public void removeSubscriber(final ActorRef subscriber) {
        subSupervisor.tell(RemoveSubscriber.of(subscriber, localWriteConsistency(), false), subscriber);
    }

    /**
     * Asks the sub-supervisor with the configured write timeout and maps its reply to a {@link SubAck} stage.
     */
    private CompletionStage<SubAck> askSubSupervisor(final Request request) {
        return Patterns.ask(subSupervisor, request, config.getWriteTimeout())
                .thenCompose(DistributedSubImpl::processAskResponse);
    }

    /**
     * Returns a future that is completed with the given acknowledgement once the configured delay elapsed.
     */
    private CompletionStage<SubAck> completeAfterDdataDelay(final SubAck subAck) {
        // delay completion to account for dissemination delay between ddata replicator and change recipient
        return new CompletableFuture<SubAck>()
                .completeOnTimeout(subAck, ddataDelayInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Write consistency used by all requests that are sent without expecting an acknowledgement.
     */
    private static Replicator.WriteConsistency localWriteConsistency() {
        return (Replicator.WriteConsistency) Replicator.writeLocal();
    }

    /**
     * Classifies the reply of an ask: a {@link SubAck} completes the stage, a {@link Throwable} fails it
     * as-is, and anything else fails it with a {@link ClassCastException}.
     */
    private static CompletionStage<SubAck> processAskResponse(final Object askResponse) {
        if (askResponse instanceof SubAck) {
            return CompletableFuture.completedStage((SubAck) askResponse);
        }
        if (askResponse instanceof Throwable) {
            return CompletableFuture.failedStage((Throwable) askResponse);
        }
        return CompletableFuture.failedStage(new ClassCastException("Expect SubAck, got: " + askResponse));
    }

}