Browse files

added limit on how long client pool can be exhausted

  • Loading branch information...
1 parent 31cbcc3 commit 04673ffa8aafe8153e8b5f64453f755e4c34cd75 @shaunkalley shaunkalley committed Sep 5, 2013
View
2 core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java
@@ -277,8 +277,8 @@ public void operateWithFailover(Operation<?> op) throws HectorException {
if (hostPools.isEmpty()) {
throw he;
}
- retryable = op.failoverPolicy.shouldRetryFor(HPoolExhaustedException.class);
excludeHosts.add(pool.getCassandraHost());
+ retryable = op.failoverPolicy.shouldRetryFor(HPoolExhaustedException.class);
monitor.incCounter(Counter.POOL_EXHAUSTED);
}
} else if ( he instanceof HPoolRecoverableException ) {
View
6 core/src/main/java/me/prettyprint/cassandra/service/CassandraHost.java
@@ -40,10 +40,10 @@
public static final long DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED = -1;
/**
- * The default max exhausted time before suspending. Default value is
- * negative which means it won't suspend.
+ * The default max exhausted time before suspending. Default value is set to
+ * maximum so that it won't suspend.
*/
- public static final long DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_SUSPENDING = -1;
+ public static final long DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_SUSPENDING = Long.MAX_VALUE;
public static final boolean DEFAULT_LIFO = true;
/**
View
100 core/src/test/java/me/prettyprint/cassandra/connection/HConnectionManagerTest.java
@@ -1,20 +1,25 @@
package me.prettyprint.cassandra.connection;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import me.prettyprint.cassandra.BaseEmbededServerSetupTest;
import me.prettyprint.cassandra.connection.client.HClient;
-import me.prettyprint.cassandra.service.CassandraHost;
-import me.prettyprint.cassandra.service.CassandraHostConfigurator;
-import me.prettyprint.cassandra.service.FailoverPolicy;
-import me.prettyprint.cassandra.service.Operation;
-import me.prettyprint.cassandra.service.OperationType;
+import me.prettyprint.cassandra.connection.factory.HClientFactory;
+import me.prettyprint.cassandra.service.*;
+import me.prettyprint.hector.api.exceptions.HPoolExhaustedException;
import me.prettyprint.hector.api.exceptions.HTimedOutException;
import me.prettyprint.hector.api.exceptions.HectorException;
import org.apache.cassandra.thrift.Cassandra.Client;
+import org.apache.commons.lang.mutable.MutableBoolean;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class HConnectionManagerTest extends BaseEmbededServerSetupTest {
@@ -68,7 +73,67 @@ public void testTimedOutOperateWithFailover() {
FailoverPolicy fp = FailoverPolicy.ON_FAIL_TRY_ONE_NEXT_AVAILABLE;
connectionManager.operateWithFailover(new TimeoutOp(fp));
}
-
+
+ @Test
+ public void clientPoolShouldBeSuspendedWhenExhaustedForTooLong() throws InterruptedException {
+
+ final int maxActive = 5;
+
+ CassandraHostConfigurator configurator = new CassandraHostConfigurator("127.0.0.1:9170");
+ configurator.setClientFactoryClass(TestClientFactory.class.getName());
+ configurator.setMaxActive(maxActive);
+ configurator.setMaxWaitTimeWhenExhausted(50);
+ configurator.setMaxExhaustedTimeBeforeSuspending(0);
+
+ final HConnectionManager connectionManager = new HConnectionManager("TestCluster", configurator);
+ CassandraHost host = connectionManager.getHosts().iterator().next();
+ ConnectionManagerListener listener = mock(ConnectionManagerListener.class);
+ final MutableBoolean wasSuspended = new MutableBoolean();
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ wasSuspended.setValue(true);
+ return null;
+ }
+ }).when(listener).onSuspendHost(host, true);
+ connectionManager.addListener("TestListener", listener);
+
+ ExecutorService exec = Executors.newCachedThreadPool();
+ final CountDownLatch latch = new CountDownLatch(1);
+ for (int i = 0; i < maxActive + 1; i++) {
+ exec.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ connectionManager.operateWithFailover(new InfiniteOp(FailoverPolicy.FAIL_FAST));
+ } catch (HPoolExhaustedException e) {
+ latch.countDown();
+ }
+ }
+ });
+ Thread.sleep(50);
+ }
+
+ latch.await();
+
+ assertTrue(wasSuspended.booleanValue());
+
+ exec.shutdownNow();
+ }
+
+ public static class TestClientFactory implements HClientFactory {
+
+ @Override
+ public HClient createClient(final CassandraHost ch) {
+ HClient client = mock(HClient.class);
+ when(client.close()).thenReturn(client);
+ when(client.open()).thenReturn(client);
+ when(client.isOpen()).thenReturn(true);
+ when(client.getCassandraHost()).thenReturn(ch);
+ return client;
+ }
+ }
+
abstract class StubOp extends Operation<String> {
StubOp(FailoverPolicy fp) {
@@ -92,4 +157,21 @@ public String execute(Client cassandra) throws HectorException {
throw new HTimedOutException("fake timeout");
}
}
+
+ class InfiniteOp extends StubOp {
+
+ InfiniteOp(FailoverPolicy fp) {
+ super(fp);
+ }
+
+ @Override
+ public String execute(Client cassandra) throws HectorException {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return "";
+ }
+ }
}

0 comments on commit 04673ff

Please sign in to comment.