Skip to content

Commit

Permalink
merged in changes from concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate committed Oct 29, 2010
2 parents 46bbdbd + 4892fc8 commit dee4007
Show file tree
Hide file tree
Showing 61 changed files with 1,187 additions and 3,721 deletions.
3 changes: 1 addition & 2 deletions pom.xml
Expand Up @@ -5,7 +5,7 @@
<artifactId>hector</artifactId>
<packaging>bundle</packaging>
<!-- The version follows Cassandra's major version changes, e.g. 0.5.1 goes with the 0.5.1 cassandra release-->
<version>0.7.0-18</version>
<version>0.7.0-19_10252010</version>
<name>hector</name>
<description>Cassandra Java Client Library</description>
<url>http://github.com/rantav/hector</url>
Expand Down Expand Up @@ -280,7 +280,6 @@
<groupId>org.cliffc.high_scale_lib</groupId>
<artifactId>high-scale-lib</artifactId>
<version>1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Expand Down
@@ -1,14 +1,19 @@
package me.prettyprint.cassandra.service;
package me.prettyprint.cassandra.connection;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
Expand All @@ -17,27 +22,27 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DownCassandraHostRetryService implements DownCassandraHostRetryServiceMBean {
public class CassandraHostRetryService {

private static Logger log = LoggerFactory.getLogger(DownCassandraHostRetryService.class);
private static Logger log = LoggerFactory.getLogger(CassandraHostRetryService.class);

public static final int DEF_QUEUE_SIZE = 3;
public static final int DEF_RETRY_DELAY = 30;
public static final int DEF_RETRY_DELAY = 10;
private LinkedBlockingQueue<CassandraHost> downedHostQueue;

private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private final CassandraClientPool cassandraClientPool;
private final HConnectionManager connectionManager;

private ScheduledFuture<CassandraHost> sf;
private ScheduledFuture sf;
private int retryDelayInSeconds = DEF_RETRY_DELAY;

public DownCassandraHostRetryService(CassandraClientPool cassandraClientPool,
public CassandraHostRetryService(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
this.cassandraClientPool = cassandraClientPool;
this.connectionManager = connectionManager;
this.retryDelayInSeconds = cassandraHostConfigurator.getRetryDownedHostsDelayInSeconds();
downedHostQueue = new LinkedBlockingQueue<CassandraHost>(cassandraHostConfigurator.getRetryDownedHostsQueueSize());
sf = executor.schedule(new RetryRunner(), this.retryDelayInSeconds, TimeUnit.SECONDS);
sf = executor.scheduleWithFixedDelay(new RetryRunner(), this.retryDelayInSeconds,this.retryDelayInSeconds, TimeUnit.SECONDS);
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.error("Downed Host retry shutdown hook called");
Expand Down Expand Up @@ -66,6 +71,10 @@ public boolean contains(CassandraHost cassandraHost) {
return downedHostQueue.contains(cassandraHost);
}

public Set<CassandraHost> getDownedHosts() {
return Collections.unmodifiableSet(new HashSet<CassandraHost>(downedHostQueue));
}

public void applyRetryDelay() {
sf.cancel(false);
executor.schedule(new RetryRunner(), retryDelayInSeconds, TimeUnit.SECONDS);
Expand All @@ -88,27 +97,32 @@ public void setRetryDelayInSeconds(int retryDelayInSeconds) {



class RetryRunner implements Callable<CassandraHost>{
class RetryRunner implements Runnable {

@Override
public CassandraHost call() throws Exception {
public void run() {
CassandraHost cassandraHost = downedHostQueue.poll();
if ( cassandraHost == null ) {
log.info("Retry service fired... nothing to do.");
return;
}
boolean reconnected = verifyConnection(cassandraHost);
log.info("Downed Host retry status {} with host: {}", reconnected, cassandraHost.getName());
if ( reconnected ) {
cassandraClientPool.getCluster().addHost(cassandraHost, true);
//cassandraClientPool.getCluster().addHost(cassandraHost, true);
connectionManager.addCassandraHost(cassandraHost);
}
if ( cassandraHost != null ) {
if ( !reconnected && cassandraHost != null ) {
downedHostQueue.add(cassandraHost);
}
return cassandraHost;

}

private boolean verifyConnection(CassandraHost cassandraHost) {
if ( cassandraHost == null ) return false;
TTransport tr = cassandraHost.getUseThriftFramedTransport() ?
new TFramedTransport(new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), 15)) :
new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), 15);
new TFramedTransport(new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), retryDelayInSeconds / 2)) :
new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), retryDelayInSeconds / 2);

TProtocol proto = new TBinaryProtocol(tr);
Cassandra.Client client = new Cassandra.Client(proto);
Expand Down
@@ -0,0 +1,204 @@
package me.prettyprint.cassandra.connection;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import me.prettyprint.cassandra.service.CassandraClientMonitor;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.JmxMonitor;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.exceptions.PoolExhaustedException;

import org.cliffc.high_scale_lib.Counter;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConcurrentHClientPool {

private static final Logger log = LoggerFactory.getLogger(ConcurrentHClientPool.class);

private final ArrayBlockingQueue<HThriftClient> availableClientQueue;
private final NonBlockingHashSet<HThriftClient> activeClients;

private final CassandraHost cassandraHost;
//private final CassandraClientMonitor monitor;
private final AtomicInteger numActive, numBlocked;
private final AtomicBoolean active;

private final long maxWaitTimeWhenExhausted;

public ConcurrentHClientPool(CassandraHost host) {
this.cassandraHost = host;

availableClientQueue = new ArrayBlockingQueue<HThriftClient>(cassandraHost.getMaxActive(), true);
activeClients = new NonBlockingHashSet<HThriftClient>();
numActive = new AtomicInteger();
numBlocked = new AtomicInteger();
active = new AtomicBoolean(true);

maxWaitTimeWhenExhausted = cassandraHost.getMaxWaitTimeWhenExhausted() < 0 ? 0 : cassandraHost.getMaxWaitTimeWhenExhausted();

for (int i = 0; i < cassandraHost.getMaxActive()/3; i++) {
availableClientQueue.add(new HThriftClient(cassandraHost).open());
}
if ( log.isDebugEnabled() ) {
log.debug("Concurrent Host pool started with {} active clients; max: {} exhausted wait: {}",
new Object[]{getNumIdle(),
cassandraHost.getMaxActive(),
maxWaitTimeWhenExhausted});
}
}


public HThriftClient borrowClient() throws HectorException {
if ( !active.get() ) {
throw new HectorException("Attempt to borrow on in-active pool: " + getName());
}
HThriftClient cassandraClient;
int currentActive = numActive.incrementAndGet();
int tillExhausted = cassandraHost.getMaxActive() - currentActive;

numBlocked.incrementAndGet();
cassandraClient = availableClientQueue.poll();
if ( cassandraClient == null ) {
if ( tillExhausted > 0 ) {
// if we start with #of threads == getMaxActive, we could trigger this condition
addClientToPoolGently(new HThriftClient(cassandraHost).open());
log.debug("created new client. NumActive:{} untilExhausted: {}", currentActive, tillExhausted);
}
// blocked take on the queue if we are configured to wait forever
if ( log.isDebugEnabled() ) {
log.debug("blocking on queue - current block count {}", numBlocked.get());
}
// wait and catch, creating a new one if the counts have changed. Infinite wait should just recurse.
if ( maxWaitTimeWhenExhausted == 0 ) {
while (cassandraClient == null && active.get() ) {
try {
cassandraClient = availableClientQueue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
log.error("InterruptedException poll operation on retry forever", ie);
break;
}
}
} else {

try {
cassandraClient = availableClientQueue.poll(maxWaitTimeWhenExhausted, TimeUnit.MILLISECONDS);
if ( cassandraClient == null ) {
throw new PoolExhaustedException(String.format("maxWaitTimeWhenExhausted exceeded for thread %s on host %s",
new Object[]{
Thread.currentThread().getName(),
cassandraHost.getName()}
));
}
} catch (InterruptedException ie) {
//monitor.incCounter(Counter.POOL_EXHAUSTED);
numActive.decrementAndGet();
}
}

}
activeClients.add(cassandraClient);
numBlocked.decrementAndGet();


return cassandraClient;
}

void shutdown() {
if (!active.compareAndSet(true, false) ) {
throw new IllegalArgumentException("shutdown() called for inactive pool: " + getName());
}
log.error("Shutdown triggered on {}", getName());
Set<HThriftClient> clients = new HashSet<HThriftClient>();
availableClientQueue.drainTo(clients);
if ( clients.size() > 0 ) {
for (HThriftClient hThriftClient : clients) {
hThriftClient.close();
}
}
log.error("Shutdown complete on {}", getName());
}

public CassandraHost getCassandraHost() {
return cassandraHost;
}

public String getName() {
return String.format("<ConcurrentCassandraClientPoolByHost>:{}", cassandraHost.getName());
}


public int getNumActive() {
return numActive.intValue();
}


public int getNumBeforeExhausted() {
return cassandraHost.getMaxActive() - numActive.intValue();
}


public int getNumBlockedThreads() {
return numBlocked.intValue();
}


public int getNumIdle() {
return availableClientQueue.size();
}


public boolean isExhausted() {
return getNumBeforeExhausted() == 0;
}

public int getMaxActive() {
return cassandraHost.getMaxActive();
}

public String getStatusAsString() {
return String.format("%s; Active: %d; Blocked: %d; Idle: %d; NumBeforeExhausted: %d",
getName(), getNumActive(), getNumBlockedThreads(), getNumIdle(), getNumBeforeExhausted());
}

public void releaseClient(HThriftClient client) throws HectorException {
activeClients.remove(client);
numActive.decrementAndGet();
boolean open = client.isOpen();
if ( open ) {
addClientToPoolGently(client);
} else {
if ( activeClients.size() < getMaxActive() && numBlocked.get() > 0) {
addClientToPoolGently(new HThriftClient(cassandraHost).open());
}
}

if ( log.isDebugEnabled() ) {
log.debug("Status of releaseClient {} to queue: {}", client.toString(), open);
}
}

/**
* Avoids a race condition on adding clients back to the pool if pool is almost full.
* Almost always a result of batch operation startup and shutdown (when multiple threads
* are releasing at the same time).
* @param client
*/
private void addClientToPoolGently(HThriftClient client) {
try {
availableClientQueue.add(client);
} catch (IllegalStateException ise) {
log.error("Capacity hit adding client back to queue. Closing extra.");
client.close();
}
}



}

0 comments on commit dee4007

Please sign in to comment.