Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
62 changed files
with
1,187 additions
and
3,724 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
204 changes: 204 additions & 0 deletions
204
src/main/java/me/prettyprint/cassandra/connection/ConcurrentHClientPool.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,204 @@ | ||
package me.prettyprint.cassandra.connection; | ||
|
||
import java.util.HashSet; | ||
import java.util.Set; | ||
import java.util.concurrent.ArrayBlockingQueue; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
import me.prettyprint.cassandra.service.CassandraClientMonitor; | ||
import me.prettyprint.cassandra.service.CassandraHost; | ||
import me.prettyprint.cassandra.service.JmxMonitor; | ||
import me.prettyprint.hector.api.exceptions.HectorException; | ||
import me.prettyprint.hector.api.exceptions.PoolExhaustedException; | ||
|
||
import org.cliffc.high_scale_lib.Counter; | ||
import org.cliffc.high_scale_lib.NonBlockingHashSet; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class ConcurrentHClientPool { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(ConcurrentHClientPool.class); | ||
|
||
private final ArrayBlockingQueue<HThriftClient> availableClientQueue; | ||
private final NonBlockingHashSet<HThriftClient> activeClients; | ||
|
||
private final CassandraHost cassandraHost; | ||
//private final CassandraClientMonitor monitor; | ||
private final AtomicInteger numActive, numBlocked; | ||
private final AtomicBoolean active; | ||
|
||
private final long maxWaitTimeWhenExhausted; | ||
|
||
public ConcurrentHClientPool(CassandraHost host) { | ||
this.cassandraHost = host; | ||
|
||
availableClientQueue = new ArrayBlockingQueue<HThriftClient>(cassandraHost.getMaxActive(), true); | ||
activeClients = new NonBlockingHashSet<HThriftClient>(); | ||
numActive = new AtomicInteger(); | ||
numBlocked = new AtomicInteger(); | ||
active = new AtomicBoolean(true); | ||
|
||
maxWaitTimeWhenExhausted = cassandraHost.getMaxWaitTimeWhenExhausted() < 0 ? 0 : cassandraHost.getMaxWaitTimeWhenExhausted(); | ||
|
||
for (int i = 0; i < cassandraHost.getMaxActive()/3; i++) { | ||
availableClientQueue.add(new HThriftClient(cassandraHost).open()); | ||
} | ||
if ( log.isDebugEnabled() ) { | ||
log.debug("Concurrent Host pool started with {} active clients; max: {} exhausted wait: {}", | ||
new Object[]{getNumIdle(), | ||
cassandraHost.getMaxActive(), | ||
maxWaitTimeWhenExhausted}); | ||
} | ||
} | ||
|
||
|
||
public HThriftClient borrowClient() throws HectorException { | ||
if ( !active.get() ) { | ||
throw new HectorException("Attempt to borrow on in-active pool: " + getName()); | ||
} | ||
HThriftClient cassandraClient; | ||
int currentActive = numActive.incrementAndGet(); | ||
int tillExhausted = cassandraHost.getMaxActive() - currentActive; | ||
|
||
numBlocked.incrementAndGet(); | ||
cassandraClient = availableClientQueue.poll(); | ||
if ( cassandraClient == null ) { | ||
if ( tillExhausted > 0 ) { | ||
// if we start with #of threads == getMaxActive, we could trigger this condition | ||
addClientToPoolGently(new HThriftClient(cassandraHost).open()); | ||
log.debug("created new client. NumActive:{} untilExhausted: {}", currentActive, tillExhausted); | ||
} | ||
// blocked take on the queue if we are configured to wait forever | ||
if ( log.isDebugEnabled() ) { | ||
log.debug("blocking on queue - current block count {}", numBlocked.get()); | ||
} | ||
// wait and catch, creating a new one if the counts have changed. Infinite wait should just recurse. | ||
if ( maxWaitTimeWhenExhausted == 0 ) { | ||
while (cassandraClient == null && active.get() ) { | ||
try { | ||
cassandraClient = availableClientQueue.poll(100, TimeUnit.MILLISECONDS); | ||
} catch (InterruptedException ie) { | ||
log.error("InterruptedException poll operation on retry forever", ie); | ||
break; | ||
} | ||
} | ||
} else { | ||
|
||
try { | ||
cassandraClient = availableClientQueue.poll(maxWaitTimeWhenExhausted, TimeUnit.MILLISECONDS); | ||
if ( cassandraClient == null ) { | ||
throw new PoolExhaustedException(String.format("maxWaitTimeWhenExhausted exceeded for thread %s on host %s", | ||
new Object[]{ | ||
Thread.currentThread().getName(), | ||
cassandraHost.getName()} | ||
)); | ||
} | ||
} catch (InterruptedException ie) { | ||
//monitor.incCounter(Counter.POOL_EXHAUSTED); | ||
numActive.decrementAndGet(); | ||
} | ||
} | ||
|
||
} | ||
activeClients.add(cassandraClient); | ||
numBlocked.decrementAndGet(); | ||
|
||
|
||
return cassandraClient; | ||
} | ||
|
||
void shutdown() { | ||
if (!active.compareAndSet(true, false) ) { | ||
throw new IllegalArgumentException("shutdown() called for inactive pool: " + getName()); | ||
} | ||
log.error("Shutdown triggered on {}", getName()); | ||
Set<HThriftClient> clients = new HashSet<HThriftClient>(); | ||
availableClientQueue.drainTo(clients); | ||
if ( clients.size() > 0 ) { | ||
for (HThriftClient hThriftClient : clients) { | ||
hThriftClient.close(); | ||
} | ||
} | ||
log.error("Shutdown complete on {}", getName()); | ||
} | ||
|
||
public CassandraHost getCassandraHost() { | ||
return cassandraHost; | ||
} | ||
|
||
public String getName() { | ||
return String.format("<ConcurrentCassandraClientPoolByHost>:{}", cassandraHost.getName()); | ||
} | ||
|
||
|
||
public int getNumActive() { | ||
return numActive.intValue(); | ||
} | ||
|
||
|
||
public int getNumBeforeExhausted() { | ||
return cassandraHost.getMaxActive() - numActive.intValue(); | ||
} | ||
|
||
|
||
public int getNumBlockedThreads() { | ||
return numBlocked.intValue(); | ||
} | ||
|
||
|
||
public int getNumIdle() { | ||
return availableClientQueue.size(); | ||
} | ||
|
||
|
||
public boolean isExhausted() { | ||
return getNumBeforeExhausted() == 0; | ||
} | ||
|
||
public int getMaxActive() { | ||
return cassandraHost.getMaxActive(); | ||
} | ||
|
||
public String getStatusAsString() { | ||
return String.format("%s; Active: %d; Blocked: %d; Idle: %d; NumBeforeExhausted: %d", | ||
getName(), getNumActive(), getNumBlockedThreads(), getNumIdle(), getNumBeforeExhausted()); | ||
} | ||
|
||
public void releaseClient(HThriftClient client) throws HectorException { | ||
activeClients.remove(client); | ||
numActive.decrementAndGet(); | ||
boolean open = client.isOpen(); | ||
if ( open ) { | ||
addClientToPoolGently(client); | ||
} else { | ||
if ( activeClients.size() < getMaxActive() && numBlocked.get() > 0) { | ||
addClientToPoolGently(new HThriftClient(cassandraHost).open()); | ||
} | ||
} | ||
|
||
if ( log.isDebugEnabled() ) { | ||
log.debug("Status of releaseClient {} to queue: {}", client.toString(), open); | ||
} | ||
} | ||
|
||
/** | ||
* Avoids a race condition on adding clients back to the pool if pool is almost full. | ||
* Almost always a result of batch operation startup and shutdown (when multiple threads | ||
* are releasing at the same time). | ||
* @param client | ||
*/ | ||
private void addClientToPoolGently(HThriftClient client) { | ||
try { | ||
availableClientQueue.add(client); | ||
} catch (IllegalStateException ise) { | ||
log.error("Capacity hit adding client back to queue. Closing extra."); | ||
client.close(); | ||
} | ||
} | ||
|
||
|
||
|
||
} |
Oops, something went wrong.