Navigation Menu

Skip to content

Commit

Permalink
simplified connection on retry service, nodediscovery service is func…
Browse files Browse the repository at this point in the history
…tionalbut off by default
  • Loading branch information
zznate committed Nov 14, 2010
1 parent 1aa5740 commit 3e11c65
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 32 deletions.
Expand Up @@ -16,7 +16,7 @@ public abstract class BackgroundCassandraHostService {
protected final CassandraHostConfigurator cassandraHostConfigurator;

protected ScheduledFuture sf;
protected int retryDelayInSeconds;
protected int retryDelayInSeconds = DEF_RETRY_DELAY;

public BackgroundCassandraHostService(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
Expand All @@ -42,4 +42,6 @@ public int getRetryDelayInSeconds() {
public void setRetryDelayInSeconds(int retryDelayInSeconds) {
this.retryDelayInSeconds = retryDelayInSeconds;
}

public static final int DEF_RETRY_DELAY = 10;

This comment has been minimized.

Copy link
@rantav

rantav Nov 14, 2010

Collaborator

Perhaps rename this to DEFAULT_RETRY_DELAY_SEC

}
Expand Up @@ -25,8 +25,7 @@ public class CassandraHostRetryService extends BackgroundCassandraHostService {

private static Logger log = LoggerFactory.getLogger(CassandraHostRetryService.class);

public static final int DEF_QUEUE_SIZE = 3;
public static final int DEF_RETRY_DELAY = 10;
public static final int DEF_QUEUE_SIZE = 3;
private LinkedBlockingQueue<CassandraHost> downedHostQueue;

public CassandraHostRetryService(HConnectionManager connectionManager,
Expand All @@ -35,7 +34,7 @@ public CassandraHostRetryService(HConnectionManager connectionManager,
this.retryDelayInSeconds = cassandraHostConfigurator.getRetryDownedHostsDelayInSeconds();
downedHostQueue = new LinkedBlockingQueue<CassandraHost>(cassandraHostConfigurator.getRetryDownedHostsQueueSize());
sf = executor.scheduleWithFixedDelay(new RetryRunner(), this.retryDelayInSeconds,this.retryDelayInSeconds, TimeUnit.SECONDS);

log.info("Downed Host Retry service started with queue size {} and retry delay {}s",
cassandraHostConfigurator.getRetryDownedHostsQueueSize(),
retryDelayInSeconds);
Expand Down Expand Up @@ -100,19 +99,14 @@ public void run() {

private boolean verifyConnection(CassandraHost cassandraHost) {
if ( cassandraHost == null ) return false;
TTransport tr = cassandraHost.getUseThriftFramedTransport() ?
new TFramedTransport(new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), 10)) :
new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), 10);

TProtocol proto = new TBinaryProtocol(tr);
Cassandra.Client client = new Cassandra.Client(proto);
HThriftClient client = new HThriftClient(cassandraHost);
try {
tr.open();
return client.describe_cluster_name() != null;
client.open();
return client.getCassandra().describe_cluster_name() != null;
} catch (Exception e) {
log.error("Downed Host retry failed attempt to verify CassandraHost", e);
} finally {
tr.close();
client.close();
}
return false;
}
Expand Down
Expand Up @@ -37,7 +37,8 @@ public class HConnectionManager {
LoggerFactory.getLogger("me.prettyprint.cassandra.hector.TimingLogger");

private final NonBlockingIdentityHashMap<CassandraHost,ConcurrentHClientPool> hostPools;
private final CassandraHostRetryService cassandraHostRetryService;
private CassandraHostRetryService cassandraHostRetryService;
private NodeAutoDiscoverService nodeAutoDiscoverService;
private LoadBalancingPolicy loadBalancingPolicy = new LeastActiveBalancingPolicy();

private final ClockResolution clock;
Expand All @@ -51,8 +52,13 @@ public HConnectionManager(CassandraHostConfigurator cassandraHostConfigurator) {
hostPools = new NonBlockingIdentityHashMap<CassandraHost, ConcurrentHClientPool>();
for ( CassandraHost host : cassandraHostConfigurator.buildCassandraHosts() ) {
hostPools.put(host,new ConcurrentHClientPool(host));
}
cassandraHostRetryService = new CassandraHostRetryService(this, cassandraHostConfigurator);
}
if ( cassandraHostConfigurator.getRetryDownedHosts() ) {

This comment has been minimized.

Copy link
@rantav

rantav Nov 14, 2010

Collaborator

minor: remove space after ( and before )

cassandraHostRetryService = new CassandraHostRetryService(this, cassandraHostConfigurator);
}
if ( cassandraHostConfigurator.getAutoDiscoverHosts() ) {
nodeAutoDiscoverService = new NodeAutoDiscoverService(this, cassandraHostConfigurator);
}
monitor = JmxMonitor.getInstance(this).getCassandraMonitor();
exceptionsTranslator = new ExceptionsTranslatorImpl();
}
Expand Down
Expand Up @@ -28,7 +28,7 @@ public class NodeAutoDiscoverService extends BackgroundCassandraHostService {

public NodeAutoDiscoverService(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
super(connectionManager, cassandraHostConfigurator);
super(connectionManager, cassandraHostConfigurator);

This comment has been minimized.

Copy link
@rantav

rantav Nov 14, 2010

Collaborator

minor: trailing space. use anyedit if you're in eclipse...

sf = executor.scheduleWithFixedDelay(new QueryRing(), this.retryDelayInSeconds,this.retryDelayInSeconds, TimeUnit.SECONDS);
}

Expand All @@ -42,7 +42,7 @@ void shutdown() {
}

public void applyRetryDelay() {

// no op for now
}

class QueryRing implements Runnable {
Expand All @@ -67,23 +67,24 @@ public void run() {

}

private Set<CassandraHost> discoverNodes() {
public Set<CassandraHost> discoverNodes() {
Set<CassandraHost> existingHosts = connectionManager.getHosts();
Set<CassandraHost> foundHosts = new HashSet<CassandraHost>();
TTransport tr = cassandraHost.getUseThriftFramedTransport() ?
new TFramedTransport(new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), 10)) :
new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), 10);

TProtocol proto = new TBinaryProtocol(tr);
Cassandra.Client client = new Cassandra.Client(proto);
HThriftClient thriftClient = null;
try {
tr.open();
List<TokenRange> tokens = client.describe_ring("System");
thriftClient = connectionManager.borrowClient();
List<TokenRange> tokens = thriftClient.getCassandra().describe_ring("System");
for (TokenRange tokenRange : tokens) {
if ( log.isDebugEnabled() ) {
log.debug("Looking over TokenRange {} for new hosts", tokenRange);
}
List<String> endpoints = tokenRange.getEndpoints();
for (String endpoint : endpoints) {
CassandraHost foundHost = new CassandraHost(endpoint,cassandraHostConfigurator.getPort());
if ( !existingHosts.contains(foundHost) ) {
log.info("Found a node we don't know about {} for TokenRange {}", foundHost, tokenRange);
foundHosts.add(foundHost);
}
}
Expand All @@ -92,7 +93,7 @@ private Set<CassandraHost> discoverNodes() {
} catch (Exception e) {
//log.error("Downed Host retry failed attempt to verify CassandraHost", e);

This comment has been minimized.

Copy link
@rantav

rantav Nov 14, 2010

Collaborator

I would remove this commented out line and the catch (Exception ) one

} finally {
tr.close();
connectionManager.releaseClient(thriftClient);
}
return foundHosts;
}
Expand Down
Expand Up @@ -60,9 +60,7 @@ public AbstractCluster(String clusterName, CassandraHostConfigurator cassandraHo
configurator = cassandraHostConfigurator;
failoverPolicy = FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE;
cassandraClientMonitor = JmxMonitor.getInstance(connectionManager).getCassandraMonitor();
xtrans = new ExceptionsTranslatorImpl();
// if auto discover, then new NodeAutoDiscoveryService

xtrans = new ExceptionsTranslatorImpl();
}

public HConnectionManager getConnectionManager() {
Expand Down
Expand Up @@ -17,7 +17,7 @@ public final class CassandraHostConfigurator {
private ExhaustedPolicy exhaustedPolicy;
private ClockResolution clockResolution = ClockResolution.MICROSECONDS;
private boolean useThriftFramedTransport = CassandraHost.DEFAULT_USE_FRAMED_THRIFT_TRANSPORT;
private boolean retryDownedHosts = false;
private boolean retryDownedHosts = true;
private boolean autoDiscoverHosts = false;
private int retryDownedHostsQueueSize = CassandraHostRetryService.DEF_QUEUE_SIZE;
private int retryDownedHostsDelayInSeconds = CassandraHostRetryService.DEF_RETRY_DELAY;
Expand Down Expand Up @@ -195,7 +195,7 @@ public void setClockResolution(ClockResolution clockResolution) {
/**
* @return the autoDiscoverHosts
*/
public boolean isAutoDiscoverHosts() {
public boolean getAutoDiscoverHosts() {
return autoDiscoverHosts;
}

Expand Down

0 comments on commit 3e11c65

Please sign in to comment.