Permalink
Browse files

Merge pull request #646 from jancona/master

Make LeastActiveBalancingPolicy.ShufflingCompare comparisons stable
  • Loading branch information...
2 parents 2724dcb + f0304a7 commit d48e723f4ea3df715e0a2a74d78a49cab3d59685 @zznate zznate committed Jan 13, 2014
@@ -16,15 +16,15 @@
* The list of hosts is shuffled on each pass to account for the case
* where a number of hosts are at the minimum number of connections
* (ie. they are not busy).
- *
- *
+ *
+ *
* @author zznate
*/
public class LeastActiveBalancingPolicy implements LoadBalancingPolicy {
-
+
private static final long serialVersionUID = 329849818218657061L;
private static final Logger log = LoggerFactory.getLogger(LeastActiveBalancingPolicy.class);
-
+
@Override
public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> excludeHosts) {
List<HClientPool> vals = Lists.newArrayList(pools);
@@ -34,7 +34,7 @@ public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> exc
Iterator<HClientPool> iterator = vals.iterator();
HClientPool concurrentHClientPool = iterator.next();
if ( excludeHosts != null && excludeHosts.size() > 0 ) {
- while (iterator.hasNext()) {
+ while (iterator.hasNext()) {
if ( !excludeHosts.contains(concurrentHClientPool.getCassandraHost()) ) {
break;
}
@@ -44,17 +44,34 @@ public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> exc
return concurrentHClientPool;
}
- private final class ShufflingCompare implements Comparator<HClientPool> {
-
+ /**
+ * Make the results of this Comparator stable (and thus transitive) by caching the numActive value
+ * for each HClientPool as they are seen, then reusing the cached value instead of the current value
+ * (which may have changed) if the same pool is compared again.
+ *
+ * Without this change the new TimSort algorithm in Java 7 sometimes throws a:
+ * java.lang.IllegalArgumentException: Comparison method violates its general contract!
+ */
+ static final class ShufflingCompare implements Comparator<HClientPool> {
+ private Map<HClientPool, Integer> cachedActive = new HashMap<HClientPool, Integer>();
+
public int compare(HClientPool o1, HClientPool o2) {
if ( log.isDebugEnabled() ) {
log.debug("comparing 1: {} and count {} with 2: {} and count {}",
new Object[]{o1.getCassandraHost(), o1.getNumActive(), o2.getCassandraHost(), o2.getNumActive()});
}
- return o1.getNumActive() - o2.getNumActive();
+ return getNumActive(o1) - getNumActive(o2);
+ }
+ private int getNumActive(HClientPool p) {
+ Integer ret = cachedActive.get(p);
+ if (ret == null) {
+ ret = p.getNumActive();
+ cachedActive.put(p, ret);
+ }
+ return ret;
}
}
-
+
@Override
public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host, CassandraClientMonitor monitor) {
return new ConcurrentHClientPool(clientFactory, host, monitor);
@@ -2,25 +2,31 @@
import static org.junit.Assert.assertEquals;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
import java.util.Set;
+import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.service.CassandraHost;
+import me.prettyprint.hector.api.exceptions.HectorException;
import org.junit.Test;
import org.mockito.Mockito;
public class LeastActiveBalancingPolicyTest extends BaseBalancingPolicyTest {
private LeastActiveBalancingPolicy leastActiveBalancingPolicy;
-
+
@Test
public void testGetPoolOk() {
leastActiveBalancingPolicy = new LeastActiveBalancingPolicy();
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
- assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
+ assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
Mockito.when(poolWith5Active.getNumActive()).thenReturn(8);
assertEquals(poolWith7Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith7Active, leastActiveBalancingPolicy.getPool(pools, null));
@@ -30,34 +36,108 @@ public void testGetPoolOk() {
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
}
-
+
@Test
- public void testSkipExhausted() {
+ public void testSkipExhausted() {
leastActiveBalancingPolicy = new LeastActiveBalancingPolicy();
assertEquals(poolWith7Active, leastActiveBalancingPolicy.getPool(pools, new HashSet<CassandraHost>(Arrays.asList(new CassandraHost("127.0.0.1:9160")))));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, new HashSet<CassandraHost>(Arrays.asList(new CassandraHost("127.0.0.2:9161")))));
}
-
+
@Test
public void testShuffleOnAllEqual() {
- ConcurrentHClientPool poolWith5Active2 = Mockito.mock(ConcurrentHClientPool.class);
+ ConcurrentHClientPool poolWith5Active2 = Mockito.mock(ConcurrentHClientPool.class);
Mockito.when(poolWith5Active2.getNumActive()).thenReturn(5);
Mockito.when(poolWith5Active2.getCassandraHost()).thenReturn(new CassandraHost("127.0.0.4:9163"));
- ConcurrentHClientPool poolWith5Active3 = Mockito.mock(ConcurrentHClientPool.class);
+ ConcurrentHClientPool poolWith5Active3 = Mockito.mock(ConcurrentHClientPool.class);
Mockito.when(poolWith5Active3.getNumActive()).thenReturn(5);
Mockito.when(poolWith5Active3.getCassandraHost()).thenReturn(new CassandraHost("127.0.0.5:9164"));
-
+
pools.add(poolWith5Active2);
pools.add(poolWith5Active3);
-
+
leastActiveBalancingPolicy = new LeastActiveBalancingPolicy();
// should hit all three equal hosts over the course of 50 runs
Set<CassandraHost> foundHosts = new HashSet<CassandraHost>(3);
for (int i = 0; i < 50; i++) {
HClientPool foundPool = leastActiveBalancingPolicy.getPool(pools, null);
foundHosts.add(foundPool.getCassandraHost());
- assert 5 == foundPool.getNumActive();
+ assert 5 == foundPool.getNumActive();
}
assertEquals(3, foundHosts.size());
}
+
+ @Test
+ public void testShufflingCompareStability() {
+ final int POOL_SIZE = 360;
+ List<HClientPool> pools = new ArrayList<HClientPool>(POOL_SIZE);
+ for (int i = 0; i < POOL_SIZE; i++) {
+ pools.add(new TestPool());
+ }
+ // Do it enough times and it will fail
+ for (int i = 0; i < 50; i++) {
+ Collections.shuffle(pools);
+ Collections.sort(pools, new LeastActiveBalancingPolicy.ShufflingCompare());
+ }
+
+ }
+ private static class TestPool implements HClientPool {
+ private Random rand = new Random();
+
+ public int getNumActive() {
+ return rand.nextInt(30);
+ }
+
+ public int getNumIdle() {
+ return 0;
+ }
+
+ public int getNumBlockedThreads() {
+ return 0;
+ }
+
+ public String getName() {
+ return null;
+ }
+
+ public boolean getIsActive() {
+ return false;
+ }
+
+ public long getExhaustedTime() {
+ return 0;
+ }
+
+ public HClient borrowClient() throws HectorException {
+ return null;
+ }
+
+ public CassandraHost getCassandraHost() {
+ return null;
+ }
+
+ public int getNumBeforeExhausted() {
+ return 0;
+ }
+
+ public boolean isExhausted() {
+ return false;
+ }
+
+ public int getMaxActive() {
+ return 0;
+ }
+
+ public String getStatusAsString() {
+ return null;
+ }
+
+ public void releaseClient(HClient client) throws HectorException {
+
+ }
+
+ public void shutdown() {
+
+ }
+ }
}

0 comments on commit d48e723

Please sign in to comment.