Skip to content

Commit

Permalink
Small optimization of the group strategy. Plus changed paraent pom ar…
Browse files Browse the repository at this point in the history
…tifactId to fit the convention.
  • Loading branch information
homer-simpleton committed May 14, 2017
1 parent 27d24da commit f3de50c
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 76 deletions.
2 changes: 1 addition & 1 deletion dempsy-framework.api/pom.xml
Expand Up @@ -4,7 +4,7 @@

<parent>
<groupId>net.dempsy</groupId>
<artifactId>dempsy-parent</artifactId>
<artifactId>dempsy-framework.parent</artifactId>
<version>0.9-SNAPSHOT</version>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion dempsy-framework.core/pom.xml
Expand Up @@ -4,7 +4,7 @@

<parent>
<groupId>net.dempsy</groupId>
<artifactId>dempsy-parent</artifactId>
<artifactId>dempsy-framework.parent</artifactId>
<version>0.9-SNAPSHOT</version>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion dempsy-framework.impl/pom.xml
Expand Up @@ -4,7 +4,7 @@

<parent>
<groupId>net.dempsy</groupId>
<artifactId>dempsy-parent</artifactId>
<artifactId>dempsy-framework.parent</artifactId>
<version>0.9-SNAPSHOT</version>
</parent>

Expand Down
Expand Up @@ -34,6 +34,7 @@ public class ClusterGroupInbound {
private final List<Proxy> inbounds = new ArrayList<>();
private boolean started = false;
private GroupDetails groupDetails = null;
private Map<String, ContainerAddress> caByCluster = null;

private static final Map<NodeAddress, Map<String, ClusterGroupInbound>> current = new HashMap<>();

Expand Down Expand Up @@ -140,19 +141,20 @@ private synchronized void maybeStart(final Infrastructure infra, final Proxy ib)
throw new IllegalStateException("The group name isn't set on the inbound for " + ib.clusterId
+ ". This shouldn't be possible. Was the typeId specified correctly?");

if (groupDetails == null)
if (groupDetails == null) {
groupDetails = new GroupDetails(ib.groupName, ib.address.node);
else if (!groupDetails.groupName.equals(ib.groupName))
caByCluster = new HashMap<>();
} else if (!groupDetails.groupName.equals(ib.groupName))
throw new IllegalStateException("The group name for " + ib.clusterId + " is " + ib.groupName
+ " but doesn't match prevous group names supposedly in the same group: " + groupDetails.groupName);
else if (!groupDetails.node.equals(ib.address.node))
throw new IllegalStateException("The node address for " + ib.clusterId + " is " + ib.address.node
+ " but doesn't match prevous group names supposedly in the same group: " + groupDetails.node);

if (groupDetails.caByCluster.containsKey(ib.clusterId.clusterName))
if (caByCluster.containsKey(ib.clusterId.clusterName))
throw new IllegalStateException("There appears to be two inbounds both configured with the same cluster id:" + ib.clusterId);

groupDetails.caByCluster.put(ib.clusterId.clusterName, ib.address);
caByCluster.put(ib.clusterId.clusterName, ib.address);

if (!inbounds.contains(ib))
throw new IllegalStateException(
Expand All @@ -173,6 +175,8 @@ else if (!groupDetails.node.equals(ib.address.node))

this.mask = totalShards - 1;

groupDetails.fillout(caByCluster);

utils = new Utils<GroupDetails>(infra, groupDetails.groupName, groupDetails);
// subscriber first because it registers as a node. If there's no nodes
// there's nothing for the leader to do.
Expand Down
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -36,6 +37,7 @@ public class ClusterGroupRouter implements Router, IntConsumer {
private final ShardState<GroupDetails> state;
private final Utils<GroupDetails> utils;
private int mask = 0;
private int containerIndex = -1;

ClusterGroupRouter(final ClusterGroupRouterFactory mom, final ClusterId clusterId, final Infrastructure infra, final String groupName) {
this.mommy = mom;
Expand All @@ -61,19 +63,29 @@ public ContainerAddress selectDestinationForMessage(final KeyedMessageWithType m
throw new DempsyException("It appears the " + ClusterGroupRouter.class.getSimpleName() + " strategy for the message key " +
SafeString.objectDescription(message != null ? message.key : null)
+ " is being used prior to initialization or after a failure.");
if (containerIndex < 0) {
containerIndex = getIndex(destinations, clusterName);
if (containerIndex < 0)
return null;
}

final GroupDetails cur = destinations[utils.determineShard(message.key, mask)];
return cur.caByCluster.get(clusterName);
return cur.containerAddresses[containerIndex];
}

@Override
public Collection<ContainerAddress> allDesintations() {
final GroupDetails[] cur = destinations.get();
if (containerIndex < 0) {
containerIndex = getIndex(cur, clusterName);
if (containerIndex < 0)
return Collections.emptyList();
}
if (cur == null)
return new ArrayList<>();
return new ArrayList<>(Arrays.stream(cur)
.filter(gd -> gd != null)
.map(gd -> gd.caByCluster.get(clusterName))
.map(gd -> gd.containerAddresses[containerIndex])
.filter(ca -> ca != null)
.collect(Collectors.toSet()));
}
Expand All @@ -99,17 +111,31 @@ boolean isReady() {
for (final GroupDetails d : ds)
if (d == null)
return false;
final boolean ret = ds.length != 0; // this method is only called in tests and this needs to be true there.

final boolean ret = ds.length != 0 && getIndex(ds, clusterName) >= 0; // this method is only called in tests and this needs to be true there.

if (ret && LOGGER.isDebugEnabled())
LOGGER.debug("at {} to {} is Ready " + shorthand(cvrt(ds, clusterName)), thisNodeId, clusterId);

return ret;
}

private static int getIndex(final GroupDetails[] destinations, final String clustername) {
final GroupDetails gd = Arrays.stream(destinations).filter(g -> g != null).findAny().orElse(null);
if (gd == null)
return -1;

final Integer ret = gd.clusterIndicies.get(clustername);
return ret == null ? -1 : ret.intValue();
}

private static ContainerAddress[] cvrt(final GroupDetails[] gds, final String clusterName) {
final int clusterIndex = getIndex(gds, clusterName);
if (clusterIndex < 0)
return new ContainerAddress[0];

return Arrays.stream(gds)
.map(gd -> gd == null ? null : gd.caByCluster.get(clusterName))
.map(gd -> gd == null ? null : gd.containerAddresses[clusterIndex])
.toArray(ContainerAddress[]::new);
}

Expand Down
@@ -1,8 +1,10 @@
package net.dempsy.router.group.intern;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

import net.dempsy.router.RoutingStrategy.ContainerAddress;
import net.dempsy.transport.NodeAddress;
Expand All @@ -12,26 +14,64 @@ public class GroupDetails implements Serializable {

public final String groupName;
public final NodeAddress node;
public final Map<String, ContainerAddress> caByCluster;
public final Map<String, Integer> clusterIndicies;
public ContainerAddress[] containerAddresses;

@SuppressWarnings("unused") // serialization. Yay!
private GroupDetails() {
groupName = null;
node = null;
caByCluster = null;
clusterIndicies = null;
containerAddresses = null;
}

public GroupDetails(final String groupName, final NodeAddress nodeAddress) {
this.groupName = groupName;
this.node = nodeAddress;
this.caByCluster = new HashMap<>();
this.clusterIndicies = new HashMap<>();
this.containerAddresses = null;
}

public void fillout(final Map<String, ContainerAddress> caByCluster) throws IllegalStateException {
final int size = caByCluster.size();
this.containerAddresses = new ContainerAddress[size];
caByCluster.entrySet().forEach(e -> {
final ContainerAddress ca = e.getValue();
final int[] indicies = ca.clusters;
for (final int index : indicies) {
if (containerAddresses[index] != null)
throw new IllegalStateException(
"Two different clusters have the same container index (" + index + "). One is " + e.getKey() + ".");
containerAddresses[index] = ca;
}
});

for (int i = 0; i < containerAddresses.length; i++) {
if (containerAddresses[i] == null)
throw new IllegalStateException("Missing container address at index " + i);
}

// now set clusterIndicies
caByCluster.entrySet().forEach(e -> {
final String cn = e.getKey();
final ContainerAddress ca = e.getValue();
IntStream.of(ca.clusters).forEach(i -> {
final Integer index = Integer.valueOf(i);
if (clusterIndicies.containsKey(cn)) {
if (!clusterIndicies.get(cn).equals(index))
throw new IllegalStateException("cluster " + cn + " seems to corespond to multiple clusters in the group including "
+ clusterIndicies.get(cn) + " and " + i);
} else
clusterIndicies.put(cn, index);
});
});
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((caByCluster == null) ? 0 : caByCluster.hashCode());
result = prime * result + Arrays.hashCode(containerAddresses);
result = prime * result + ((groupName == null) ? 0 : groupName.hashCode());
result = prime * result + ((node == null) ? 0 : node.hashCode());
return result;
Expand All @@ -46,10 +86,7 @@ public boolean equals(final Object obj) {
if (getClass() != obj.getClass())
return false;
final GroupDetails other = (GroupDetails) obj;
if (caByCluster == null) {
if (other.caByCluster != null)
return false;
} else if (!caByCluster.equals(other.caByCluster))
if (!Arrays.equals(containerAddresses, other.containerAddresses))
return false;
if (groupName == null) {
if (other.groupName != null)
Expand All @@ -63,4 +100,5 @@ public boolean equals(final Object obj) {
return false;
return true;
}

}
Expand Up @@ -20,7 +20,6 @@
import net.dempsy.transport.tcp.TcpAddress;
import net.dempsy.transport.tcp.nio.internal.NioUtils;

// TODO: blocking that creates back-pressure
public final class NioSender implements Sender {
private final static Logger LOGGER = LoggerFactory.getLogger(NioSender.class);

Expand Down

0 comments on commit f3de50c

Please sign in to comment.