Skip to content

Commit

Permalink
new client structure, pool isolation
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate committed Oct 10, 2010
1 parent 3b2c191 commit 9925524
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package me.prettyprint.cassandra.connection;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import me.prettyprint.cassandra.service.CassandraClient;
import me.prettyprint.cassandra.service.CassandraClientPool;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.ConcurrentCassandraClientPoolByHost;
import me.prettyprint.hector.api.exceptions.HectorException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConcurrentHClientPool {

private static final Logger log = LoggerFactory.getLogger(ConcurrentCassandraClientPoolByHost.class);

private final ArrayBlockingQueue<HThriftClient> clientQueue;

private final CassandraHost cassandraHost;
//private final CassandraClientMonitor monitor;
private final AtomicInteger numActive, numBlocked;
private final AtomicBoolean active;

private final long maxWaitTimeWhenExhausted;

public ConcurrentHClientPool(CassandraHost host) {
this.cassandraHost = host;

clientQueue = new ArrayBlockingQueue<HThriftClient>(cassandraHost.getMaxActive(), true);
numActive = new AtomicInteger();
numBlocked = new AtomicInteger();
active = new AtomicBoolean(true);

maxWaitTimeWhenExhausted = cassandraHost.getMaxWaitTimeWhenExhausted() < 0 ? 0 : cassandraHost.getMaxWaitTimeWhenExhausted();

for (int i = 0; i < cassandraHost.getMaxActive()/3; i++) {
clientQueue.add(new HThriftClient(cassandraHost).open());
}
if ( log.isDebugEnabled() ) {
log.debug("Concurrent Host pool started with {} active clients; max: {} exhausted wait: {}",
new Object[]{getNumIdle(),
cassandraHost.getMaxActive(),
maxWaitTimeWhenExhausted});
}
}


public HThriftClient borrowClient() throws HectorException {
HThriftClient cassandraClient;
try {
numBlocked.incrementAndGet();
cassandraClient = clientQueue.poll();
if ( cassandraClient == null ) {
if ( getNumBeforeExhausted() > 0 ) {
cassandraClient = new HThriftClient(cassandraHost).open();
} else {
// blocked take on the queue if we are configured to wait forever
cassandraClient = maxWaitTimeWhenExhausted == 0 ? clientQueue.take() : clientQueue.poll(maxWaitTimeWhenExhausted, TimeUnit.MILLISECONDS);
}
}
numBlocked.decrementAndGet();
numActive.incrementAndGet();
} catch (InterruptedException ie) {
//monitor.incCounter(Counter.POOL_EXHAUSTED);
throw new HectorException(String.format("maxWaitTimeWhenExhausted exceeded for thread {} on host {}",
new Object[]{
Thread.currentThread().getName(),
cassandraHost.getName()}
));
}
if ( log.isDebugEnabled() ) {
log.debug("borrowed client {} from pool", cassandraClient);
}
return cassandraClient;
}



public CassandraHost getCassandraHost() {
return cassandraHost;
}

public String getName() {
return String.format("<ConcurrentCassandraClientPoolByHost>:{}", cassandraHost.getName());
}


public int getNumActive() {
return numActive.get();
}


public int getNumBeforeExhausted() {
return cassandraHost.getMaxActive() - numActive.get();
}


public int getNumBlockedThreads() {
return numBlocked.get();
}


public int getNumIdle() {
return clientQueue.size();
}


public boolean isExhausted() {
return getNumBeforeExhausted() == 0;
}


public void releaseClient(HThriftClient client) throws HectorException {
numActive.decrementAndGet();
boolean open = client.isOpen();
if ( open ) {
clientQueue.add(client);
}
if ( log.isDebugEnabled() ) {
log.debug("Status of releaseClient {} to queue: {}", client.toString(), open);
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package me.prettyprint.cassandra.connection;

import me.prettyprint.cassandra.service.CassandraHost;

import org.cliffc.high_scale_lib.NonBlockingIdentityHashMap;

public class HConnectionManager {

private final NonBlockingIdentityHashMap<CassandraHost, ConcurrentHClientPool> hostPools;

public HConnectionManager() {
hostPools = new NonBlockingIdentityHashMap<CassandraHost, ConcurrentHClientPool>();
}



public HThriftClient borrowClient() { // needed? may not be
return null;
}

public void releaseClient(HThriftClient client) { // may not be needed

}

// operationWithFailover(Operation op)


}
136 changes: 136 additions & 0 deletions src/main/java/me/prettyprint/cassandra/connection/HThriftClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package me.prettyprint.cassandra.connection;

import java.util.concurrent.atomic.AtomicLong;

import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.SystemProperties;
import me.prettyprint.hector.api.exceptions.HectorTransportException;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class HThriftClient {

private static Logger log = LoggerFactory.getLogger(HThriftClient.class);

private static final AtomicLong serial = new AtomicLong(0);

private final CassandraHost cassandraHost;

private final long mySerial;
private final int timeout;

private TTransport transport;

HThriftClient(CassandraHost cassandraHost) {
this.cassandraHost = cassandraHost;
this.timeout = getTimeout(cassandraHost);
mySerial = serial.incrementAndGet();
}

/**
* Returns a new Cassandra.Client on each invocation using the underlying transport
*
*/
public Cassandra.Client getCassandra() {
if ( !isOpen() ) {
throw new IllegalStateException("getCassandra called on client that was not open. You should not have gotten here.");
}
return new Cassandra.Client(new TBinaryProtocol(transport));
}

HThriftClient close() {
if ( log.isDebugEnabled() ) {
log.debug("Closing client {}", this);
}
if (transport != null) {
try {
transport.flush();
transport.close();
} catch (Exception e) {
log.error("Could not close transport in close for client" + toString(), e);
}
}
return this;
}


HThriftClient open() {
if ( isOpen() ) {
throw new IllegalStateException("Open called on already open connection. You should not have gotten here.");
}
if ( log.isDebugEnabled() ) {
log.debug("Creating a new thrift connection to {}", cassandraHost);
}

if (cassandraHost.getUseThriftFramedTransport()) {
transport = new TFramedTransport(new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), timeout));
} else {
transport = new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), timeout);
}
try {
transport.open();
} catch (TTransportException e) {
// Thrift exceptions aren't very good in reporting, so we have to catch the exception here and
// add details to it.
log.error("Unable to open transport to " + cassandraHost.getName(), e);
//clientMonitor.incCounter(Counter.CONNECT_ERROR);
throw new HectorTransportException("Unable to open transport to " + cassandraHost.getName() +" , " +
e.getLocalizedMessage(), e);
}
return this;
}


boolean isOpen() {
boolean open = false;
if (transport != null) {
open = transport.isOpen();
}
if ( log.isDebugEnabled() ) {
log.debug("Transport open status {} for client {}", open, this);
}
return open;
}

/**
* If CassandraHost was not null we use {@link CassandraHost#getCassandraThriftSocketTimeout()}
* if it was greater than zero. Otherwise look for an environment
* variable name CASSANDRA_THRIFT_SOCKET_TIMEOUT value.
* If doesn't exist, returns 0.
* @param cassandraHost
*/
private int getTimeout(CassandraHost cassandraHost) {
int timeoutVar = 0;
if ( cassandraHost != null && cassandraHost.getCassandraThriftSocketTimeout() > 0 ) {
timeoutVar = cassandraHost.getCassandraThriftSocketTimeout();
} else {
String timeoutStr = System.getProperty(
SystemProperties.CASSANDRA_THRIFT_SOCKET_TIMEOUT.toString());
if (timeoutStr != null && timeoutStr.length() > 0) {
try {
timeoutVar = Integer.valueOf(timeoutStr);
} catch (NumberFormatException e) {
log.error("Invalid value for CASSANDRA_THRIFT_SOCKET_TIMEOUT", e);
}
}
}
return timeoutVar;
}


@Override
public String toString() {
return String.format(NAME_FORMAT, cassandraHost.getUrl(), mySerial);
}

private static final String NAME_FORMAT = "CassandraClient<%s-%d>";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package me.prettyprint.cassandra.connection;

import static org.junit.Assert.*;
import me.prettyprint.cassandra.BaseEmbededServerSetupTest;
import me.prettyprint.cassandra.service.CassandraHost;

import org.junit.Before;
import org.junit.Test;

public class HThriftClientTest extends BaseEmbededServerSetupTest {

private HThriftClient hThriftClient;
// cassandraHostConfigurator = new CassandraHostConfigurator("127.0.0.1:9170");
private CassandraHost cassandraHost;

@Before
public void doSetup() {
cassandraHost = new CassandraHost("127.0.0.1:9170");
hThriftClient = new HThriftClient(cassandraHost);
}

@Test
public void testOpenAndClose() {
assertTrue(hThriftClient.open().isOpen());
assertFalse(hThriftClient.close().isOpen());
}

@Test(expected=IllegalStateException.class)
public void testFailOnDoubleOpen() {
hThriftClient.open().open();
}

@Test(expected=IllegalStateException.class)
public void testGetCassandraNotOpen() {
hThriftClient.getCassandra();
}

}

0 comments on commit 9925524

Please sign in to comment.