Skip to content

Commit

Permalink
Merge pull request #605 from dmdevito/fixingfirewallconnectioncut_3
Browse files Browse the repository at this point in the history
added JMX counters for renewed idle/too long connections + added/modified tests
  • Loading branch information
zznate committed Mar 25, 2013
2 parents 1fe97f3 + 40932f7 commit b392682
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 18 deletions.
Expand Up @@ -10,6 +10,8 @@
import me.prettyprint.cassandra.connection.client.HClient; import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.connection.factory.HClientFactory; import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost; import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraClientMonitor;
import me.prettyprint.cassandra.service.CassandraClientMonitor.Counter;
import me.prettyprint.hector.api.exceptions.HInactivePoolException; import me.prettyprint.hector.api.exceptions.HInactivePoolException;
import me.prettyprint.hector.api.exceptions.HPoolExhaustedException; import me.prettyprint.hector.api.exceptions.HPoolExhaustedException;
import me.prettyprint.hector.api.exceptions.HectorException; import me.prettyprint.hector.api.exceptions.HectorException;
Expand All @@ -36,9 +38,12 @@ public class ConcurrentHClientPool implements HClientPool {


private final HClientFactory clientFactory; private final HClientFactory clientFactory;


public ConcurrentHClientPool(HClientFactory clientFactory, CassandraHost host) { private final CassandraClientMonitor monitor;

public ConcurrentHClientPool(HClientFactory clientFactory, CassandraHost host, CassandraClientMonitor monitor) {
this.clientFactory = clientFactory; this.clientFactory = clientFactory;
this.cassandraHost = host; this.cassandraHost = host;
this.monitor = monitor;


availableClientQueue = new ArrayBlockingQueue<HClient>(cassandraHost.getMaxActive(), true); availableClientQueue = new ArrayBlockingQueue<HClient>(cassandraHost.getMaxActive(), true);
// This counter can be offset by as much as the number of threads. // This counter can be offset by as much as the number of threads.
Expand Down Expand Up @@ -79,6 +84,8 @@ public HClient borrowClient() throws HectorException {
System.currentTimeMillis() - cassandraClient.getLastSuccessTime()); System.currentTimeMillis() - cassandraClient.getLastSuccessTime());
cassandraClient.close(); cassandraClient.close();
cassandraClient = null; cassandraClient = null;

monitor.incCounter(Counter.RENEWED_IDLE_CONNECTIONS);
} }
} }
if (cassandraClient != null) { if (cassandraClient != null) {
Expand All @@ -88,6 +95,8 @@ public HClient borrowClient() throws HectorException {
System.currentTimeMillis() - cassandraClient.getCreatedTime()); System.currentTimeMillis() - cassandraClient.getCreatedTime());
cassandraClient.close(); cassandraClient.close();
cassandraClient = null; cassandraClient = null;

monitor.incCounter(Counter.RENEWED_TOO_LONG_CONNECTIONS);
} }
} }
if ( cassandraClient == null ) { if ( cassandraClient == null ) {
Expand Down
Expand Up @@ -14,6 +14,7 @@


import me.prettyprint.cassandra.connection.factory.HClientFactory; import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost; import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraClientMonitor;
import me.prettyprint.cassandra.utils.DaemonThreadPoolFactory; import me.prettyprint.cassandra.utils.DaemonThreadPoolFactory;


import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down Expand Up @@ -119,8 +120,8 @@ public int compare(HClientPool p1, HClientPool p2) {
} }


@Override @Override
public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host) { public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host, CassandraClientMonitor monitor) {
LatencyAwareHClientPool pool = new LatencyAwareHClientPool(clientFactory, host); LatencyAwareHClientPool pool = new LatencyAwareHClientPool(clientFactory, host, monitor);
add(pool); add(pool);
return pool; return pool;
} }
Expand Down
Expand Up @@ -62,9 +62,10 @@ public HConnectionManager(String clusterName, CassandraHostConfigurator cassandr
if ( cassandraHostConfigurator.getRetryDownedHosts() ) { if ( cassandraHostConfigurator.getRetryDownedHosts() ) {
cassandraHostRetryService = new CassandraHostRetryService(this, clientFactory, cassandraHostConfigurator, listenerHandler); cassandraHostRetryService = new CassandraHostRetryService(this, clientFactory, cassandraHostConfigurator, listenerHandler);
} }
monitor = JmxMonitor.getInstance().getCassandraMonitor(this);
for ( CassandraHost host : cassandraHostConfigurator.buildCassandraHosts()) { for ( CassandraHost host : cassandraHostConfigurator.buildCassandraHosts()) {
try { try {
HClientPool hcp = loadBalancingPolicy.createConnection(clientFactory, host); HClientPool hcp = loadBalancingPolicy.createConnection(clientFactory, host, monitor);
hostPools.put(host,hcp); hostPools.put(host,hcp);
} catch (HectorTransportException hte) { } catch (HectorTransportException hte) {
log.error("Could not start connection pool for host {}", host); log.error("Could not start connection pool for host {}", host);
Expand All @@ -78,7 +79,6 @@ public HConnectionManager(String clusterName, CassandraHostConfigurator cassandr
if ( cassandraHostConfigurator.getUseHostTimeoutTracker() ) { if ( cassandraHostConfigurator.getUseHostTimeoutTracker() ) {
hostTimeoutTracker = new HostTimeoutTracker(this, cassandraHostConfigurator); hostTimeoutTracker = new HostTimeoutTracker(this, cassandraHostConfigurator);
} }
monitor = JmxMonitor.getInstance().getCassandraMonitor(this);
exceptionsTranslator = new ExceptionsTranslatorImpl(); exceptionsTranslator = new ExceptionsTranslatorImpl();
this.cassandraHostConfigurator = cassandraHostConfigurator; this.cassandraHostConfigurator = cassandraHostConfigurator;
hostPoolValues = hostPools.values(); hostPoolValues = hostPools.values();
Expand Down Expand Up @@ -109,7 +109,7 @@ public boolean addCassandraHost(CassandraHost cassandraHost) {
HClientPool pool = null; HClientPool pool = null;
try { try {
cassandraHostConfigurator.applyConfig(cassandraHost); cassandraHostConfigurator.applyConfig(cassandraHost);
pool = cassandraHostConfigurator.getLoadBalancingPolicy().createConnection(clientFactory, cassandraHost); pool = cassandraHostConfigurator.getLoadBalancingPolicy().createConnection(clientFactory, cassandraHost, monitor);
hostPools.putIfAbsent(cassandraHost, pool); hostPools.putIfAbsent(cassandraHost, pool);
log.info("Added host {} to pool", cassandraHost.getName()); log.info("Added host {} to pool", cassandraHost.getName());
listenerHandler.fireOnAddHost(cassandraHost, true, null, null); listenerHandler.fireOnAddHost(cassandraHost, true, null, null);
Expand Down
Expand Up @@ -6,6 +6,7 @@
import me.prettyprint.cassandra.connection.client.HClient; import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.connection.factory.HClientFactory; import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost; import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraClientMonitor;
import me.prettyprint.hector.api.exceptions.HectorException; import me.prettyprint.hector.api.exceptions.HectorException;


/** /**
Expand All @@ -22,8 +23,8 @@ public class LatencyAwareHClientPool extends ConcurrentHClientPool {
private static final double SENTINEL_COMPARE = 0.768; private static final double SENTINEL_COMPARE = 0.768;
private final LinkedBlockingDeque<Double> latencies; private final LinkedBlockingDeque<Double> latencies;


public LatencyAwareHClientPool(HClientFactory clientFactory, CassandraHost host) { public LatencyAwareHClientPool(HClientFactory clientFactory, CassandraHost host, CassandraClientMonitor monitor) {
super(clientFactory, host); super(clientFactory, host, monitor);
latencies = new LinkedBlockingDeque<Double>(WINDOW_QUEUE_SIZE); latencies = new LinkedBlockingDeque<Double>(WINDOW_QUEUE_SIZE);
} }


Expand Down
Expand Up @@ -2,6 +2,7 @@


import me.prettyprint.cassandra.connection.factory.HClientFactory; import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost; import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraClientMonitor;


import java.util.*; import java.util.*;


Expand Down Expand Up @@ -55,7 +56,7 @@ public int compare(HClientPool o1, HClientPool o2) {
} }


@Override @Override
public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host) { public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host, CassandraClientMonitor monitor) {
return new ConcurrentHClientPool(clientFactory, host); return new ConcurrentHClientPool(clientFactory, host, monitor);
} }
} }
Expand Up @@ -6,6 +6,7 @@


import me.prettyprint.cassandra.connection.factory.HClientFactory; import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost; import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraClientMonitor;


/** /**
* Default interface for all load balancing policies. * Default interface for all load balancing policies.
Expand All @@ -27,7 +28,8 @@ public interface LoadBalancingPolicy extends Serializable {
* *
* @param clientFactory an instance of {@link HClientFactory} * @param clientFactory an instance of {@link HClientFactory}
* @param host an instance of {@link CassandraHost} representing the host this pool will represent * @param host an instance of {@link CassandraHost} representing the host this pool will represent
* @param monitor the monitor exposing JMX methods
* @return a connection pool * @return a connection pool
*/ */
HClientPool createConnection(HClientFactory clientFactory, CassandraHost host); HClientPool createConnection(HClientFactory clientFactory, CassandraHost host, CassandraClientMonitor monitor);
} }
Expand Up @@ -5,6 +5,7 @@


import me.prettyprint.cassandra.connection.factory.HClientFactory; import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost; import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraClientMonitor;


import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;


Expand Down Expand Up @@ -63,7 +64,7 @@ private int getAndIncrement(int size) {
} }


@Override @Override
public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host) { public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host, CassandraClientMonitor monitor) {
return new ConcurrentHClientPool(clientFactory, host); return new ConcurrentHClientPool(clientFactory, host, monitor);
} }
} }
Expand Up @@ -40,6 +40,8 @@ public enum Counter {
RECOVERABLE_LB_CONNECT_ERRORS, RECOVERABLE_LB_CONNECT_ERRORS,
/** Connection time errors - unable to connect to host or something... */ /** Connection time errors - unable to connect to host or something... */
CONNECT_ERROR, CONNECT_ERROR,
RENEWED_IDLE_CONNECTIONS,
RENEWED_TOO_LONG_CONNECTIONS
} }


public CassandraClientMonitor(HConnectionManager connectionManager) { public CassandraClientMonitor(HConnectionManager connectionManager) {
Expand Down Expand Up @@ -247,7 +249,14 @@ public boolean setCassandraHostRetryDelay(String retryDelay) {
return false; return false;
} }
} }




@Override
public int getNumRenewedIdleConnections() {
return counters.get(Counter.RENEWED_IDLE_CONNECTIONS).intValue();
}

@Override
public int getNumRenewedTooLongConnections() {
return counters.get(Counter.RENEWED_TOO_LONG_CONNECTIONS).intValue();
}
} }
Expand Up @@ -152,4 +152,16 @@ public interface CassandraClientMonitorMBean {
Set<String> getSuspendedCassandraHosts(); Set<String> getSuspendedCassandraHosts();


boolean setCassandraHostRetryDelay(String retryDelay); boolean setCassandraHostRetryDelay(String retryDelay);


/**
* Total number of connections created due to previous idle connections.
*/
int getNumRenewedIdleConnections();


/**
* Total number of connections created due to previous too long connections.
*/
int getNumRenewedTooLongConnections();
} }
Expand Up @@ -7,6 +7,7 @@
import me.prettyprint.cassandra.connection.factory.HClientFactory; import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.connection.factory.HThriftClientFactoryImpl; import me.prettyprint.cassandra.connection.factory.HThriftClientFactoryImpl;
import me.prettyprint.cassandra.service.CassandraHost; import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraClientMonitor;
import me.prettyprint.hector.api.exceptions.HInactivePoolException; import me.prettyprint.hector.api.exceptions.HInactivePoolException;


import org.junit.Before; import org.junit.Before;
Expand All @@ -22,7 +23,7 @@ public void setupTest() {
setupClient(); setupClient();
cassandraHost = cassandraHostConfigurator.buildCassandraHosts()[0]; cassandraHost = cassandraHostConfigurator.buildCassandraHosts()[0];
HClientFactory factory = new HThriftClientFactoryImpl(); HClientFactory factory = new HThriftClientFactoryImpl();
clientPool = new ConcurrentHClientPool(factory, cassandraHost); clientPool = new ConcurrentHClientPool(factory, cassandraHost, new CassandraClientMonitor(connectionManager));
} }


@Test @Test
Expand Down
Expand Up @@ -8,6 +8,7 @@
import me.prettyprint.cassandra.connection.factory.HClientFactory; import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.connection.factory.HThriftClientFactoryImpl; import me.prettyprint.cassandra.connection.factory.HThriftClientFactoryImpl;
import me.prettyprint.cassandra.service.CassandraHost; import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraClientMonitor;
import me.prettyprint.cassandra.service.CassandraHostConfigurator; import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.hector.api.exceptions.HInactivePoolException; import me.prettyprint.hector.api.exceptions.HInactivePoolException;


Expand All @@ -18,13 +19,15 @@ public class HClientRenewTest extends BaseEmbededServerSetupTest {


private CassandraHost cassandraHost; private CassandraHost cassandraHost;
private ConcurrentHClientPool clientPool; private ConcurrentHClientPool clientPool;
private CassandraClientMonitor monitor;


@Before @Before
public void setupTest() { public void setupTest() {
setupClient(); setupClient();
cassandraHost = cassandraHostConfigurator.buildCassandraHosts()[0]; cassandraHost = cassandraHostConfigurator.buildCassandraHosts()[0];
HClientFactory factory = new HThriftClientFactoryImpl(); HClientFactory factory = new HThriftClientFactoryImpl();
clientPool = new ConcurrentHClientPool(factory, cassandraHost); monitor = new CassandraClientMonitor(connectionManager);
clientPool = new ConcurrentHClientPool(factory, cassandraHost, monitor);
} }


protected void configure(CassandraHostConfigurator configurator) { protected void configure(CassandraHostConfigurator configurator) {
Expand All @@ -39,13 +42,15 @@ public void testBorrowAndRenew() {
client1.updateLastSuccessTime(); client1.updateLastSuccessTime();
clientPool.releaseClient(client1); clientPool.releaseClient(client1);
assertEquals(0, clientPool.getNumActive()); assertEquals(0, clientPool.getNumActive());
int count = monitor.getNumRenewedIdleConnections();
try { try {
Thread.sleep(4 * 1000); Thread.sleep(4 * 1000);
} catch(InterruptedException ex) { } catch(InterruptedException ex) {
fail(); fail();
} }
HClient client2 = clientPool.borrowClient(); HClient client2 = clientPool.borrowClient();
assertEquals(1, clientPool.getNumActive()); assertEquals(1, clientPool.getNumActive());
assertEquals(count + 1, monitor.getNumRenewedIdleConnections());
assertNotSame(client1, client2); assertNotSame(client1, client2);
} }
} }
Expand Up @@ -5,6 +5,7 @@
import me.prettyprint.cassandra.connection.factory.HClientFactory; import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.connection.factory.HThriftClientFactoryImpl; import me.prettyprint.cassandra.connection.factory.HThriftClientFactoryImpl;
import me.prettyprint.cassandra.service.CassandraHost; import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraClientMonitor;


import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
Expand All @@ -20,7 +21,7 @@ public void setupTest() {
setupClient(); setupClient();
cassandraHost = cassandraHostConfigurator.buildCassandraHosts()[0]; cassandraHost = cassandraHostConfigurator.buildCassandraHosts()[0];
HClientFactory factory = new HThriftClientFactoryImpl(); HClientFactory factory = new HThriftClientFactoryImpl();
clientPool = new LatencyAwareHClientPool(factory, cassandraHost); clientPool = new LatencyAwareHClientPool(factory, cassandraHost, new CassandraClientMonitor(connectionManager));
} }


@Test @Test
Expand Down

0 comments on commit b392682

Please sign in to comment.