Added a correct consistent hash load balancer with test cases.
  - The approach is to have one routing ring per partition.
  - Rather than performing N (no. of partitions) lookups per routing request,
    it collapses the N rings into one by building a unique set of points on a single ring,
    where each point stores the node choice for every partition, obtained by looking up
    each partition's individual routing ring.
Terence Yim committed Feb 29, 2012
1 parent f256da3 commit 2ca216e
Showing 3 changed files with 446 additions and 0 deletions.
ConsistentHashLoadBalancer.java
@@ -0,0 +1,134 @@
package com.senseidb.cluster.routing;

import com.linkedin.norbert.javacompat.cluster.Node;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.apache.log4j.Logger;


/**
 * A {@link SenseiLoadBalancer} that provides consistent hash behavior for all partitions with one lookup per routing.
 */
public class ConsistentHashLoadBalancer implements SenseiLoadBalancer
{
  private static Logger logger = Logger.getLogger(ConsistentHashLoadBalancer.class);

  private final HashProvider _hashProvider;
  private final NavigableMap<Long, RoutingInfo> _routingMap;

  public ConsistentHashLoadBalancer(HashProvider hashProvider, int bucketCount, Set<Node> nodes)
  {
    _hashProvider = hashProvider;
    _routingMap = new TreeMap<Long, RoutingInfo>();

    // Gather the set of nodes for each partition
    Map<Integer, Set<Node>> partitionNodes = new TreeMap<Integer, Set<Node>>();
    for (Node node : nodes)
    {
      for (Integer partId : node.getPartitionIds())
      {
        Set<Node> partNodes = partitionNodes.get(partId);
        if (partNodes == null)
        {
          partNodes = new HashSet<Node>();
          partitionNodes.put(partId, partNodes);
        }
        partNodes.add(node);
      }
    }

    // Build the common data structure shared among all RoutingInfo
    int maxSize = 0;
    int[] partitions = new int[partitionNodes.size()];
    @SuppressWarnings("unchecked")
    List<Node>[] nodeLists = new List[partitions.length];
    int idx = 0;
    for (Map.Entry<Integer, Set<Node>> entry : partitionNodes.entrySet())
    {
      partitions[idx] = entry.getKey();
      nodeLists[idx] = new ArrayList<Node>(entry.getValue());
      if (maxSize < nodeLists[idx].size()) {
        maxSize = nodeLists[idx].size();
      }
      idx++;
    }

    // Build an individual ring for each partition
    Map<Integer, NavigableMap<Long, Integer>> rings = new TreeMap<Integer, NavigableMap<Long, Integer>>();
    for (int i = 0; i < partitions.length; i++)
    {
      Integer partId = partitions[i];
      NavigableMap<Long, Integer> ring = rings.get(partId);
      if (ring == null)
      {
        ring = new TreeMap<Long, Integer>();
        rings.put(partId, ring);
      }

      // Put points in the ring: bucketCount points per node.
      for (int j = 0; j < nodeLists[i].size(); j++)
      {
        for (int k = 0; k < bucketCount; k++)
        {
          ring.put(hashProvider.hash(String.format("node-%d-%d", nodeLists[i].get(j).getId(), k)), j);
        }
      }
    }

    // Generate points and gather the node choice for each partition at each point
    for (int slot = 0; slot < bucketCount * maxSize; slot++)
    {
      Long point = hashProvider.hash(String.format("ring-%d", slot));

      // Choice of node for each partition
      int[] nodeChoices = new int[partitions.length];
      for (int i = 0; i < partitions.length; i++)
      {
        nodeChoices[i] = lookup(rings.get(partitions[i]), point);
      }

      _routingMap.put(point, new RoutingInfo(nodeLists, partitions, nodeChoices));
    }
  }

  @Override
  public RoutingInfo route(String routeParam)
  {
    if (_routingMap.isEmpty())
    {
      return null;
    }

    RoutingInfo result = lookup(_routingMap, _hashProvider.hash(routeParam));

    if (logger.isDebugEnabled())
    {
      logger.debug(routeParam + " is sent to " + result.toString());
    }

    return result;
  }

  private <K, V> V lookup(NavigableMap<K, V> ring, K key)
  {
    V result = ring.get(key);
    if (result == null)
    { // Not a direct match; take the next point clockwise, wrapping around to the first entry
      Map.Entry<K, V> entry = ring.ceilingEntry(key);
      result = (entry == null) ? ring.firstEntry().getValue() : entry.getValue();
    }

    return result;
  }

  @Override
  public String toString()
  {
    return _routingMap.toString();
  }
}
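
The RoutingInfo and HashProvider types used above are not shown in this commit view. Below is a minimal sketch of what they might look like, inferred only from how the balancer uses them; the field names and layout are assumptions, not the committed code.

package com.senseidb.cluster.routing;

import com.linkedin.norbert.javacompat.cluster.Node;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: maps a string key to a 64-bit point on the ring.
interface HashProvider
{
  long hash(String key);
}

// Hypothetical sketch: one entry of the merged ring, holding the chosen node
// index for every partition at a given point. nodeLists and partitions are
// shared across all points; only nodeChoices differs per ring entry.
class RoutingInfo
{
  final List<Node>[] nodeLists;  // replica list per partition
  final int[] partitions;        // partition ids, parallel to nodeLists
  final int[] nodeChoices;       // index into nodeLists[i] chosen for partitions[i]

  RoutingInfo(List<Node>[] nodeLists, int[] partitions, int[] nodeChoices)
  {
    this.nodeLists = nodeLists;
    this.partitions = partitions;
    this.nodeChoices = nodeChoices;
  }

  @Override
  public String toString()
  {
    return "partitions " + Arrays.toString(partitions) + " -> node choices " + Arrays.toString(nodeChoices);
  }
}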
ConsistentHashLoadBalancerFactory.java
@@ -0,0 +1,24 @@
package com.senseidb.cluster.routing;


import com.linkedin.norbert.javacompat.cluster.Node;
import java.util.Set;


public class ConsistentHashLoadBalancerFactory implements SenseiLoadBalancerFactory {

  private final int _multiplyFactor;
  private final HashProvider _hashProvider;

  public ConsistentHashLoadBalancerFactory(HashProvider hashProvider, int multiplyFactor)
  {
    _hashProvider = hashProvider;
    _multiplyFactor = multiplyFactor;
  }

  @Override
  public SenseiLoadBalancer newLoadBalancer(Set<Node> nodes)
  {
    return new ConsistentHashLoadBalancer(_hashProvider, _multiplyFactor, nodes);
  }
}
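
For context, a small usage sketch (not part of the commit) showing how the factory and balancer above fit together. The hash provider and node set are assumed to be supplied by the surrounding cluster code, the multiply factor of 50 is an arbitrary example value, and the interface is assumed to declare route(String) as seen in the @Override above.

package com.senseidb.cluster.routing;

import com.linkedin.norbert.javacompat.cluster.Node;
import java.util.Set;

// Hypothetical usage sketch: build one balancer for the current node set and
// route a request key to a RoutingInfo (one node choice per partition).
public class RoutingUsageSketch
{
  public static void routeOnce(HashProvider hashProvider, Set<Node> nodes, String requestKey)
  {
    SenseiLoadBalancerFactory factory = new ConsistentHashLoadBalancerFactory(hashProvider, 50);
    SenseiLoadBalancer balancer = factory.newLoadBalancer(nodes);

    // One TreeMap lookup regardless of partition count; the result says which
    // node should serve each partition for this request key.
    System.out.println(requestKey + " -> " + balancer.route(requestKey));
  }
}

When the node set changes, the surrounding code is expected to call newLoadBalancer again, which is what the discussion below refers to as rebuilding the ring.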

2 comments on commit 2ca216e

@jhartman

Hi Terence,

I'm the maintainer of norbert, the library that sensei uses for distribution. I'd prefer not to move forward with this changeset, and to instead improve the way sensei uses norbert. Sensei doesn't take advantage of newer norbert features for request customization and routing. As a result, it bypasses a lot of norbert's infrastructure to protect itself and its degrader. This is really, really bad. If sensei just fixes the way it uses norbert, it will get this implementation for free. This has been a task on my plate for a long time, but I never got around to it when I was on cloud in SNA since it wasn't prioritized. The initial step was taken in January, when we migrated to the latest norbert release.

Norbert already has a class almost exactly like this. See https://github.com/linkedin-sna/norbert/blob/master/network/src/main/scala/com/linkedin/norbert/network/partitioned/loadbalancer/PartitionedConsistentHashedLoadBalancerFactory.scala. This class builds a consistent hash wheel per partition, unlike your approach. The storage space required, however, is the same: it's numPartitions * numBuckets in both cases.

I also believe I see a problem with your approach. You're mapping all the rings onto a single ring. Usually, you look up a position in the ring and "roll" until you find a spot. What happens when a machine goes down? Now a different machine that can't handle the partition will be the next one in the slot. So do you intend to iterate until you find the next one?

Let's work together to use norbert's infrastructure properly.


@chtyim commented on 2ca216e Mar 1, 2012


Hi Josh,

Sure. It's good to know that there is a plan to fix Sensei to use Norbert correctly.

For the approach I take, merging all partitions into one ring is only to reduce the number of lookups per routing (1 lookup vs. N lookups for N partitions). When there are changes to the node list (add/remove) or to partitions, a new ConsistentHashLoadBalancer instance is created, so no keep-iterating (rolling) situation will arise. The promise of the merge is that clients not routed to the down node won't be affected. For partitions that are affected, clients routed to the down node will be distributed evenly (sort of, by the hashing function) among the other replicas for that partition.

E.g.
Node 1 has partitions 1,2,3
Node 2 has partitions 2,3
Node 3 has partitions 1,2,3

So the ring might look like:

1 -> 1,1,2 (p1 handled by node1, p2 handled by node1, p3 handled by node2)
5 -> 3,2,1
20 -> 3,1,2
55 -> 1,3,3
….

Now, if node 2 dies, a new ring will be generated and it will look like this (notice that routing for partition 1 won't change; routing for partitions 2 and 3 also won't change for points that were not routed to node 2):

1 -> 1,1,1 (node 2 for p3 replaced with the next one down the ring, i.e. node 1)
5 -> 3,1,1
20 -> 3,1,3
55 -> 1,3,3

You can refer to the "testConsisitentHashRemove" test case for more details.

Terence
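
For readers following the thread, here is a tiny self-contained sketch (not from the commit) of the wrap-around lookup both comments refer to. It mirrors the lookup() helper in ConsistentHashLoadBalancer above: an exact hit wins, otherwise the next point clockwise (ceilingEntry) is taken, and a key past the last point wraps around to the first entry of the ring. The point values and node names are illustrative only.

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class RingLookupSketch
{
  // Same logic as ConsistentHashLoadBalancer.lookup(): exact match, else the
  // next point clockwise, else wrap around to the first point on the ring.
  static <K, V> V lookup(NavigableMap<K, V> ring, K key)
  {
    V result = ring.get(key);
    if (result == null)
    {
      Map.Entry<K, V> entry = ring.ceilingEntry(key);
      result = (entry == null) ? ring.firstEntry().getValue() : entry.getValue();
    }
    return result;
  }

  public static void main(String[] args)
  {
    // Points loosely modeled on the example above; values name the chosen node.
    NavigableMap<Long, String> ring = new TreeMap<Long, String>();
    ring.put(1L, "node1");
    ring.put(5L, "node3");
    ring.put(20L, "node3");
    ring.put(55L, "node1");

    System.out.println(lookup(ring, 5L));   // exact point -> node3
    System.out.println(lookup(ring, 7L));   // rolls forward to point 20 -> node3
    System.out.println(lookup(ring, 60L));  // past the last point, wraps to point 1 -> node1
  }
}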
