/
ClusterGroupRouter.java
148 lines (125 loc) · 5.36 KB
/
ClusterGroupRouter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package net.dempsy.router.group;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.dempsy.DempsyException;
import net.dempsy.Infrastructure;
import net.dempsy.config.ClusterId;
import net.dempsy.messages.KeyedMessageWithType;
import net.dempsy.router.RoutingStrategy.ContainerAddress;
import net.dempsy.router.RoutingStrategy.Router;
import net.dempsy.router.group.intern.GroupDetails;
import net.dempsy.router.shardutils.ShardState;
import net.dempsy.router.shardutils.Utils;
import net.dempsy.util.SafeString;
/**
 * {@link Router} implementation that selects a destination container for a message by
 * looking the message key up in an array of {@link GroupDetails}, one entry per shard.
 *
 * <p>The shard array is obtained from a {@link ShardState} and held in an
 * {@link AtomicReference}; each call takes a snapshot of it. Every {@link GroupDetails}
 * entry carries container addresses for several clusters, and this router picks the
 * address at the index registered for {@link #clusterName}. That index is resolved
 * lazily and cached in {@link #containerIndex} once found.
 *
 * <p>NOTE(review): {@code mask} and {@code containerIndex} are plain (non-volatile)
 * fields. If {@link #accept(int)} is invoked from a different thread than the routing
 * calls (presumably by {@link ShardState} - confirm), visibility is not guaranteed.
 */
public class ClusterGroupRouter implements Router, IntConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterGroupRouter.class);

    /** Shared reference to the current per-shard details; the referenced array may be null. */
    private final AtomicReference<GroupDetails[]> destinations;
    final ClusterId clusterId;
    /** Key used to look up this cluster's slot in {@code GroupDetails.clusterIndicies}. */
    private final String clusterName;
    /** Flipped to false by {@link #release()}; also handed to the {@link ShardState}. */
    private final AtomicBoolean isRunning;
    /** The factory that created this router; notified on {@link #release()}. */
    private final ClusterGroupRouterFactory mommy;
    private final String thisNodeId;
    private final ShardState<GroupDetails> state;
    private final Utils<GroupDetails> utils;

    /** Value last supplied through {@link #accept(int)}; passed to {@code determineShard}. */
    private int mask = 0;
    /** Cached index into {@code GroupDetails.containerAddresses}; negative until resolved. */
    private int containerIndex = -1;

    /**
     * Creates the router, wires it to a new {@link ShardState} for the given group and
     * starts that state's processing.
     *
     * @param mom the factory that owns this router
     * @param clusterId the cluster this router sends to
     * @param infra infrastructure used to obtain the node id and build the shard state
     * @param groupName name of the group handed to the {@link ShardState}
     */
    ClusterGroupRouter(final ClusterGroupRouterFactory mom, final ClusterId clusterId, final Infrastructure infra, final String groupName) {
        this.mommy = mom;
        this.clusterId = clusterId;
        this.clusterName = clusterId.clusterName;
        this.thisNodeId = infra.getNodeId();
        this.isRunning = new AtomicBoolean(true);
        // 'this' is registered as the IntConsumer, so every other field the callback or
        // the routing methods rely on must be assigned before process() is invoked below.
        this.state = new ShardState<GroupDetails>(groupName, infra, isRunning, GroupDetails[]::new, this);
        this.utils = state.getUtils();
        this.destinations = state.getShardContentsArray();
        this.state.process();
    }

    /**
     * Stores the mask that is later passed to {@code utils.determineShard}.
     *
     * @param mask the new mask value
     */
    @Override
    public void accept(final int mask) {
        this.mask = mask;
    }

    /**
     * Picks the container address for the given message.
     *
     * @param message the message to route; its {@code key} selects the shard
     * @return the address for this cluster in the selected shard, or {@code null} when
     *         this cluster has no index in the group details or the selected shard slot
     *         is not populated
     * @throws DempsyException if the destinations array is not available
     */
    @Override
    public ContainerAddress selectDestinationForMessage(final KeyedMessageWithType message) {
        final GroupDetails[] current = this.destinations.get();
        if (current == null) {
            throw new DempsyException("It appears the " + ClusterGroupRouter.class.getSimpleName() + " strategy for the message key " +
                SafeString.objectDescription(message != null ? message.key : null)
                + " is being used prior to initialization or after a failure.");
        }

        if (containerIndex < 0) {
            containerIndex = getIndex(current, clusterName);
            if (containerIndex < 0) {
                return null;
            }
        }

        final GroupDetails selected = current[utils.determineShard(message.key, mask)];
        // Individual slots can be null (isReady() and allDesintations() both allow for
        // it), so report "no destination" rather than failing with an NPE.
        if (selected == null) {
            return null;
        }
        return selected.containerAddresses[containerIndex];
    }

    /**
     * Collects the distinct, non-null container addresses for this cluster across all
     * populated shard slots.
     *
     * @return the distinct addresses; empty when the destinations array is unavailable
     *         or this cluster has no index in the group details
     */
    @Override
    public Collection<ContainerAddress> allDesintations() {
        final GroupDetails[] cur = destinations.get();
        // This must be checked before resolving the index: getIndex streams the array.
        if (cur == null) {
            return new ArrayList<>();
        }

        if (containerIndex < 0) {
            containerIndex = getIndex(cur, clusterName);
            if (containerIndex < 0) {
                return Collections.emptyList();
            }
        }

        final int index = containerIndex;
        return new ArrayList<>(Arrays.stream(cur)
            .filter(gd -> gd != null)
            .map(gd -> gd.containerAddresses[index])
            .filter(ca -> ca != null)
            .collect(Collectors.toSet()));
    }

    /**
     * Notifies the owning factory that this router is released and then clears the
     * running flag shared with the {@link ShardState}.
     */
    @Override
    public synchronized void release() {
        mommy.release(this);
        isRunning.set(false);
    }

    @Override
    public String toString() {
        return "{" + ClusterGroupRouter.class.getSimpleName() + " at " + thisNodeId + " to " + clusterId + "}";
    }

    /**
     * Checks that the destinations array exists, is non-empty, has every slot populated,
     * and that this cluster has an index in the group details.
     *
     * @return {@code true} only when all of the above hold
     */
    boolean isReady() {
        final GroupDetails[] ds = destinations.get();
        if (ds == null) {
            return false;
        }
        for (final GroupDetails d : ds) {
            if (d == null) {
                return false;
            }
        }
        final boolean ret = ds.length != 0 && getIndex(ds, clusterName) >= 0; // this method is only called in tests and this needs to be true there.
        if (ret && LOGGER.isDebugEnabled()) {
            // The address set is passed as a parameter so its text is never interpreted
            // as part of the format string.
            LOGGER.debug("at {} to {} is Ready {}", thisNodeId, clusterId, shorthand(cvrt(ds, clusterName)));
        }
        return ret;
    }

    /**
     * Looks up the index registered for {@code clustername} using any non-null entry of
     * {@code destinations}.
     *
     * @return the index, or {@code -1} if the array is null, holds no non-null entry, or
     *         the entry has no mapping for the cluster name
     */
    private static int getIndex(final GroupDetails[] destinations, final String clustername) {
        if (destinations == null) {
            return -1;
        }
        final GroupDetails gd = Arrays.stream(destinations).filter(g -> g != null).findAny().orElse(null);
        if (gd == null) {
            return -1;
        }
        final Integer ret = gd.clusterIndicies.get(clustername);
        return ret == null ? -1 : ret.intValue();
    }

    /**
     * Projects each shard entry onto the address for the named cluster; null entries
     * stay null. Returns an empty array when the cluster has no index.
     */
    private static ContainerAddress[] cvrt(final GroupDetails[] gds, final String clusterName) {
        final int clusterIndex = getIndex(gds, clusterName);
        if (clusterIndex < 0) {
            return new ContainerAddress[0];
        }
        return Arrays.stream(gds)
            .map(gd -> gd == null ? null : gd.containerAddresses[clusterIndex])
            .toArray(ContainerAddress[]::new);
    }

    /** Collapses the address array into a set (used to keep the debug log short). */
    private static Set<ContainerAddress> shorthand(final ContainerAddress[] addr) {
        if (addr == null) {
            return null;
        }
        return Arrays.stream(addr).collect(Collectors.toSet());
    }
}