Permalink
Browse files

Merge pull request #633 from shaunkalley/ImprovedConnectionManager

Added a limit on how long a client pool can remain exhausted before its host is marked as down
  • Loading branch information...
2 parents ebcbb9e + 5d0c279 commit 19d0a0ba9f2705c3ce4ccd37b81d5dce5755aa65 @zznate zznate committed Sep 6, 2013
@@ -6,12 +6,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.connection.factory.HClientFactory;
-import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraClientMonitor;
import me.prettyprint.cassandra.service.CassandraClientMonitor.Counter;
+import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.exceptions.HInactivePoolException;
import me.prettyprint.hector.api.exceptions.HPoolExhaustedException;
import me.prettyprint.hector.api.exceptions.HectorException;
@@ -27,6 +28,7 @@
private final ArrayBlockingQueue<HClient> availableClientQueue;
private final AtomicInteger activeClientsCount;
private final AtomicInteger realActiveClientsCount;
+ private final AtomicLong exhaustedStartTime;
private final CassandraHost cassandraHost;
@@ -49,6 +51,7 @@ public ConcurrentHClientPool(HClientFactory clientFactory, CassandraHost host, C
// This counter can be offset by as much as the number of threads.
activeClientsCount = new AtomicInteger(0);
realActiveClientsCount = new AtomicInteger(0);
+ exhaustedStartTime = new AtomicLong(-1);
numBlocked = new AtomicInteger();
active = new AtomicBoolean(true);
@@ -119,6 +122,9 @@ public HClient borrowClient() throws HectorException {
}
realActiveClientsCount.incrementAndGet();
+ if (isExhausted()) {
+ exhaustedStartTime.set(System.currentTimeMillis());
+ }
return cassandraClient;
}
@@ -246,6 +252,12 @@ public boolean getIsActive() {
return active.get();
}
+ @Override
+ public long getExhaustedTime() {
+ long startTime = exhaustedStartTime.get(); // -1 is the sentinel stored by the constructor and by releaseClient
+ return (startTime == -1) ? -1 : System.currentTimeMillis() - startTime; // milliseconds elapsed since the stamp written in borrowClient, or -1 when no stamp is set
+ }
+
@Override
public String getStatusAsString() {
return String.format("%s; IsActive?: %s; Active: %d; Blocked: %d; Idle: %d; NumBeforeExhausted: %d",
@@ -275,6 +287,7 @@ public void releaseClient(HClient client) throws HectorException {
}
realActiveClientsCount.decrementAndGet();
+ exhaustedStartTime.set(-1);
activeClientsCount.decrementAndGet();
if ( log.isTraceEnabled() ) {
@@ -78,7 +78,7 @@ public HConnectionManager(String clusterName, CassandraHostConfigurator cassandr
}
public void doAddNodes() { // delegates to a newly constructed NodeDiscovery built from this manager and its configurator
- new NodeDiscovery(cassandraHostConfigurator, this).doAddNodes();
+ new NodeDiscovery(cassandraHostConfigurator, this).doAddNodes();
}
/**
@@ -130,8 +130,8 @@ public boolean removeCassandraHost(CassandraHost cassandraHost) {
boolean removed = getHosts().contains(cassandraHost);
String message;
if ( removed ) {
- HClientPool pool = hostPools.remove(cassandraHost);
- message = "Removed from hostPools";
+ HClientPool pool = hostPools.remove(cassandraHost);
+ message = "Removed from hostPools";
if ( pool == null ) {
log.info("removeCassandraHost looking for host {} in suspendedHostPools", cassandraHost);
pool = suspendedHostPools.remove(cassandraHost);
@@ -145,12 +145,12 @@ public boolean removeCassandraHost(CassandraHost cassandraHost) {
log.info("removeCassandraHost attempt miss for CassandraHost {} May have been beaten by another thread?", cassandraHost);
}
} else if ( cassandraHostRetryService != null && cassandraHostRetryService.contains(cassandraHost)) {
- log.info("Host {} not in active pools, but found in retry service.", cassandraHost);
- removed = cassandraHostRetryService.remove(cassandraHost);
- message = "Removed from Downed hosts";
+ log.info("Host {} not in active pools, but found in retry service.", cassandraHost);
+ removed = cassandraHostRetryService.remove(cassandraHost);
+ message = "Removed from Downed hosts";
} else {
- message = "Host not found";
- log.info("Remove requested on a host that was not found in active or disabled pools: {}", cassandraHost);
+ message = "Host not found";
+ log.info("Remove requested on a host that was not found in active or disabled pools: {}", cassandraHost);
}
log.info("Remove status for CassandraHost pool {} was {}", cassandraHost, removed);
listenerHandler.fireOnRemoveHost(cassandraHost, removed, message);
@@ -211,14 +211,14 @@ public boolean unsuspendCassandraHost(CassandraHost cassandraHost) {
public List<String> getStatusPerPool() { // returns one status string per pool currently held in hostPools
List<String> stats = new ArrayList<String>();
for (HClientPool clientPool : hostPools.values()) { // only hostPools is iterated; suspendedHostPools is not consulted here
- stats.add(clientPool.getStatusAsString());
+ stats.add(clientPool.getStatusAsString());
}
return stats;
}
public void operateWithFailover(Operation<?> op) throws HectorException {
- final Object timerToken = timer.start(op.stopWatchTagName);
+ final Object timerToken = timer.start(op.stopWatchTagName);
int retries = Math.min(op.failoverPolicy.numRetries, hostPools.size());
HClient client = null;
HClientPool pool = null;
@@ -272,6 +272,17 @@ public void operateWithFailover(Operation<?> op) throws HectorException {
client.close();
// TODO timecheck on how long we've been waiting on timeouts here
// suggestion per user moores on hector-users
+ } else if (he instanceof HPoolExhaustedException) {
+ if (pool.getExhaustedTime() >= pool.getCassandraHost().getMaxExhaustedTimeBeforeMarkingAsDown()) {
+ markHostAsDown(pool.getCassandraHost());
+ log.warn("Client pool for {} was exhausted for {} ms and was marked as down", pool.getCassandraHost(), pool.getExhaustedTime());
+ }
+ if (hostPools.isEmpty()) {
+ throw he;
+ }
+ excludeHosts.add(pool.getCassandraHost());
+ retryable = op.failoverPolicy.shouldRetryFor(HPoolExhaustedException.class);
+ monitor.incCounter(Counter.POOL_EXHAUSTED);
} else if ( he instanceof HPoolRecoverableException ) {
retryable = op.failoverPolicy.shouldRetryFor(HPoolRecoverableException.class);;
if ( hostPools.size() == 1 ) {
@@ -336,22 +347,22 @@ public void setTimer(HOpTimer timer) {
* @param listener - a {@link me.prettyprint.cassandra.connection.ConnectionManagerListener} listener
*/
public void addListener(String listenerName, ConnectionManagerListener listener){
- listenerHandler.put(listenerName, listener);
+ listenerHandler.put(listenerName, listener);
}
/**
* removes a listener from the connectionManager
* @param listenerName - the name of the listener to remove
*/
public void removeListener(String listenerName){
- listenerHandler.remove(listenerName);
+ listenerHandler.remove(listenerName);
}
/**
* removes all listeners from the connectionManager
*/
public void removeAllListeners(){
- listenerHandler.clear();
+ listenerHandler.clear();
}
/**
@@ -368,22 +379,22 @@ private void doTimeoutCheck(CassandraHost cassandraHost) {
}
/**
- * Sleeps for the specified time as determined by sleepBetweenHostsMilli.
- * In many cases failing over to other hosts is done b/c the cluster is too busy, so the sleep b/w
- * hosts may help reduce load on the cluster.
- */
- private void sleepBetweenHostSkips(FailoverPolicy failoverPolicy) {
- if (failoverPolicy.sleepBetweenHostsMilli > 0) {
- if ( log.isDebugEnabled() ) {
- log.debug("Will sleep for {} millisec", failoverPolicy.sleepBetweenHostsMilli);
- }
- try {
- Thread.sleep(failoverPolicy.sleepBetweenHostsMilli);
- } catch (InterruptedException e) {
- log.warn("Sleep between hosts interrupted", e);
- }
+ * Sleeps for the specified time as determined by sleepBetweenHostsMilli.
+ * In many cases failing over to other hosts is done b/c the cluster is too busy, so the sleep b/w
+ * hosts may help reduce load on the cluster.
+ */
+ private void sleepBetweenHostSkips(FailoverPolicy failoverPolicy) {
+ if (failoverPolicy.sleepBetweenHostsMilli > 0) { // a zero or negative setting skips the sleep entirely
+ if ( log.isDebugEnabled() ) {
+ log.debug("Will sleep for {} millisec", failoverPolicy.sleepBetweenHostsMilli);
+ }
+ try {
+ Thread.sleep(failoverPolicy.sleepBetweenHostsMilli);
+ } catch (InterruptedException e) {
+ log.warn("Sleep between hosts interrupted", e); // NOTE(review): the interruption is only logged; the thread's interrupt status is not restored here
}
}
+ }
private HClientPool getClientFromLBPolicy(Set<CassandraHost> excludeHosts) {
if ( hostPools.isEmpty() ) {
@@ -7,5 +7,6 @@
int getNumBlockedThreads();
String getName();
boolean getIsActive();
-
+ long getExhaustedTime();
+
}
@@ -8,14 +8,14 @@
import javax.naming.Reference;
import javax.naming.spi.ObjectFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A factory for JNDI Resource managed objects. Responsible for creating a
* {@link Keyspace} references for passing to {@link HFactory}.
@@ -93,6 +93,7 @@ private void configure(Reference resourceRef) throws Exception {
// optional
RefAddr maxActiveRefAddr = resourceRef.get("maxActive");
RefAddr maxWaitTimeWhenExhausted = resourceRef.get("maxWaitTimeWhenExhausted");
+ RefAddr maxExhaustedTimeBeforeMarkingAsDown = resourceRef.get("maxExhaustedTimeBeforeMarkingAsDown");
RefAddr autoDiscoverHosts = resourceRef.get("autoDiscoverHosts");
RefAddr runAutoDiscoverAtStartup = resourceRef.get("runAutoDiscoveryAtStartup");
RefAddr retryDownedHostDelayInSeconds = resourceRef.get("retryDownedHostDelayInSeconds");
@@ -119,6 +120,9 @@ private void configure(Reference resourceRef) throws Exception {
cassandraHostConfigurator.setMaxActive(Integer.parseInt((String)maxActiveRefAddr.getContent()));
if ( maxWaitTimeWhenExhausted != null )
cassandraHostConfigurator.setMaxWaitTimeWhenExhausted(Integer.parseInt((String)maxWaitTimeWhenExhausted.getContent()));
+ if (maxExhaustedTimeBeforeMarkingAsDown != null) {
+ cassandraHostConfigurator.setMaxExhaustedTimeBeforeMarkingAsDown(Integer.parseInt((String) maxExhaustedTimeBeforeMarkingAsDown.getContent()));
+ }
if ( log.isDebugEnabled() )
log.debug("JNDI resource created with CassandraHostConfiguration: {}", cassandraHostConfigurator.getAutoDiscoverHosts());
@@ -39,14 +39,20 @@
*/
public static final long DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED = -1;
+ /**
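+ * The default maximum time, in milliseconds, that a host's client pool may stay
+ * exhausted before the host is marked as down. The default is Long.MAX_VALUE, so
+ * a host is effectively never marked as down for this reason unless a lower
+ * limit is configured.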
+ */
+ public static final long DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_MARKING_AS_DOWN = Long.MAX_VALUE;
+
public static final boolean DEFAULT_LIFO = true;
/**
- * The default number of milliseconds (since creation time) we'll allow a connection
+ * The default number of milliseconds (since creation time) we'll allow a connection
* to stay open. Default value is negative which means indefinitely.
*/
public static final long DEFAULT_MAX_CONNECT_TIME = -1;
/**
- * The default number of milliseconds (since last success) we'll allow a connection
+ * The default number of milliseconds (since last success) we'll allow a connection
* to stay open. Default value is negative which means indefinitely.
*/
public static final long DEFAULT_MAX_LAST_SUCCESS_TIME = -1;
@@ -60,6 +66,7 @@
private boolean lifo = DEFAULT_LIFO;
private long maxWaitTimeWhenExhausted = DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED;
+ private long maxExhaustedTimeBeforeMarkingAsDown = DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_MARKING_AS_DOWN;
private int cassandraThriftSocketTimeout;
private boolean useThriftFramedTransport = DEFAULT_USE_FRAMED_THRIFT_TRANSPORT;
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
@@ -168,6 +175,14 @@ public void setMaxWaitTimeWhenExhausted(long maxWaitTimeWhenExhausted) {
this.maxWaitTimeWhenExhausted = maxWaitTimeWhenExhausted;
}
+ public long getMaxExhaustedTimeBeforeMarkingAsDown() {
+ return maxExhaustedTimeBeforeMarkingAsDown; // HConnectionManager.operateWithFailover compares the pool's exhausted time (milliseconds) against this value
+ }
+
+ public void setMaxExhaustedTimeBeforeMarkingAsDown(long maxExhaustedTimeBeforeMarkingAsDown) {
+ this.maxExhaustedTimeBeforeMarkingAsDown = maxExhaustedTimeBeforeMarkingAsDown; // stored as given; no range validation is performed here
+ }
+
public int getCassandraThriftSocketTimeout() {
return cassandraThriftSocketTimeout;
}
@@ -232,5 +247,5 @@ public void setMaxLastSuccessTimeMillis(long maxLastSuccessTimeMillis) {
this.maxLastSuccessTimeMillis = maxLastSuccessTimeMillis;
}
-
+
}
@@ -4,17 +4,12 @@
import java.util.Arrays;
import java.util.List;
-import me.prettyprint.cassandra.connection.CassandraHostRetryService;
-import me.prettyprint.cassandra.connection.HOpTimer;
-import me.prettyprint.cassandra.connection.HostTimeoutTracker;
-import me.prettyprint.cassandra.connection.LoadBalancingPolicy;
-import me.prettyprint.cassandra.connection.NodeAutoDiscoverService;
-import me.prettyprint.cassandra.connection.NullOpTimer;
-import me.prettyprint.cassandra.connection.RoundRobinBalancingPolicy;
+import me.prettyprint.cassandra.connection.*;
import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.connection.factory.HThriftClientFactoryImpl;
import me.prettyprint.hector.api.ClockResolution;
import me.prettyprint.hector.api.factory.HFactory;
+
import org.apache.commons.lang.StringUtils;
@@ -30,6 +25,7 @@
private int maxActive = CassandraHost.DEFAULT_MAX_ACTIVE;
private boolean lifo = CassandraHost.DEFAULT_LIFO;
private long maxWaitTimeWhenExhausted = CassandraHost.DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED;
+ private long maxExhaustedTimeBeforeMarkingAsDown = CassandraHost.DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_MARKING_AS_DOWN;
private int cassandraThriftSocketTimeout;
private boolean useThriftFramedTransport = CassandraHost.DEFAULT_USE_FRAMED_THRIFT_TRANSPORT;
private int maxFrameSize = CassandraHost.DEFAULT_MAX_FRAME_SIZE;
@@ -87,6 +83,7 @@ public void applyConfig(CassandraHost cassandraHost) {
cassandraHost.setMaxActive(maxActive);
cassandraHost.setLifo(lifo);
cassandraHost.setMaxWaitTimeWhenExhausted(maxWaitTimeWhenExhausted);
+ cassandraHost.setMaxExhaustedTimeBeforeMarkingAsDown(maxExhaustedTimeBeforeMarkingAsDown);
cassandraHost.setUseThriftFramedTransport(useThriftFramedTransport);
cassandraHost.setMaxFrameSize(maxFrameSize);
cassandraHost.setUseSocketKeepalive(useSocketKeepalive);
@@ -117,6 +114,10 @@ public void setMaxWaitTimeWhenExhausted(long maxWaitTimeWhenExhausted) {
this.maxWaitTimeWhenExhausted = maxWaitTimeWhenExhausted;
}
+ public void setMaxExhaustedTimeBeforeMarkingAsDown(long maxExhaustedTimeBeforeMarkingAsDown) {
+ this.maxExhaustedTimeBeforeMarkingAsDown = maxExhaustedTimeBeforeMarkingAsDown; // applyConfig later copies this value onto each CassandraHost
+ }
+
/**
* The value (in milliseconds) which gets passed down to {@link java.net.Socket#setSoTimeout(int)}
* used by the underlying Thrift transport.
@@ -177,6 +178,8 @@ public String toString() {
s.append(cassandraThriftSocketTimeout);
s.append("&maxWaitTimeWhenExhausted=");
s.append(maxWaitTimeWhenExhausted);
+ s.append("&maxExhaustedTimeBeforeMarkingAsDown=");
+ s.append(maxExhaustedTimeBeforeMarkingAsDown);
s.append("&maxActive=");
s.append(maxActive);
s.append("&hosts=");
Oops, something went wrong.

0 comments on commit 19d0a0b

Please sign in to comment.