Skip to content

Commit

Permalink
Remove node from service when they are not in the
Browse files Browse the repository at this point in the history
ring#216.
  • Loading branch information
patricioe committed Oct 26, 2011
1 parent 0d401d3 commit 15390f9
Showing 1 changed file with 49 additions and 11 deletions.
Expand Up @@ -3,15 +3,20 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ExceptionsTranslator;
import me.prettyprint.cassandra.service.ThriftCluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.exceptions.HectorTransportException;
import me.prettyprint.hector.api.factory.HFactory;

import org.apache.cassandra.thrift.TokenRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,12 +27,10 @@ public class CassandraHostRetryService extends BackgroundCassandraHostService {
public static final int DEF_QUEUE_SIZE = -1;
public static final int DEF_RETRY_DELAY = 10;
private final LinkedBlockingQueue<CassandraHost> downedHostQueue;
private final ExceptionsTranslator exceptionsTranslator;

public CassandraHostRetryService(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
super(connectionManager, cassandraHostConfigurator);
this.exceptionsTranslator = connectionManager.exceptionsTranslator;
this.retryDelayInSeconds = cassandraHostConfigurator.getRetryDownedHostsDelayInSeconds();
downedHostQueue = new LinkedBlockingQueue<CassandraHost>(cassandraHostConfigurator.getRetryDownedHostsQueueSize() < 1
? Integer.MAX_VALUE : cassandraHostConfigurator.getRetryDownedHostsQueueSize());
Expand Down Expand Up @@ -102,13 +105,27 @@ public void run() {
if( downedHostQueue.isEmpty()) {
log.debug("Retry service fired... nothing to do.");
return;
}
}

// Let's check the ring just once per cycle.
Set<CassandraHost> ringInfo = buildRingInfo();

Iterator<CassandraHost> iter = downedHostQueue.iterator();
while( iter.hasNext() ) {
CassandraHost cassandraHost = iter.next();

if( cassandraHost == null ) {
continue;
}

// The host may have been removed from the ring. It makes no sense to keep trying
// to connect to it.
if (!ringInfo.contains(cassandraHost)) {
log.info("Removing host " + cassandraHost.getName() + " - It does no longer exist in the ring.");
iter.remove();
continue;
}

boolean reconnected = verifyConnection(cassandraHost);
log.info("Downed Host retry status {} with host: {}", reconnected, cassandraHost.getName());
if ( reconnected ) {
Expand All @@ -121,27 +138,48 @@ public void run() {
}
}
}


private Set<CassandraHost> buildRingInfo() {
Set<CassandraHost> ringInfo = new HashSet<CassandraHost>();

ThriftCluster cluster = (ThriftCluster) HFactory.getCluster(connectionManager.getClusterName());

for(KeyspaceDefinition keyspaceDefinition: cluster.describeKeyspaces()) {
if (!keyspaceDefinition.getName().equals(Keyspace.KEYSPACE_SYSTEM)) {
List<TokenRange> tokenRanges = cluster.describeRing(keyspaceDefinition.getName());
for (TokenRange tokenRange : tokenRanges) {
for (String host : tokenRange.getEndpoints()) {
CassandraHost aHost = new CassandraHost(host, cassandraHostConfigurator.getPort());
if (!ringInfo.contains(aHost) ) {
ringInfo.add(aHost);
}
}
}
break;
}
}

return ringInfo;
}
}


private boolean verifyConnection(CassandraHost cassandraHost) {
if ( cassandraHost == null ) {
return false;
}
boolean found = false;
HThriftClient client = new HThriftClient(cassandraHost);
try {

client.open();
found = client.getCassandra().describe_cluster_name() != null;
client.close();
} catch (HectorTransportException he) {
client.close();
} catch (HectorTransportException he) {
log.warn("Downed {} host still appears to be down: {}", cassandraHost, he.getMessage());
} catch (Exception ex) {

log.error("Downed Host retry failed attempt to verify CassandraHost", ex);

}
}
return found;
}
}

0 comments on commit 15390f9

Please sign in to comment.