From 3e11c65f667f0c63b937a7ef98fae022f780f658 Mon Sep 17 00:00:00 2001 From: zznate Date: Sat, 13 Nov 2010 18:27:33 -0600 Subject: [PATCH] simplified connection on retry service, nodediscovery service is functionalbut off by default --- .../BackgroundCassandraHostService.java | 4 +++- .../connection/CassandraHostRetryService.java | 18 +++++---------- .../connection/HConnectionManager.java | 12 +++++++--- .../connection/NodeAutoDiscoverService.java | 23 ++++++++++--------- .../cassandra/service/AbstractCluster.java | 4 +--- .../service/CassandraHostConfigurator.java | 4 ++-- 6 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/main/java/me/prettyprint/cassandra/connection/BackgroundCassandraHostService.java b/src/main/java/me/prettyprint/cassandra/connection/BackgroundCassandraHostService.java index 895181de0..63a3b4736 100644 --- a/src/main/java/me/prettyprint/cassandra/connection/BackgroundCassandraHostService.java +++ b/src/main/java/me/prettyprint/cassandra/connection/BackgroundCassandraHostService.java @@ -16,7 +16,7 @@ public abstract class BackgroundCassandraHostService { protected final CassandraHostConfigurator cassandraHostConfigurator; protected ScheduledFuture sf; - protected int retryDelayInSeconds; + protected int retryDelayInSeconds = DEF_RETRY_DELAY; public BackgroundCassandraHostService(HConnectionManager connectionManager, CassandraHostConfigurator cassandraHostConfigurator) { @@ -42,4 +42,6 @@ public int getRetryDelayInSeconds() { public void setRetryDelayInSeconds(int retryDelayInSeconds) { this.retryDelayInSeconds = retryDelayInSeconds; } + + public static final int DEF_RETRY_DELAY = 10; } diff --git a/src/main/java/me/prettyprint/cassandra/connection/CassandraHostRetryService.java b/src/main/java/me/prettyprint/cassandra/connection/CassandraHostRetryService.java index 9e7f4d462..b248fe518 100644 --- a/src/main/java/me/prettyprint/cassandra/connection/CassandraHostRetryService.java +++ b/src/main/java/me/prettyprint/cassandra/connection/CassandraHostRetryService.java @@ -25,8 +25,7 @@ public class CassandraHostRetryService extends BackgroundCassandraHostService { private static Logger log = LoggerFactory.getLogger(CassandraHostRetryService.class); - public static final int DEF_QUEUE_SIZE = 3; - public static final int DEF_RETRY_DELAY = 10; + public static final int DEF_QUEUE_SIZE = 3; private LinkedBlockingQueue downedHostQueue; public CassandraHostRetryService(HConnectionManager connectionManager, @@ -35,7 +34,7 @@ public CassandraHostRetryService(HConnectionManager connectionManager, this.retryDelayInSeconds = cassandraHostConfigurator.getRetryDownedHostsDelayInSeconds(); downedHostQueue = new LinkedBlockingQueue(cassandraHostConfigurator.getRetryDownedHostsQueueSize()); sf = executor.scheduleWithFixedDelay(new RetryRunner(), this.retryDelayInSeconds,this.retryDelayInSeconds, TimeUnit.SECONDS); - + log.info("Downed Host Retry service started with queue size {} and retry delay {}s", cassandraHostConfigurator.getRetryDownedHostsQueueSize(), retryDelayInSeconds); @@ -100,19 +99,14 @@ public void run() { private boolean verifyConnection(CassandraHost cassandraHost) { if ( cassandraHost == null ) return false; - TTransport tr = cassandraHost.getUseThriftFramedTransport() ? - new TFramedTransport(new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), 10)) : - new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), 10); - - TProtocol proto = new TBinaryProtocol(tr); - Cassandra.Client client = new Cassandra.Client(proto); + HThriftClient client = new HThriftClient(cassandraHost); try { - tr.open(); - return client.describe_cluster_name() != null; + client.open(); + return client.getCassandra().describe_cluster_name() != null; } catch (Exception e) { log.error("Downed Host retry failed attempt to verify CassandraHost", e); } finally { - tr.close(); + client.close(); } return false; } diff --git a/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java b/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java index 75bb0526f..ef6b2a7ce 100644 --- a/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java +++ b/src/main/java/me/prettyprint/cassandra/connection/HConnectionManager.java @@ -37,7 +37,8 @@ public class HConnectionManager { LoggerFactory.getLogger("me.prettyprint.cassandra.hector.TimingLogger"); private final NonBlockingIdentityHashMap hostPools; - private final CassandraHostRetryService cassandraHostRetryService; + private CassandraHostRetryService cassandraHostRetryService; + private NodeAutoDiscoverService nodeAutoDiscoverService; private LoadBalancingPolicy loadBalancingPolicy = new LeastActiveBalancingPolicy(); private final ClockResolution clock; @@ -51,8 +52,13 @@ public HConnectionManager(CassandraHostConfigurator cassandraHostConfigurator) { hostPools = new NonBlockingIdentityHashMap(); for ( CassandraHost host : cassandraHostConfigurator.buildCassandraHosts() ) { hostPools.put(host,new ConcurrentHClientPool(host)); - } - cassandraHostRetryService = new CassandraHostRetryService(this, cassandraHostConfigurator); + } + if ( cassandraHostConfigurator.getRetryDownedHosts() ) { + cassandraHostRetryService = new CassandraHostRetryService(this, cassandraHostConfigurator); + } + if ( cassandraHostConfigurator.getAutoDiscoverHosts() ) { + nodeAutoDiscoverService = new NodeAutoDiscoverService(this, cassandraHostConfigurator); + } monitor = JmxMonitor.getInstance(this).getCassandraMonitor(); exceptionsTranslator = new ExceptionsTranslatorImpl(); } diff --git a/src/main/java/me/prettyprint/cassandra/connection/NodeAutoDiscoverService.java b/src/main/java/me/prettyprint/cassandra/connection/NodeAutoDiscoverService.java index d133b0f71..5c775c651 100644 --- a/src/main/java/me/prettyprint/cassandra/connection/NodeAutoDiscoverService.java +++ b/src/main/java/me/prettyprint/cassandra/connection/NodeAutoDiscoverService.java @@ -28,7 +28,7 @@ public class NodeAutoDiscoverService extends BackgroundCassandraHostService { public NodeAutoDiscoverService(HConnectionManager connectionManager, CassandraHostConfigurator cassandraHostConfigurator) { - super(connectionManager, cassandraHostConfigurator); + super(connectionManager, cassandraHostConfigurator); sf = executor.scheduleWithFixedDelay(new QueryRing(), this.retryDelayInSeconds,this.retryDelayInSeconds, TimeUnit.SECONDS); } @@ -42,7 +42,7 @@ void shutdown() { } public void applyRetryDelay() { - + // no op for now } class QueryRing implements Runnable { @@ -67,23 +67,24 @@ public void run() { } - private Set discoverNodes() { + public Set discoverNodes() { Set existingHosts = connectionManager.getHosts(); Set foundHosts = new HashSet(); - TTransport tr = cassandraHost.getUseThriftFramedTransport() ? - new TFramedTransport(new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), 10)) : - new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), 10); - TProtocol proto = new TBinaryProtocol(tr); - Cassandra.Client client = new Cassandra.Client(proto); + HThriftClient thriftClient = null; + try { - tr.open(); - List tokens = client.describe_ring("System"); + thriftClient = connectionManager.borrowClient(); + List tokens = thriftClient.getCassandra().describe_ring("System"); for (TokenRange tokenRange : tokens) { + if ( log.isDebugEnabled() ) { + log.debug("Looking over TokenRange {} for new hosts", tokenRange); + } List endpoints = tokenRange.getEndpoints(); for (String endpoint : endpoints) { CassandraHost foundHost = new CassandraHost(endpoint,cassandraHostConfigurator.getPort()); if ( !existingHosts.contains(foundHost) ) { + log.info("Found a node we don't know about {} for TokenRange {}", foundHost, tokenRange); foundHosts.add(foundHost); } } @@ -92,7 +93,7 @@ private Set discoverNodes() { } catch (Exception e) { //log.error("Downed Host retry failed attempt to verify CassandraHost", e); } finally { - tr.close(); + connectionManager.releaseClient(thriftClient); } return foundHosts; } diff --git a/src/main/java/me/prettyprint/cassandra/service/AbstractCluster.java b/src/main/java/me/prettyprint/cassandra/service/AbstractCluster.java index 7466fcc9d..65497f6cd 100644 --- a/src/main/java/me/prettyprint/cassandra/service/AbstractCluster.java +++ b/src/main/java/me/prettyprint/cassandra/service/AbstractCluster.java @@ -60,9 +60,7 @@ public AbstractCluster(String clusterName, CassandraHostConfigurator cassandraHo configurator = cassandraHostConfigurator; failoverPolicy = FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE; cassandraClientMonitor = JmxMonitor.getInstance(connectionManager).getCassandraMonitor(); - xtrans = new ExceptionsTranslatorImpl(); - // if auto discover, then new NodeAutoDiscoveryService - + xtrans = new ExceptionsTranslatorImpl(); } public HConnectionManager getConnectionManager() { diff --git a/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java b/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java index d47ac1dcc..5079560ba 100644 --- a/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java +++ b/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java @@ -17,7 +17,7 @@ public final class CassandraHostConfigurator { private ExhaustedPolicy exhaustedPolicy; private ClockResolution clockResolution = ClockResolution.MICROSECONDS; private boolean useThriftFramedTransport = CassandraHost.DEFAULT_USE_FRAMED_THRIFT_TRANSPORT; - private boolean retryDownedHosts = false; + private boolean retryDownedHosts = true; private boolean autoDiscoverHosts = false; private int retryDownedHostsQueueSize = CassandraHostRetryService.DEF_QUEUE_SIZE; private int retryDownedHostsDelayInSeconds = CassandraHostRetryService.DEF_RETRY_DELAY; @@ -195,7 +195,7 @@ public void setClockResolution(ClockResolution clockResolution) { /** * @return the autoDiscoverHosts */ - public boolean isAutoDiscoverHosts() { + public boolean getAutoDiscoverHosts() { return autoDiscoverHosts; }