Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added limit on how long client pool can be exhausted

  • Loading branch information...
commit 024a32d5fd30d4039f1c7489077b9910404ba1bf 1 parent eaf1e78
@shaunkalley shaunkalley authored Shaun Kalley committed
View
59 core/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java
@@ -130,8 +130,8 @@ public boolean removeCassandraHost(CassandraHost cassandraHost) {
boolean removed = getHosts().contains(cassandraHost);
String message;
if ( removed ) {
- HClientPool pool = hostPools.remove(cassandraHost);
- message = "Removed from hostPools";
+ HClientPool pool = hostPools.remove(cassandraHost);
+ message = "Removed from hostPools";
if ( pool == null ) {
log.info("removeCassandraHost looking for host {} in suspendedHostPools", cassandraHost);
pool = suspendedHostPools.remove(cassandraHost);
@@ -145,12 +145,12 @@ public boolean removeCassandraHost(CassandraHost cassandraHost) {
log.info("removeCassandraHost attempt miss for CassandraHost {} May have been beaten by another thread?", cassandraHost);
}
} else if ( cassandraHostRetryService != null && cassandraHostRetryService.contains(cassandraHost)) {
- log.info("Host {} not in active pools, but found in retry service.", cassandraHost);
- removed = cassandraHostRetryService.remove(cassandraHost);
- message = "Removed from Downed hosts";
+ log.info("Host {} not in active pools, but found in retry service.", cassandraHost);
+ removed = cassandraHostRetryService.remove(cassandraHost);
+ message = "Removed from Downed hosts";
} else {
- message = "Host not found";
- log.info("Remove requested on a host that was not found in active or disabled pools: {}", cassandraHost);
+ message = "Host not found";
+ log.info("Remove requested on a host that was not found in active or disabled pools: {}", cassandraHost);
}
log.info("Remove status for CassandraHost pool {} was {}", cassandraHost, removed);
listenerHandler.fireOnRemoveHost(cassandraHost, removed, message);
@@ -211,7 +211,7 @@ public boolean unsuspendCassandraHost(CassandraHost cassandraHost) {
public List<String> getStatusPerPool() {
List<String> stats = new ArrayList<String>();
for (HClientPool clientPool : hostPools.values()) {
- stats.add(clientPool.getStatusAsString());
+ stats.add(clientPool.getStatusAsString());
}
return stats;
}
@@ -272,12 +272,9 @@ public void operateWithFailover(Operation<?> op) throws HectorException {
// TODO timecheck on how long we've been waiting on timeouts here
// suggestion per user moores on hector-users
} else if (he instanceof HPoolExhaustedException) {
- if (pool.getExhaustedTime() >= pool.getCassandraHost().getMaxExhaustedTimeBeforeSuspending()) {
- if (suspendCassandraHost(pool.getCassandraHost())) {
- log.warn("Client pool for {} was exhausted for {} ms and was suspended", pool.getCassandraHost(), pool.getExhaustedTime());
- } else {
- log.warn("Client pool for {} was exhausted for {} ms but could not be suspended", pool.getCassandraHost(), pool.getExhaustedTime());
- }
+ if (pool.getExhaustedTime() >= pool.getCassandraHost().getMaxExhaustedTimeBeforeMarkingAsDown()) {
+ markHostAsDown(pool.getCassandraHost());
+ log.warn("Client pool for {} was exhausted for {} ms and was marked as down", pool.getCassandraHost(), pool.getExhaustedTime());
}
if (hostPools.isEmpty()) {
throw he;
@@ -338,7 +335,7 @@ public void setTimer(HOpTimer timer) {
* @param listener - a {@link me.prettyprint.cassandra.connection.ConnectionManagerListener} listener
*/
public void addListener(String listenerName, ConnectionManagerListener listener){
- listenerHandler.put(listenerName, listener);
+ listenerHandler.put(listenerName, listener);
}
/**
@@ -346,14 +343,14 @@ public void addListener(String listenerName, ConnectionManagerListener listener)
* @param listenerName - the name of the listener to remove
*/
public void removeListener(String listenerName){
- listenerHandler.remove(listenerName);
+ listenerHandler.remove(listenerName);
}
/**
* removes all listeners from the connectionManager
*/
public void removeAllListeners(){
- listenerHandler.clear();
+ listenerHandler.clear();
}
/**
@@ -370,22 +367,22 @@ private void doTimeoutCheck(CassandraHost cassandraHost) {
}
/**
- * Sleeps for the specified time as determined by sleepBetweenHostsMilli.
- * In many cases failing over to other hosts is done b/c the cluster is too busy, so the sleep b/w
- * hosts may help reduce load on the cluster.
- */
- private void sleepBetweenHostSkips(FailoverPolicy failoverPolicy) {
- if (failoverPolicy.sleepBetweenHostsMilli > 0) {
- if ( log.isDebugEnabled() ) {
- log.debug("Will sleep for {} millisec", failoverPolicy.sleepBetweenHostsMilli);
- }
- try {
- Thread.sleep(failoverPolicy.sleepBetweenHostsMilli);
- } catch (InterruptedException e) {
- log.warn("Sleep between hosts interrupted", e);
- }
+ * Sleeps for the specified time as determined by sleepBetweenHostsMilli.
+ * In many cases failing over to other hosts is done b/c the cluster is too busy, so the sleep b/w
+ * hosts may help reduce load on the cluster.
+ */
+ private void sleepBetweenHostSkips(FailoverPolicy failoverPolicy) {
+ if (failoverPolicy.sleepBetweenHostsMilli > 0) {
+ if ( log.isDebugEnabled() ) {
+ log.debug("Will sleep for {} millisec", failoverPolicy.sleepBetweenHostsMilli);
+ }
+ try {
+ Thread.sleep(failoverPolicy.sleepBetweenHostsMilli);
+ } catch (InterruptedException e) {
+ log.warn("Sleep between hosts interrupted", e);
}
}
+ }
private HClientPool getClientFromLBPolicy(Set<CassandraHost> excludeHosts) {
if ( hostPools.isEmpty() ) {
View
6 core/src/main/java/me/prettyprint/cassandra/jndi/CassandraClientJndiResourceFactory.java
@@ -93,7 +93,7 @@ private void configure(Reference resourceRef) throws Exception {
// optional
RefAddr maxActiveRefAddr = resourceRef.get("maxActive");
RefAddr maxWaitTimeWhenExhausted = resourceRef.get("maxWaitTimeWhenExhausted");
- RefAddr maxExhaustedTimeBeforeSuspending = resourceRef.get("maxExhaustedTimeBeforeSuspending");
+ RefAddr maxExhaustedTimeBeforeMarkingAsDown = resourceRef.get("maxExhaustedTimeBeforeMarkingAsDown");
RefAddr autoDiscoverHosts = resourceRef.get("autoDiscoverHosts");
RefAddr runAutoDiscoverAtStartup = resourceRef.get("runAutoDiscoveryAtStartup");
RefAddr retryDownedHostDelayInSeconds = resourceRef.get("retryDownedHostDelayInSeconds");
@@ -120,8 +120,8 @@ private void configure(Reference resourceRef) throws Exception {
cassandraHostConfigurator.setMaxActive(Integer.parseInt((String)maxActiveRefAddr.getContent()));
if ( maxWaitTimeWhenExhausted != null )
cassandraHostConfigurator.setMaxWaitTimeWhenExhausted(Integer.parseInt((String)maxWaitTimeWhenExhausted.getContent()));
- if (maxExhaustedTimeBeforeSuspending != null) {
- cassandraHostConfigurator.setMaxExhaustedTimeBeforeSuspending(Integer.parseInt((String) maxExhaustedTimeBeforeSuspending.getContent()));
+ if (maxExhaustedTimeBeforeMarkingAsDown != null) {
+ cassandraHostConfigurator.setMaxExhaustedTimeBeforeMarkingAsDown(Integer.parseInt((String) maxExhaustedTimeBeforeMarkingAsDown.getContent()));
}
if ( log.isDebugEnabled() )
View
12 core/src/main/java/me/prettyprint/cassandra/service/CassandraHost.java
@@ -43,7 +43,7 @@
* The default max exhausted time before suspending. Default value is set to
* maximum so that it won't suspend.
*/
- public static final long DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_SUSPENDING = Long.MAX_VALUE;
+ public static final long DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_MARKING_AS_DOWN = Long.MAX_VALUE;
public static final boolean DEFAULT_LIFO = true;
/**
@@ -66,7 +66,7 @@
private boolean lifo = DEFAULT_LIFO;
private long maxWaitTimeWhenExhausted = DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED;
- private long maxExhaustedTimeBeforeSuspending = DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_SUSPENDING;
+ private long maxExhaustedTimeBeforeMarkingAsDown = DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_MARKING_AS_DOWN;
private int cassandraThriftSocketTimeout;
private boolean useThriftFramedTransport = DEFAULT_USE_FRAMED_THRIFT_TRANSPORT;
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
@@ -175,12 +175,12 @@ public void setMaxWaitTimeWhenExhausted(long maxWaitTimeWhenExhausted) {
this.maxWaitTimeWhenExhausted = maxWaitTimeWhenExhausted;
}
- public long getMaxExhaustedTimeBeforeSuspending() {
- return maxExhaustedTimeBeforeSuspending;
+ public long getMaxExhaustedTimeBeforeMarkingAsDown() {
+ return maxExhaustedTimeBeforeMarkingAsDown;
}
- public void setMaxExhaustedTimeBeforeSuspending(long maxExhaustedTimeBeforeSuspending) {
- this.maxExhaustedTimeBeforeSuspending = maxExhaustedTimeBeforeSuspending;
+ public void setMaxExhaustedTimeBeforeMarkingAsDown(long maxExhaustedTimeBeforeMarkingAsDown) {
+ this.maxExhaustedTimeBeforeMarkingAsDown = maxExhaustedTimeBeforeMarkingAsDown;
}
public int getCassandraThriftSocketTimeout() {
View
12 core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java
@@ -25,7 +25,7 @@
private int maxActive = CassandraHost.DEFAULT_MAX_ACTIVE;
private boolean lifo = CassandraHost.DEFAULT_LIFO;
private long maxWaitTimeWhenExhausted = CassandraHost.DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED;
- private long maxExhaustedTimeBeforeSuspending = CassandraHost.DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_SUSPENDING;
+ private long maxExhaustedTimeBeforeMarkingAsDown = CassandraHost.DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_MARKING_AS_DOWN;
private int cassandraThriftSocketTimeout;
private boolean useThriftFramedTransport = CassandraHost.DEFAULT_USE_FRAMED_THRIFT_TRANSPORT;
private int maxFrameSize = CassandraHost.DEFAULT_MAX_FRAME_SIZE;
@@ -83,7 +83,7 @@ public void applyConfig(CassandraHost cassandraHost) {
cassandraHost.setMaxActive(maxActive);
cassandraHost.setLifo(lifo);
cassandraHost.setMaxWaitTimeWhenExhausted(maxWaitTimeWhenExhausted);
- cassandraHost.setMaxExhaustedTimeBeforeSuspending(maxExhaustedTimeBeforeSuspending);
+ cassandraHost.setMaxExhaustedTimeBeforeMarkingAsDown(maxExhaustedTimeBeforeMarkingAsDown);
cassandraHost.setUseThriftFramedTransport(useThriftFramedTransport);
cassandraHost.setMaxFrameSize(maxFrameSize);
cassandraHost.setUseSocketKeepalive(useSocketKeepalive);
@@ -114,8 +114,8 @@ public void setMaxWaitTimeWhenExhausted(long maxWaitTimeWhenExhausted) {
this.maxWaitTimeWhenExhausted = maxWaitTimeWhenExhausted;
}
- public void setMaxExhaustedTimeBeforeSuspending(long maxExhaustedTimeBeforeSuspending) {
- this.maxExhaustedTimeBeforeSuspending = maxExhaustedTimeBeforeSuspending;
+ public void setMaxExhaustedTimeBeforeMarkingAsDown(long maxExhaustedTimeBeforeMarkingAsDown) {
+ this.maxExhaustedTimeBeforeMarkingAsDown = maxExhaustedTimeBeforeMarkingAsDown;
}
/**
@@ -178,8 +178,8 @@ public String toString() {
s.append(cassandraThriftSocketTimeout);
s.append("&maxWaitTimeWhenExhausted=");
s.append(maxWaitTimeWhenExhausted);
- s.append("&maxExhaustedTimeBeforeSuspending=");
- s.append(maxExhaustedTimeBeforeSuspending);
+ s.append("&maxExhaustedTimeBeforeMarkingAsDown=");
+ s.append(maxExhaustedTimeBeforeMarkingAsDown);
s.append("&maxActive=");
s.append(maxActive);
s.append("&hosts=");
View
17 core/src/test/java/me/prettyprint/cassandra/connection/HConnectionManagerTest.java
@@ -1,8 +1,5 @@
package me.prettyprint.cassandra.connection;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -21,6 +18,9 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
public class HConnectionManagerTest extends BaseEmbededServerSetupTest {
@Test
@@ -83,19 +83,20 @@ public void clientPoolShouldBeSuspendedWhenExhaustedForTooLong() throws Interrup
configurator.setClientFactoryClass(TestClientFactory.class.getName());
configurator.setMaxActive(maxActive);
configurator.setMaxWaitTimeWhenExhausted(50);
- configurator.setMaxExhaustedTimeBeforeSuspending(0);
+ configurator.setMaxExhaustedTimeBeforeMarkingAsDown(0);
+ configurator.setRetryDownedHosts(false);
final HConnectionManager connectionManager = new HConnectionManager("TestCluster", configurator);
CassandraHost host = connectionManager.getHosts().iterator().next();
ConnectionManagerListener listener = mock(ConnectionManagerListener.class);
- final MutableBoolean wasSuspended = new MutableBoolean();
+ final MutableBoolean wasRemoved = new MutableBoolean();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- wasSuspended.setValue(true);
+ wasRemoved.setValue(true);
return null;
}
- }).when(listener).onSuspendHost(host, true);
+ }).when(listener).onHostDown(host);
connectionManager.addListener("TestListener", listener);
ExecutorService exec = Executors.newCachedThreadPool();
@@ -116,7 +117,7 @@ public void run() {
latch.await();
- assertTrue(wasSuspended.booleanValue());
+ assertTrue(wasRemoved.booleanValue());
exec.shutdownNow();
}
View
4 core/src/test/java/me/prettyprint/cassandra/service/CassandraHostConfiguratorTest.java
@@ -43,13 +43,13 @@ public void testConfigValuesPropogated() {
cassandraHostConfigurator.setMaxActive(20);
cassandraHostConfigurator.setCassandraThriftSocketTimeout(3000);
cassandraHostConfigurator.setMaxWaitTimeWhenExhausted(4000);
- cassandraHostConfigurator.setMaxExhaustedTimeBeforeSuspending(5000);
+ cassandraHostConfigurator.setMaxExhaustedTimeBeforeMarkingAsDown(5000);
CassandraHost[] cassandraHosts = cassandraHostConfigurator.buildCassandraHosts();
// no need to test all, just a smattering
assertEquals(20, cassandraHosts[1].getMaxActive());
assertEquals(20, cassandraHosts[0].getMaxActive());
assertEquals(4000, cassandraHosts[1].getMaxWaitTimeWhenExhausted());
- assertEquals(5000, cassandraHosts[0].getMaxExhaustedTimeBeforeSuspending());
+ assertEquals(5000, cassandraHosts[0].getMaxExhaustedTimeBeforeMarkingAsDown());
assertEquals(3000, cassandraHosts[2].getCassandraThriftSocketTimeout());
assertEquals(3000, cassandraHosts[0].getCassandraThriftSocketTimeout());
}
Please sign in to comment.
Something went wrong with that request. Please try again.