/
ManagedRouter.java
108 lines (90 loc) · 3.77 KB
/
ManagedRouter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package net.dempsy.router.managed;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.dempsy.DempsyException;
import net.dempsy.Infrastructure;
import net.dempsy.config.ClusterId;
import net.dempsy.messages.KeyedMessageWithType;
import net.dempsy.router.RoutingStrategy.ContainerAddress;
import net.dempsy.router.RoutingStrategy.Router;
import net.dempsy.router.shardutils.ShardState;
import net.dempsy.router.shardutils.Utils;
import net.dempsy.util.SafeString;
public class ManagedRouter implements Router, IntConsumer {
private static Logger LOGGER = LoggerFactory.getLogger(ManagedRouter.class);
private final AtomicReference<ContainerAddress[]> destinations;
final ClusterId clusterId;
private final AtomicBoolean isRunning;
private final ManagedRouterFactory mommy;
private final String thisNodeId;
private final ShardState<ContainerAddress> state;
private final Utils<ContainerAddress> utils;
private int mask = 0;
ManagedRouter(final ManagedRouterFactory mom, final ClusterId clusterId, final Infrastructure infra) {
this.mommy = mom;
this.clusterId = clusterId;
this.thisNodeId = infra.getNodeId();
this.isRunning = new AtomicBoolean(true);
this.state = new ShardState<ContainerAddress>(clusterId.clusterName, infra, isRunning, ContainerAddress[]::new, this);
this.utils = state.getUtils();
this.destinations = state.getShardContentsArray();
this.state.process();
}
@Override
public void accept(final int mask) {
this.mask = mask;
}
@Override
public ContainerAddress selectDestinationForMessage(final KeyedMessageWithType message) {
final ContainerAddress[] destinations = this.destinations.get();
if (destinations == null)
throw new DempsyException("It appears the " + ManagedRouter.class.getSimpleName() + " strategy for the message key " +
SafeString.objectDescription(message != null ? message.key : null)
+ " is being used prior to initialization or after a failure.");
return destinations[utils.determineShard(message.key, mask)];
}
@Override
public Collection<ContainerAddress> allDesintations() {
final ContainerAddress[] cur = destinations.get();
if (cur == null)
return new ArrayList<>();
return new ArrayList<>(Arrays.stream(cur).filter(ca -> ca != null).collect(Collectors.toSet()));
}
@Override
public synchronized void release() {
mommy.release(this);
isRunning.set(false);
}
@Override
public String toString() {
return "{" + ManagedRouter.class.getSimpleName() + " at " + thisNodeId + " to " + clusterId + "}";
}
/**
* This makes sure all of the destinations are full.
*/
boolean isReady() {
final ContainerAddress[] ds = destinations.get();
if (ds == null)
return false;
for (final ContainerAddress d : ds)
if (d == null)
return false;
final boolean ret = ds.length != 0; // this method is only called in tests and this needs to be true there.
if (ret && LOGGER.isDebugEnabled())
LOGGER.debug("at {} to {} is Ready " + shorthand(ds), thisNodeId, clusterId);
return ret;
}
private static final Set<ContainerAddress> shorthand(final ContainerAddress[] addr) {
if (addr == null)
return null;
return Arrays.stream(addr).collect(Collectors.toSet());
}
}