-
Notifications
You must be signed in to change notification settings - Fork 215
/
ClientActorRefs.java
142 lines (122 loc) · 4.12 KB
/
ClientActorRefs.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.eclipse.ditto.internal.utils.pubsub.PubSubFactory;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
/**
 * Collection of all client actor refs of a connection actor.
 * <p>
 * The refs are kept twice: indexed by actor path (so that adding the same path again replaces the previous ref)
 * and as a sorted, immutable snapshot list that is rebuilt on every modification. The snapshot is what all lookup
 * methods operate on.
 */
@NotThreadSafe
public final class ClientActorRefs {

    private final Map<ActorPath, ActorRef> refsByPath = new HashMap<>();

    // Always an immutable list: either List.of() or the result of sort(...). Replaced as a whole on each change.
    private List<ActorRef> sortedRefs = List.of();

    private ClientActorRefs() {}

    /**
     * Create a new empty store of client actor refs.
     *
     * @return an empty store.
     */
    public static ClientActorRefs empty() {
        return new ClientActorRefs();
    }

    /**
     * Add a new client actor. A previously stored actor ref with the same path is replaced.
     *
     * @param newClientActor the new client actor.
     */
    public void add(final ActorRef newClientActor) {
        refsByPath.put(newClientActor.path(), newClientActor);
        sortedRefs = sort(refsByPath);
    }

    /**
     * Add several new client actors at once. Previously stored actor refs with the same paths are replaced.
     * The sorted view is rebuilt only once, after all actors were added.
     *
     * @param newClientActors the new client actors.
     */
    public void add(final List<ActorRef> newClientActors) {
        newClientActors.forEach(newClientActor -> refsByPath.put(newClientActor.path(), newClientActor));
        sortedRefs = sort(refsByPath);
    }

    /**
     * Remove a client actor, identified by its actor path. Does nothing if no actor with that path is stored.
     *
     * @param deadClientActor the client actor to remove.
     */
    public void remove(final ActorRef deadClientActor) {
        refsByPath.remove(deadClientActor.path());
        sortedRefs = sort(refsByPath);
    }

    /**
     * Remove all known client actors.
     */
    public void clear() {
        refsByPath.clear();
        sortedRefs = List.of();
    }

    /**
     * Get stored client actor refs other than the one given.
     *
     * @param clientActor the given client actor.
     * @return a new list of the stored client actor refs different from the given actor, in sorted order.
     */
    public List<ActorRef> getOtherActors(final ActorRef clientActor) {
        return sortedRefs.stream()
                .filter(actorRef -> !actorRef.equals(clientActor))
                .collect(Collectors.toList());
    }

    /**
     * Get the client actor responsible for a hash key, usually the entity ID of a signal.
     * Only works if:
     * - All client actors have started and have not crashed,
     * - All client actors know about all other client actors,
     * - All client actors run on the same Connectivity instance, or no more than 1 client actor runs on an instance.
     *
     * @param hashKey the hash key to determine.
     * @return the client actor responsible for it, or an empty optional if no client actor is stored.
     */
    public Optional<ActorRef> lookup(final CharSequence hashKey) {
        return get(PubSubFactory.hashForPubSub(hashKey));
    }

    /**
     * Get the i-th client actor in sorted order. The absolute value of the index is taken and wrapped around
     * the number of stored client actors, so any integer is a valid argument.
     *
     * @param index the index number of the client actor.
     * @return the i-th client actor, or an empty optional if none exists.
     */
    public Optional<ActorRef> get(final int index) {
        if (sortedRefs.isEmpty()) {
            return Optional.empty();
        } else {
            // Math.abs(Integer.MIN_VALUE) is still negative; Math.max maps that single case to 0
            // so that the modulo below never yields a negative list index.
            final int i = Math.max(0, Math.abs(index));
            return Optional.of(sortedRefs.get(i % sortedRefs.size()));
        }
    }

    /**
     * Get the number of stored client actors.
     *
     * @return the number of stored client actors.
     */
    public int size() {
        return sortedRefs.size();
    }

    /**
     * Get all stored client actor refs in sorted order.
     *
     * @return an unmodifiable list of the stored client actor refs; it is a snapshot that does not reflect
     * later modifications of this store.
     */
    public List<ActorRef> getSortedRefs() {
        return sortedRefs;
    }

    /**
     * Build the sorted snapshot of all values of the given map.
     * The result is immutable so that handing it out via {@link #getSortedRefs()} cannot corrupt this store,
     * and so that it behaves the same as the {@code List.of()} used for the empty state.
     */
    private static List<ActorRef> sort(final Map<ActorPath, ActorRef> refsByPath) {
        final ActorRef[] sorted = refsByPath.values()
                .stream()
                .sorted(ActorRef::compareTo)
                .toArray(ActorRef[]::new);
        return List.of(sorted);
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + " [" +
                "refsByPath=" + refsByPath +
                ", sortedRefs=" + sortedRefs +
                "]";
    }

}