Skip to content

Commit

Permalink
Refactor the Managed routing strategy to separate guts into a set of …
Browse files Browse the repository at this point in the history
…utilities to be reused in the cluster group strategy.
  • Loading branch information
Jim Carroll committed May 11, 2017
1 parent a4f97f6 commit 74ef3da
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 156 deletions.
Expand Up @@ -7,15 +7,19 @@
import net.dempsy.config.ClusterId;
import net.dempsy.router.RoutingStrategy.ContainerAddress;
import net.dempsy.router.RoutingStrategy.Inbound;
import net.dempsy.router.shardutils.Leader;
import net.dempsy.router.shardutils.Subscriber;
import net.dempsy.router.shardutils.Utils;

public class ManagedInbound implements Inbound {
private Leader leader;
private Subscriber subscriber;
private Utils utils;
private Leader<ContainerAddress> leader;
private Subscriber<ContainerAddress> subscriber;
private Utils<ContainerAddress> utils;
private ClusterId clusterId;
private ContainerAddress address;
private KeyspaceChangeListener listener;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private int mask = 0;

@Override
public void setContainerDetails(final ClusterId clusterId, final ContainerAddress address, final KeyspaceChangeListener listener) {
Expand All @@ -26,12 +30,22 @@ public void setContainerDetails(final ClusterId clusterId, final ContainerAddres

@Override
public void start(final Infrastructure infra) {
utils = new Utils(infra, clusterId, address);
final int totalShards = Integer
.parseInt(infra.getConfigValue(ManagedInbound.class, Utils.CONFIG_KEY_TOTAL_SHARDS, Utils.DEFAULT_TOTAL_SHARDS));
final int minNodes = Integer.parseInt(infra.getConfigValue(ManagedInbound.class, Utils.CONFIG_KEY_MIN_NODES, Utils.DEFAULT_MIN_NODES));

if (Integer.bitCount(totalShards) != 1)
throw new IllegalArgumentException("The configuration property \"" + Utils.CONFIG_KEY_TOTAL_SHARDS
+ "\" must be set to a power of 2. It's currently set to " + totalShards);

this.mask = totalShards - 1;

utils = new Utils<ContainerAddress>(infra, clusterId.clusterName, address);
// subscriber first because it registers as a node. If there's no nodes
// there's nothing for the leader to do.
subscriber = new Subscriber(utils, infra, isRunning, listener);
subscriber = new Subscriber<ContainerAddress>(utils, infra, isRunning, listener, totalShards);
subscriber.process();
leader = new Leader(utils, infra, isRunning);
leader = new Leader<ContainerAddress>(utils, totalShards, minNodes, infra, isRunning, ContainerAddress[]::new);
leader.process();
}

Expand All @@ -47,7 +61,7 @@ public boolean isReady() {

@Override
public boolean doesMessageKeyBelongToNode(final Object messageKey) {
final int shardNum = utils.determineShard(messageKey);
final int shardNum = utils.determineShard(messageKey, mask);
return subscriber.doIOwnShard(shardNum);
}
}
Expand Up @@ -3,44 +3,52 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.dempsy.DempsyException;
import net.dempsy.Infrastructure;
import net.dempsy.cluster.ClusterInfoException;
import net.dempsy.config.ClusterId;
import net.dempsy.messages.KeyedMessageWithType;
import net.dempsy.router.RoutingStrategy.ContainerAddress;
import net.dempsy.router.RoutingStrategy.Router;
import net.dempsy.router.managed.Utils.ShardAssignment;
import net.dempsy.router.shardutils.ShardState;
import net.dempsy.router.shardutils.Utils;
import net.dempsy.util.SafeString;
import net.dempsy.utils.PersistentTask;

public class ManagedRouter extends PersistentTask implements Router {
public class ManagedRouter implements Router, IntConsumer {
private static Logger LOGGER = LoggerFactory.getLogger(ManagedRouter.class);
private final AtomicReference<ContainerAddress[]> destinations = new AtomicReference<>(null);
private final AtomicReference<ContainerAddress[]> destinations;

final ClusterId clusterId;
private final AtomicBoolean isRunning;
private final Utils utils;
private final ManagedRouterFactory mommy;
private final String thisNodeId;

private final ShardState<ContainerAddress> state;
private final Utils<ContainerAddress> utils;
private int mask = 0;

ManagedRouter(final ManagedRouterFactory mom, final ClusterId clusterId, final Infrastructure infra) {
super(LOGGER, new AtomicBoolean(true), infra.getScheduler(), 500);
this.mommy = mom;
this.clusterId = clusterId;
this.utils = new Utils(infra, clusterId, null);
this.thisNodeId = infra.getNodeId();
this.isRunning = getIsRunningFlag();
process();
this.isRunning = new AtomicBoolean(true);
this.state = new ShardState<ContainerAddress>(clusterId.clusterName, infra, isRunning, ContainerAddress[]::new, this);
this.utils = state.getUtils();
this.destinations = state.getShardContentsArray();
this.state.process();
}

@Override
public void accept(final int mask) {
this.mask = mask;
}

@Override
Expand All @@ -51,7 +59,7 @@ public ContainerAddress selectDestinationForMessage(final KeyedMessageWithType m
SafeString.objectDescription(message != null ? message.key : null)
+ " is being used prior to initialization or after a failure.");

return destinations[utils.determineShard(message.key)];
return destinations[utils.determineShard(message.key, mask)];
}

@Override
Expand All @@ -68,30 +76,6 @@ public synchronized void release() {
isRunning.set(false);
}

@Override
public boolean execute() {
try {
final List<ShardAssignment> assignments = utils.persistentGetData(utils.shardsAssignedDir, this);
if (assignments == null)
return false; // nothing there, try later.

final ContainerAddress[] newState = new ContainerAddress[utils.totalNumShards];
for (final ShardAssignment sa : assignments) {
for (final int index : sa.shards) {
if (newState[index] != null)
LOGGER.warn("There are 2 nodes that think they both have shard " + index + ". The one that will be used is " + newState[index]
+ " the one being skipped is " + sa.addr);
else
newState[index] = sa.addr;
}
}
this.destinations.set(newState);
return true;
} catch (final ClusterInfoException cie) {
throw new RuntimeException(cie);
}
}

@Override
public String toString() {
return "{" + ManagedRouter.class.getSimpleName() + " at " + thisNodeId + " to " + clusterId + "}";
Expand Down
@@ -1,4 +1,4 @@
package net.dempsy.router.managed;
package net.dempsy.router.shardutils;

import static net.dempsy.util.Functional.chain;

Expand All @@ -12,6 +12,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -22,24 +23,35 @@
import net.dempsy.cluster.ClusterInfoException;
import net.dempsy.cluster.ClusterInfoSession;
import net.dempsy.cluster.DirMode;
import net.dempsy.router.RoutingStrategy.ContainerAddress;
import net.dempsy.router.managed.Utils.ShardAssignment;
import net.dempsy.router.managed.Utils.SubdirAndData;
import net.dempsy.router.shardutils.Utils.ShardAssignment;
import net.dempsy.router.shardutils.Utils.SubdirAndData;
import net.dempsy.utils.PersistentTask;

public class Leader extends PersistentTask {
public class Leader<C> extends PersistentTask {
private static final Logger LOGGER = LoggerFactory.getLogger(Leader.class);

private boolean imIt = false;
private final Utils utils;
private final Utils<C> utils;
private final ClusterInfoSession session;
private final PersistentTask nodesChangedTask;
private final AtomicBoolean isReady = new AtomicBoolean(false);
private final IntFunction<C[]> newArraySupplier;
private final int totalNumShards;
private final int minNodes;

public Leader(final Utils msutils, final Infrastructure infra, final AtomicBoolean isRunning) {
public Leader(final Utils<C> msutils, final int totalNumShards, final int minNodes, final Infrastructure infra, final AtomicBoolean isRunning,
final IntFunction<C[]> newArraySupplier) {
super(LOGGER, isRunning, infra.getScheduler(), 500);
this.utils = msutils;
this.session = utils.session;
this.newArraySupplier = newArraySupplier;
this.totalNumShards = totalNumShards;
this.minNodes = minNodes;

if (Integer.bitCount(totalNumShards) != 1)
throw new IllegalArgumentException("The configuration property \"" + Utils.CONFIG_KEY_TOTAL_SHARDS
+ "\" must be set to a power of 2. It's currently set to " + totalNumShards);

this.nodesChangedTask = new PersistentTask(LOGGER, isRunning, infra.getScheduler(), 500) {
@Override
public boolean execute() {
Expand Down Expand Up @@ -108,23 +120,24 @@ private boolean nodesChanged() throws ClusterInfoException {
LOGGER.trace("Master was notifed of node changes");

// current nodes registered, sorted by rank
final List<SubdirAndData<ContainerAddress>> currentNodes = chain(
final List<SubdirAndData<C>> currentNodes = chain(
// I need to be notified when Nodes appear or disappear v
utils.persistentGetSubdirAndData(utils.nodesDir, nodesChangedTask, null), p -> Utils.rankSort(p));

// get all of the current shard assignments
@SuppressWarnings("unchecked")
final List<ShardAssignment> assignments = Optional.ofNullable((List<ShardAssignment>) utils.persistentGetData(utils.shardsAssignedDir, null))
final List<ShardAssignment<C>> assignments = Optional
.ofNullable((List<ShardAssignment<C>>) utils.persistentGetData(utils.shardsAssignedDir, null))
.orElse(new ArrayList<>());

// Build a lookup to determine the rank of a given node denoted by its ContainerAddress
final Map<ContainerAddress, Integer> rankByCa = new HashMap<>();
final Map<C, Integer> rankByCa = new HashMap<>();
for (int i = 0; i < currentNodes.size(); i++)
rankByCa.put(currentNodes.get(i).data, Integer.valueOf(i));

// create an array of already assigned and accepted addresses
final ContainerAddress[] assignedTo = new ContainerAddress[utils.totalNumShards];
for (final ShardAssignment sa : assignments) {
final C[] assignedTo = newArraySupplier.apply(totalNumShards);
for (final ShardAssignment<C> sa : assignments) {
// do we know about this destination?
if (rankByCa.get(sa.addr) != null) {
for (final int shard : sa.shards) {
Expand All @@ -141,7 +154,7 @@ private boolean nodesChanged() throws ClusterInfoException {

// for each node in order I need to build a reduced state.
for (int i = currentNodes.size() - 1; i >= 0; i--) {
final ContainerAddress cur = currentNodes.get(i).data;
final C cur = currentNodes.get(i).data;

final Set<Integer> shardsToRelease = perNodeRelease(cur, assignedTo, currentNodes.size(), i);
for (final Integer shard : shardsToRelease) {
Expand All @@ -151,17 +164,17 @@ private boolean nodesChanged() throws ClusterInfoException {

// now go through and add
for (int i = 0; i < currentNodes.size(); i++) {
final ContainerAddress cur = currentNodes.get(i).data;
final C cur = currentNodes.get(i).data;
final Set<Integer> shardsToAdd = perNodeAcquire(cur, assignedTo, currentNodes.size(), i);
for (final Integer shard : shardsToAdd) {
assignedTo[shard] = cur;
}
}

// now write the results.
final Map<ContainerAddress, List<Integer>> tmp = new HashMap<>();
final Map<C, List<Integer>> tmp = new HashMap<>();
for (int i = 0; i < assignedTo.length; i++) {
final ContainerAddress cur = assignedTo[i];
final C cur = assignedTo[i];
if (cur != null) {
List<Integer> shards = tmp.get(cur);
if (shards == null) {
Expand All @@ -173,15 +186,15 @@ private boolean nodesChanged() throws ClusterInfoException {
}

// now create the list of new assignments.
final List<ShardAssignment> newAssignments = new ArrayList<>(tmp.entrySet().stream()
.map(e -> new ShardAssignment(e.getValue().stream().mapToInt(i -> i.intValue()).toArray(), e.getKey()))
final List<ShardAssignment<C>> newAssignments = new ArrayList<>(tmp.entrySet().stream()
.map(e -> new ShardAssignment<C>(e.getValue().stream().mapToInt(i -> i.intValue()).toArray(), e.getKey(), totalNumShards, minNodes))
.collect(Collectors.toList()));

session.setData(utils.shardsAssignedDir, newAssignments);
return true;
}

private static List<Integer> buildDestinationsAcquired(final ContainerAddress thisNodeAddress, final ContainerAddress[] currentState) {
private static <C> List<Integer> buildDestinationsAcquired(final C thisNodeAddress, final C[] currentState) {
// destinationsAcquired reflects what we already have according to the currentState
return new ArrayList<>(IntStream.range(0, currentState.length)
.filter(i -> thisNodeAddress.equals(currentState[i]))
Expand All @@ -190,9 +203,8 @@ private static List<Integer> buildDestinationsAcquired(final ContainerAddress th
}

// go through and determine which nodes to give up ... if any
private Set<Integer> perNodeRelease(final ContainerAddress thisNodeAddress, final ContainerAddress[] currentState, final int nodeCount,
final int nodeRank) {
final int numberIShouldHave = howManyShouldIHave(utils.totalNumShards, nodeCount, nodeRank);
private Set<Integer> perNodeRelease(final C thisNodeAddress, final C[] currentState, final int nodeCount, final int nodeRank) {
final int numberIShouldHave = howManyShouldIHave(totalNumShards, nodeCount, minNodes, nodeRank);

// destinationsAcquired reflects what we already have according to the currentState
final List<Integer> destinationsAcquired = buildDestinationsAcquired(thisNodeAddress, currentState);
Expand All @@ -215,9 +227,8 @@ private Set<Integer> perNodeRelease(final ContainerAddress thisNodeAddress, fina
return ret;
}

private Set<Integer> perNodeAcquire(final ContainerAddress thisNodeAddress, final ContainerAddress[] currentState, final int nodeCount,
final int nodeRank) {
final int numberIShouldHave = howManyShouldIHave(utils.totalNumShards, nodeCount, nodeRank);
private Set<Integer> perNodeAcquire(final C thisNodeAddress, final C[] currentState, final int nodeCount, final int nodeRank) {
final int numberIShouldHave = howManyShouldIHave(totalNumShards, nodeCount, minNodes, nodeRank);

// destinationsAcquired reflects what we already have according to the currentState
final List<Integer> destinationsAcquired = buildDestinationsAcquired(thisNodeAddress, currentState);
Expand Down Expand Up @@ -248,9 +259,10 @@ private Set<Integer> perNodeAcquire(final ContainerAddress thisNodeAddress, fina
return ret;
}

private final static int howManyShouldIHave(final int totalShardCount, final int numNodes, final int myRank) {
final int base = Math.floorDiv(totalShardCount, numNodes);
final int mod = Math.floorMod(totalShardCount, numNodes);
private final static int howManyShouldIHave(final int totalShardCount, final int numNodes, final int minNodes, final int myRank) {
final int numNodesToConsider = Math.max(numNodes, minNodes);
final int base = Math.floorDiv(totalShardCount, numNodesToConsider);
final int mod = Math.floorMod(totalShardCount, numNodesToConsider);
return myRank < mod ? (base + 1) : base;
}

Expand All @@ -266,7 +278,8 @@ private boolean registerAndConfirmIfImIt() throws ClusterInfoException {
"This is IMPOSSIBLE. There's more than one subdir of " + utils.leaderDir + ". They include " + imItSubdirs);

// make sure it's still mine.
final ContainerAddress registered = (ContainerAddress) session.getData(utils.masterDetermineDir, null);
@SuppressWarnings("unchecked")
final C registered = (C) session.getData(utils.masterDetermineDir, null);

return utils.thisNodeAddress.equals(registered); // am I it, or not?
}
Expand Down

0 comments on commit 74ef3da

Please sign in to comment.