Skip to content

Commit

Permalink
Make all cluster operations go through CassandraCluster and by that a…
Browse files Browse the repository at this point in the history
…dd fault tolerance. A few other small goodies aside
  • Loading branch information
rantav committed Jun 14, 2010
1 parent 11c9f96 commit 1bf8ab5
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 196 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ Changes by version:
0.6.0-15
========
Add a few more public settings to pass over to GenericObjectPool: lifo, minEvictableIdleTimeMillis and timeBetweenEvictionRunsMillis (contributed by B. Todd Burruss)

Remove some unused meta calls from CassandraClient, such as getConfigFile and getStringProperty
Make the calls of other meta API go through CassandraCluster and by that use an improved version of them in thrift and failover (http://github.com/rantav/hector/issues#issue/11 and http://github.com/rantav/hector/issues#issue/7 and http://github.com/rantav/hector/issues#issue/29)

0.6.0-14
========
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package me.prettyprint.cassandra.service;

import java.util.List;
import java.util.Set;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.thrift.TException;

import java.util.List;
import java.util.Map;
import java.util.Set;


/**
* Client object, a high level handle to the remove cassandra service.
Expand Down Expand Up @@ -90,16 +89,6 @@ Keyspace getKeyspace(String keyspaceName, ConsistencyLevel consistencyLevel, Fai
throws IllegalArgumentException, NotFoundException, TException;



/**
* Gets a string property from the server, such as:
* "cluster name": cluster name;
* "config file" : all config file content, if need you can try to explain it.
* "token map" : get the token map from local gossip protocal.
*/
String getStringProperty(String propertyName) throws TException;


/**
* @return all keyspaces name of this client.
*/
Expand All @@ -112,20 +101,15 @@ Keyspace getKeyspace(String keyspaceName, ConsistencyLevel consistencyLevel, Fai
String getClusterName() throws TException;

/**
* Gets the token map with an option to refresh the value from cassandra.
* If fresh is false, a local cached value may be returned.
* Gets the list of known hosts.
*
* @param fresh Whether to query cassandra remote host for an up to date value, or to serve
* a possibly cached value.
* @return a map from tokens to hosts.
*/
Map<String, String> getTokenMap(boolean fresh) throws TException;


/**
* @return config file content.
* @throws Exception
* @throws PoolExhaustedException
* @throws IllegalStateException
*/
String getConfigFile() throws TException;
List<String> getKnownHosts(boolean fresh) throws TException, IllegalStateException, PoolExhaustedException, Exception;

/**
* @return Server version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,19 @@ public CassandraClientFactory(String url, int port) {
}

public CassandraClient create() throws TTransportException, TException, UnknownHostException {
CassandraClient c = new CassandraClientImpl(createThriftClient(url, port),
new KeyspaceFactory(clientMonitor), url, port, pool, timestampResolution);
log.debug("Creating client {} (thread={})", c, Thread.currentThread().getName());
CassandraClient c;
try {
c = new CassandraClientImpl(createThriftClient(url, port),
new KeyspaceFactory(clientMonitor), url, port, pool,
CassandraClusterFactory.INSTANCE.create(pool, url + ":" + port), timestampResolution);
} catch (PoolExhaustedException e) {
// TODO(ran): replace this runtime exception with HectorException etc.
throw new RuntimeException(e);
} catch (Exception e) {
// TODO(ran): replace this runtime exception with HectorException etc.
throw new RuntimeException(e);
}
log.debug("Creating client {}", c);
return c;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -14,7 +13,6 @@
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,14 +25,7 @@
*/
/*package*/ class CassandraClientImpl implements CassandraClient {

private final static String PROP_CLUSTER_NAME = "cluster name";
private final static String PROP_CONFIG_FILE = "config file";
@SuppressWarnings("unused")
private final static String PROP_TOKEN_MAP = "token map";
@SuppressWarnings("unused")
private final static String PROP_KEYSPACE = "keyspaces";
private final static String PROP_VERSION = "version";

private static final Logger log = LoggerFactory.getLogger(CassandraClientImpl.class);

/** Serial number of the client used to track client creation for debug purposes */
Expand All @@ -55,10 +46,6 @@

private String clusterName;

private Map<String, String> tokenMap;

private String configFile;

private String serverVersion;

private final KeyspaceFactory keyspaceFactory;
Expand All @@ -68,7 +55,10 @@
private final String url;
private final String ip;

private final CassandraClientPool clientPools;
private final CassandraClientPool cassandraClientPool;

/** An instance of the cluster object used to manage meta-operations */
private final CassandraCluster cassandraCluster;

/** Has the client network connection been closed? */
private boolean closed = false;
Expand All @@ -80,7 +70,11 @@
private boolean released = false;

public CassandraClientImpl(Cassandra.Client cassandraThriftClient,
KeyspaceFactory keyspaceFactory, String url, int port, CassandraClientPool clientPools,
KeyspaceFactory keyspaceFactory,
String url,
int port,
CassandraClientPool clientPools,
CassandraCluster cassandraCluster,
TimestampResolution timestampResolution)
throws UnknownHostException {
this.mySerial = serial.incrementAndGet();
Expand All @@ -89,8 +83,9 @@ public CassandraClientImpl(Cassandra.Client cassandraThriftClient,
this.port = port;
this.url = url;
ip = getIpString(url);
this.clientPools = clientPools;
this.cassandraClientPool = clientPools;
this.timestampResolution = timestampResolution;
this.cassandraCluster = cassandraCluster;
}

private static String getIpString(String url) throws UnknownHostException {
Expand All @@ -99,21 +94,12 @@ private static String getIpString(String url) throws UnknownHostException {

@Override
public String getClusterName() throws TException {
// TODO replace with meta data API
if (clusterName == null) {
clusterName = getStringProperty(PROP_CLUSTER_NAME);
clusterName = cassandraCluster.getClusterName();
}
return clusterName;
}

@Override
public String getConfigFile() throws TException {
if (configFile == null) {
configFile = getStringProperty(PROP_CONFIG_FILE);
}
return configFile;
}

@Override
public Keyspace getKeyspace(String keySpaceName) throws IllegalArgumentException,
NotFoundException, TException {
Expand All @@ -136,7 +122,7 @@ public Keyspace getKeyspace(String keyspaceName, ConsistencyLevel consistencyLev
if (getKeyspaces().contains(keyspaceName)) {
Map<String, Map<String, String>> keyspaceDesc = cassandra.describe_keyspace(keyspaceName);
keyspace = (KeyspaceImpl) keyspaceFactory.create(this, keyspaceName, keyspaceDesc,
consistencyLevel, failoverPolicy, clientPools);
consistencyLevel, failoverPolicy, cassandraClientPool);
KeyspaceImpl tmp = keyspaceMap.putIfAbsent(keyspaceMapKey , keyspace);
if (tmp != null) {
// There was another put that got here before we did.
Expand All @@ -158,38 +144,17 @@ public List<String> getKeyspaces() throws TException {
return keyspaces;
}

@Override
public String getStringProperty(String propertyName) throws TException {
// TODO remove
return cassandra.get_string_property(propertyName);
}

// TODO(ran): fix exception types
@Override
public Map<String, String> getTokenMap(boolean fresh) throws TException {
if (tokenMap == null || fresh) {
tokenMap = new HashMap<String, String>();
List<String> keyspaces = getKeyspaces();
for (String keyspace : keyspaces) {
if ( keyspace.equals("system") )
continue;
List<TokenRange> tokenRanges = cassandra.describe_ring(keyspace);
for (TokenRange tokenRange : tokenRanges) {
for(String host : tokenRange.getEndpoints()) {
log.debug("token start: {} end: {} host: {}",
new Object[]{tokenRange.getStart_token(), tokenRange.getEnd_token(), host});
tokenMap.put(tokenRange.getStart_token() + "-" + tokenRange.getEnd_token(), host);
}
}
}

}
return tokenMap;
public List<String> getKnownHosts(boolean fresh) throws IllegalStateException, PoolExhaustedException, Exception {
return cassandraCluster.getKnownHosts(fresh);
}

@Override
public String getServerVersion() throws TException {
if (serverVersion == null) {
serverVersion = getStringProperty(PROP_VERSION);
serverVersion = cassandraCluster.describeThriftVersion();
}
return serverVersion;
}
Expand Down Expand Up @@ -228,7 +193,6 @@ public String getUrl() {

@Override
public void updateKnownHosts() throws TException {
// TODO rebuild to use meta API
if (closed) {
return;
}
Expand Down Expand Up @@ -263,7 +227,6 @@ public boolean isClosed() {

@Override
public Set<String> getKnownHosts() {
// TODO update to use META API
Set<String> hosts = new HashSet<String>();
if (closed) {
return hosts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,30 @@ public interface CassandraCluster {
*/
List<TokenRange> describeRing(String keyspace) throws CassandraClusterException;

/**
* Return a Set of hostnames for this cluster
*/
Set<String> getHostNames() throws CassandraClusterException;

/**
* Describe the given keyspace. The key for the outer map is the ColumnFamily name.
* The inner map contains configuration properties mapped to their values.
*/
Map<String, Map<String, String>> describeKeyspace(String keyspace)
throws CassandraClusterException;

/**
* Queries the cluster for its name and returns it.
* @return
*/
String getClusterName();

/**
* Gets the list of known hosts.
* This method is not failover-safe. If will fail fast if the contacted host is down.
*
* @param fresh whether the get fresh list of hosts or reuse the possibly previous value cached.
* @return
* @throws IllegalStateException
* @throws PoolExhaustedException
* @throws Exception
*/
List<String> getKnownHosts(boolean fresh) throws IllegalStateException, PoolExhaustedException,
Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public static CassandraClusterFactory getInstance() {
private CassandraClusterFactory() {
}

public CassandraCluster create(CassandraClientPool cassandraClientPool)
public CassandraCluster create(CassandraClientPool cassandraClientPool, String preferredClientUrl)
throws PoolExhaustedException, Exception {
return new CassandraClusterImpl(cassandraClientPool);
return new CassandraClusterImpl(cassandraClientPool, preferredClientUrl);
}

}
Loading

0 comments on commit 1bf8ab5

Please sign in to comment.