Skip to content

Commit

Permalink
Merge branch 'master' of github.com:rantav/hector
Browse files Browse the repository at this point in the history
  • Loading branch information
rantav committed Nov 21, 2010
2 parents 5fa0cd8 + d0a2e49 commit c45a87c
Show file tree
Hide file tree
Showing 19 changed files with 127 additions and 131 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -5,7 +5,7 @@
<artifactId>hector</artifactId>
<packaging>bundle</packaging>
<!-- The version follows Cassandra's major version changes, e.g. 0.5.1 goes with the 0.5.1 cassandra release-->
<version>0.7.0-19_11042010</version>
<version>0.7.0-19_11162010</version>
<name>hector</name>
<description>Cassandra Java Client Library</description>
<url>http://github.com/rantav/hector</url>
Expand Down
Expand Up @@ -16,7 +16,7 @@ public abstract class BackgroundCassandraHostService {
protected final CassandraHostConfigurator cassandraHostConfigurator;

protected ScheduledFuture sf;
protected int retryDelayInSeconds = DEF_RETRY_DELAY;
protected int retryDelayInSeconds;

public BackgroundCassandraHostService(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
Expand All @@ -42,6 +42,5 @@ public int getRetryDelayInSeconds() {
public void setRetryDelayInSeconds(int retryDelayInSeconds) {
this.retryDelayInSeconds = retryDelayInSeconds;
}

public static final int DEF_RETRY_DELAY = 10;

}
Expand Up @@ -26,6 +26,7 @@ public class CassandraHostRetryService extends BackgroundCassandraHostService {
private static Logger log = LoggerFactory.getLogger(CassandraHostRetryService.class);

public static final int DEF_QUEUE_SIZE = 3;
public static final int DEF_RETRY_DELAY = 10;
private LinkedBlockingQueue<CassandraHost> downedHostQueue;

public CassandraHostRetryService(HConnectionManager connectionManager,
Expand Down
@@ -1,5 +1,6 @@
package me.prettyprint.cassandra.connection;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -22,21 +23,19 @@
import me.prettyprint.hector.api.exceptions.PoolExhaustedException;

import org.apache.cassandra.thrift.Cassandra;
import org.cliffc.high_scale_lib.NonBlockingIdentityHashMap;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.perf4j.StopWatch;
import org.perf4j.slf4j.Slf4JStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sun.security.action.GetLongAction;

public class HConnectionManager {

private static final Logger log = LoggerFactory.getLogger(HConnectionManager.class);
private static final Logger perf4jLogger =
LoggerFactory.getLogger("me.prettyprint.cassandra.hector.TimingLogger");

private final NonBlockingIdentityHashMap<CassandraHost,ConcurrentHClientPool> hostPools;
private final NonBlockingHashMap<CassandraHost,ConcurrentHClientPool> hostPools;
private CassandraHostRetryService cassandraHostRetryService;
private NodeAutoDiscoverService nodeAutoDiscoverService;
private LoadBalancingPolicy loadBalancingPolicy = new LeastActiveBalancingPolicy();
Expand All @@ -49,7 +48,7 @@ public class HConnectionManager {

public HConnectionManager(CassandraHostConfigurator cassandraHostConfigurator) {
clock = cassandraHostConfigurator.getClockResolution();
hostPools = new NonBlockingIdentityHashMap<CassandraHost, ConcurrentHClientPool>();
hostPools = new NonBlockingHashMap<CassandraHost, ConcurrentHClientPool>();
for ( CassandraHost host : cassandraHostConfigurator.buildCassandraHosts() ) {
hostPools.put(host,new ConcurrentHClientPool(host));
}
Expand All @@ -64,7 +63,7 @@ public HConnectionManager(CassandraHostConfigurator cassandraHostConfigurator) {
}

public void addCassandraHost(CassandraHost cassandraHost) {
if ( !hostPools.containsKey(cassandraHost) ) {
if ( !getHosts().contains(cassandraHost) ) {
hostPools.put(cassandraHost, new ConcurrentHClientPool(cassandraHost));
log.info("Added host {} to pool", cassandraHost.getName());
} else {
Expand Down Expand Up @@ -204,6 +203,10 @@ public Set<CassandraHost> getDownedHosts() {
return cassandraHostRetryService.getDownedHosts();
}

public Collection<ConcurrentHClientPool> getActivePools() {
return Collections.unmodifiableCollection(hostPools.values());
}

public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) {
this.loadBalancingPolicy = loadBalancingPolicy;
}
Expand Down
Expand Up @@ -56,7 +56,7 @@ HThriftClient close() {
transport.flush();

} catch (Exception e) {
log.error("Could not close transport in close for client" + toString(), e);
log.error("Could not flush transport (to be expected if the pool is shutting down) in close for client: " + toString(), e);
} finally {
try {
transport.close();
Expand Down
Expand Up @@ -6,7 +6,9 @@
import java.util.concurrent.TimeUnit;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.cassandra.thrift.Cassandra.Client;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
Expand All @@ -18,18 +20,25 @@
import me.prettyprint.cassandra.connection.CassandraHostRetryService.RetryRunner;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.Operation;
import me.prettyprint.cassandra.service.OperationType;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.exceptions.HectorException;


public class NodeAutoDiscoverService extends BackgroundCassandraHostService {

private static final Logger log = LoggerFactory.getLogger(NodeAutoDiscoverService.class);

private CassandraHost cassandraHost;
public static final int DEF_AUTO_DISCOVERY_DELAY = 30;


public NodeAutoDiscoverService(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
super(connectionManager, cassandraHostConfigurator);
sf = executor.scheduleWithFixedDelay(new QueryRing(), this.retryDelayInSeconds,this.retryDelayInSeconds, TimeUnit.SECONDS);
super(connectionManager, cassandraHostConfigurator);
this.retryDelayInSeconds = cassandraHostConfigurator.getAutoDiscoveryDelayInSeconds();
sf = executor.scheduleWithFixedDelay(new QueryRing(), retryDelayInSeconds,retryDelayInSeconds, TimeUnit.SECONDS);
}

void shutdown() {
Expand Down Expand Up @@ -72,30 +81,32 @@ public Set<CassandraHost> discoverNodes() {
Set<CassandraHost> foundHosts = new HashSet<CassandraHost>();

HThriftClient thriftClient = null;
log.info("using existing hosts {}", existingHosts);
try {
thriftClient = connectionManager.borrowClient();
List<TokenRange> tokens = thriftClient.getCassandra().describe_ring("System");
for (TokenRange tokenRange : tokens) {
if ( log.isDebugEnabled() ) {
log.debug("Looking over TokenRange {} for new hosts", tokenRange);
}
List<String> endpoints = tokenRange.getEndpoints();
for (String endpoint : endpoints) {
CassandraHost foundHost = new CassandraHost(endpoint,cassandraHostConfigurator.getPort());
if ( !existingHosts.contains(foundHost) ) {
log.info("Found a node we don't know about {} for TokenRange {}", foundHost, tokenRange);
foundHosts.add(foundHost);

for (KsDef keyspace : thriftClient.getCassandra().describe_keyspaces()) {
if (!keyspace.getName().equals(Keyspace.KEYSPACE_SYSTEM)) {
List<TokenRange> tokenRanges = thriftClient.getCassandra().describe_ring(keyspace.getName());
for (TokenRange tokenRange : tokenRanges) {
for (String host : tokenRange.getEndpoints()) {
CassandraHost foundHost = new CassandraHost(host,cassandraHostConfigurator.getPort());
if ( !existingHosts.contains(foundHost) ) {
log.info("Found a node we don't know about {} for TokenRange {}", foundHost, tokenRange);
foundHosts.add(foundHost);
}
}
}
break;
}

}
}
} catch (Exception e) {
//log.error("Downed Host retry failed attempt to verify CassandraHost", e);
log.error("Downed Host retry failed attempt to verify CassandraHost", e);
} finally {
connectionManager.releaseClient(thriftClient);
}
return foundHosts;
}

}

Expand Up @@ -29,7 +29,7 @@
public final class ThriftMultigetSliceQuery<K, N, V> extends AbstractSliceQuery<K, N, V, Rows<K, N, V>>
implements MultigetSliceQuery<K, N, V> {

private Collection<K> keys;
private Iterable<K> keys;

public ThriftMultigetSliceQuery(Keyspace k,
Serializer<K> keySerializer,
Expand All @@ -43,6 +43,13 @@ public MultigetSliceQuery<K, N, V> setKeys(K... keys) {
this.keys = Arrays.asList(keys);
return this;
}

@Override
public MultigetSliceQuery<K, N, V> setKeys(Iterable<K> keys) {
this.keys = keys;
return this;
}


@Override
public QueryResult<Rows<K, N,V>> execute() {
Expand All @@ -53,11 +60,13 @@ public QueryResult<Rows<K, N,V>> execute() {
new KeyspaceOperationCallback<Rows<K, N,V>>() {
@Override
public Rows<K, N,V> doInKeyspace(KeyspaceService ks) throws HectorException {
List<K> keysList = new ArrayList<K>();
keysList.addAll(keys);
List<ByteBuffer> keysList = new ArrayList<ByteBuffer>();
for (K k : keys) {
keysList.add(keySerializer.toByteBuffer(k));
}
ColumnParent columnParent = new ColumnParent(columnFamilyName);
Map<K, List<Column>> thriftRet = keySerializer.fromBytesMap(
ks.multigetSlice(keySerializer.toBytesList(keysList), columnParent, getPredicate()));
ks.multigetSlice(keysList, columnParent, getPredicate()));
return new RowsImpl<K, N, V>(thriftRet, columnNameSerializer, valueSerializer);
}
}), this);
Expand Down
Expand Up @@ -26,6 +26,19 @@ public abstract class AbstractSerializer<T> implements Serializer<T> {
@Override
public abstract ByteBuffer toByteBuffer(T obj);

@Override
public byte[] toBytes(T obj) {
ByteBuffer bb = toByteBuffer(obj);
byte[] bytes = new byte[bb.remaining()];
bb.get(bytes, 0, bytes.length);
return bytes;
}

@Override
public T fromBytes(byte[] bytes) {
return fromByteBuffer(ByteBuffer.wrap(bytes));
}

/*
public ByteBuffer toByteBuffer(T obj) {
return ByteBuffer.wrap(toBytes(obj));
Expand Down
Expand Up @@ -40,9 +40,7 @@
*/
public abstract class AbstractCluster implements Cluster {

private final Logger log = LoggerFactory.getLogger(AbstractCluster.class);

protected static final String KEYSPACE_SYSTEM = "system";
private final Logger log = LoggerFactory.getLogger(AbstractCluster.class);

protected final HConnectionManager connectionManager;
private final String name;
Expand Down Expand Up @@ -81,23 +79,7 @@ public Set<CassandraHost> getKnownPoolHosts(boolean refresh) {
return knownPoolHosts;
}

@Override
public Set<String> getClusterHosts(boolean refresh) {
/* create an op
if (refresh || knownClusterHosts == null) {
CassandraClient client = borrowClient();
try {
knownClusterHosts = new HashSet<String>(buildHostNames(client.getCassandra()));
} finally {
releaseClient(client);
}
}
return knownClusterHosts;
*/
return null;
}

protected abstract Set<String> buildHostNames(Client cassandra);

/* (non-Javadoc)
* @see me.prettyprint.cassandra.service.Cluster#addHost(me.prettyprint.cassandra.service.CassandraHost, boolean)
Expand Down
@@ -1,5 +1,6 @@
package me.prettyprint.cassandra.service;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -89,12 +90,7 @@ public long getWriteFail() {

public void updateKnownHosts() throws HectorTransportException {
log.info("Updating all known cassandra hosts on all clients");
/*
TODO is this a noop given retry service?
for (ConcurrentHClientPool pool: pools) {
pool.updateKnownHosts();
}
*/

}


Expand All @@ -105,57 +101,54 @@ public long getNumPoolExhaustedEventCount() {

public Set<String> getExhaustedPoolNames() {
Set<String> ret = new HashSet<String>();
// TODO connectionManager...

return ret;
}


public int getNumActive() {
int ret = 0;
// TODO connectionmanager....
Collection<ConcurrentHClientPool> pools = connectionManager.getActivePools();
for (ConcurrentHClientPool concurrentHClientPool : pools) {
ret += concurrentHClientPool.getNumActive();
}
return ret;
}


public int getNumBlockedThreads() {
int ret = 0;
// TODO connectionManager...
Collection<ConcurrentHClientPool> pools = connectionManager.getActivePools();
for (ConcurrentHClientPool concurrentHClientPool : pools) {
ret += concurrentHClientPool.getNumBlockedThreads();
}
return ret;
}


public int getNumExhaustedPools() {
int ret = 0;
// TODO connectionManager...
return ret;
return connectionManager.getDownedHosts().size();
}


public int getNumIdleConnections() {
int ret = 0;
// TODO ?
Collection<ConcurrentHClientPool> pools = connectionManager.getActivePools();
for (ConcurrentHClientPool concurrentHClientPool : pools) {
ret += concurrentHClientPool.getNumIdle();
}
return ret;
}


public int getNumPools() {
int ret = 0;
// TODO connectionManager....
return ret;
return connectionManager.getHosts().size();
}


public Set<String> getPoolNames() {
Set<String> ret = new HashSet<String>();
// TODO connectionManager...
return ret;
}


public Set<CassandraHost> getKnownHosts() {
Set<CassandraHost> ret = new HashSet<CassandraHost>();
// TODO connectionManager...
return ret;
return connectionManager.getHosts();
}


Expand All @@ -167,13 +160,7 @@ public long getRecoverableTransportExceptionCount() {
public long getRecoverableErrorCount() {
return getRecoverableTimedOutCount() + getRecoverableTransportExceptionCount() +
getRecoverableUnavailableCount() + getRecoverableLoadBalancedConnectErrors();
}

/*
public void addPool(CassandraClientPool pool) {
// TODO no longer germain?
}
*/
}


public long getRecoverableLoadBalancedConnectErrors() {
Expand Down

0 comments on commit c45a87c

Please sign in to comment.