Navigation Menu

Skip to content

Commit

Permalink
More changes towards modular hector
Browse files Browse the repository at this point in the history
  • Loading branch information
patricioe committed Jun 6, 2011
1 parent cdbebc5 commit d6ebd22
Show file tree
Hide file tree
Showing 37 changed files with 336 additions and 190 deletions.
@@ -1,4 +1,4 @@
package me.prettyprint.cassandra.service;
package me.prettyprint.cassandra.connection;

import me.prettyprint.hector.api.exceptions.PoolExhaustedException;

Expand Down
@@ -0,0 +1,117 @@
package me.prettyprint.cassandra.connection;

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Encapsulates the information required for connecting to a Cassandra host.
* Also exposes pool configuration parameters for that host.
*
* @author zznate(nate@riptano.com)
*
*/
public interface HCassandraHost {


/**
* The default port number to which we will connect
*/
public static final int DEFAULT_PORT = 9160;

public static final int DEFAULT_MAX_ACTIVE = 50;

/**
* By default, we will use TSocket transport on thrift (matches default Cassandra configs)
*/
public static final boolean DEFAULT_USE_FRAMED_THRIFT_TRANSPORT = true;

/**
* The default max wait time when exhausted happens, default value is negative, which means
* it'll block indefinitely.
*/
public static final long DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED = -1;

/**
* The default max idle number determines how many idle connections may reside in the pool.
* If -1 then it's infinity.
*/
public static final int DEFAULT_MAX_IDLE = -1;

public static final boolean DEFAULT_LIFO = true;
public static final long DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 18000000;
public static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = -1;


public String getUrl();

/**
* Checks whether name resolution should occur.
*
* @return
*/
public boolean isPerformNameResolution();

public String getName();

public String getHost();

public String getIp();

public int getPort();

@Override
public String toString();

/**
* Returns true if the ip and port are equal
*/
@Override
public boolean equals(Object obj);

@Override
public int hashCode();

public int getMaxActive();

public void setMaxActive(int maxActive);

public int getMaxIdle();

public void setMaxIdle(int maxIdle);

public long getMaxWaitTimeWhenExhausted();

public void setMaxWaitTimeWhenExhausted(long maxWaitTimeWhenExhausted);

public ExhaustedPolicy getExhaustedPolicy();

public void setExhaustedPolicy(ExhaustedPolicy exhaustedPolicy);

public int getCassandraThriftSocketTimeout();

public void setCassandraThriftSocketTimeout(int cassandraThriftSocketTimeout);

public boolean getUseThriftFramedTransport();

public void setUseThriftFramedTransport(boolean useThriftFramedTransport);

public boolean getLifo();

public void setLifo(boolean lifo);

public long getMinEvictableIdleTimeMillis();

public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis);

public long getTimeBetweenEvictionRunsMillis();

public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis);

public boolean getUseSocketKeepalive();

public void setUseSocketKeepalive(boolean useSocketKeepalive);

}
@@ -1,11 +1,10 @@
package me.prettyprint.cassandra.connection;

import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.exceptions.HectorException;

public interface HClientPool extends PoolMetric {
public HThriftClient borrowClient() throws HectorException;
public CassandraHost getCassandraHost();
public HCassandraHost getCassandraHost();
public int getNumBeforeExhausted();
public boolean isExhausted();
public int getMaxActive();
Expand Down
Expand Up @@ -25,7 +25,7 @@ public interface HConnectionManager {
* @param cassandraHost
* @return
*/
public boolean addCassandraHost(CassandraHost cassandraHost);
public boolean addCassandraHost(HCassandraHost cassandraHost);

/**
* Remove the {@link CassandraHost} from the pool, bypassing retry service. This
Expand All @@ -35,7 +35,7 @@ public interface HConnectionManager {
* suspended map.
* @param cassandraHost
*/
public boolean removeCassandraHost(CassandraHost cassandraHost);
public boolean removeCassandraHost(HCassandraHost cassandraHost);

/**
* Remove the {@link HClientPool} referenced by the {@link CassandraHost} from
Expand All @@ -44,37 +44,37 @@ public interface HConnectionManager {
* @param cassandraHost
* @return true if the operation was successful.
*/
public boolean suspendCassandraHost(CassandraHost cassandraHost);
public boolean suspendCassandraHost(HCassandraHost cassandraHost);

/**
* The opposite of suspendCassandraHost, places the pool back into selection
* @param cassandraHost
* @return true if this operation was successful. A no-op returning false
* if there was no such host in the underlying suspendedHostPool map.
*/
public boolean unsuspendCassandraHost(CassandraHost cassandraHost);
public boolean unsuspendCassandraHost(HCassandraHost cassandraHost);

/**
* Returns a Set of {@link CassandraHost} which are in the suspended status
* @return
*/
public Set<CassandraHost> getSuspendedCassandraHosts();
public Set<HCassandraHost> getSuspendedCassandraHosts();

public Set<CassandraHost> getHosts() ;
public Set<HCassandraHost> getHosts() ;

public List<String> getStatusPerPool();

public void operateWithFailover(Operation<?> op) throws HectorException;

private HClientPool getClientFromLBPolicy(Set<CassandraHost> excludeHosts);
public HClientPool getClientFromLBPolicy(Set<HCassandraHost> excludeHosts);

void releaseClient(HThriftClient client);

HThriftClient borrowClient();

void markHostAsDown(CassandraHost cassandraHost);
void markHostAsDown(HCassandraHost cassandraHost);

public Set<CassandraHost> getDownedHosts();
public Set<HCassandraHost> getDownedHosts();

public Collection<HClientPool> getActivePools();

Expand Down
@@ -0,0 +1,49 @@
package me.prettyprint.cassandra.connection;

import org.apache.cassandra.thrift.Cassandra;


public interface HThriftClient {

/**
* Returns a new Cassandra.Client on each invocation using the underlying transport
*
*/
public Cassandra.Client getCassandra();

public Cassandra.Client getCassandra(String keyspaceNameArg);

HThriftClient close();


HThriftClient open();


boolean isOpen();

/**
* If CassandraHost was not null we use {@link CassandraHost#getCassandraThriftSocketTimeout()}
* if it was greater than zero. Otherwise look for an environment
* variable name CASSANDRA_THRIFT_SOCKET_TIMEOUT value.
* If doesn't exist, returns 0.
* @param cassandraHost
*/
public int getTimeout(HCassandraHost cassandraHost);

public void startToUse();

/**
* @return Time in MS since it was used.
*/
public long getSinceLastUsed();

@Override
public String toString();

/**
* Compares the toString of these clients
*/
@Override
public boolean equals(Object obj);

}
@@ -1,4 +1,4 @@
package me.prettyprint.cassandra.service;
package me.prettyprint.hector;

/**
* System properties used by Hector.
Expand Down
@@ -1,10 +1,5 @@
package me.prettyprint.hector.api;

import me.prettyprint.cassandra.service.clock.MicrosecondsClockResolution;
import me.prettyprint.cassandra.service.clock.MicrosecondsSyncClockResolution;
import me.prettyprint.cassandra.service.clock.MillisecondsClockResolution;
import me.prettyprint.cassandra.service.clock.SecondsClockResolution;

import java.io.Serializable;


Expand Down
@@ -1,8 +1,10 @@
package me.prettyprint.cassandra.service;
package me.prettyprint.cassandra.connection;

import java.net.InetAddress;
import java.net.UnknownHostException;

import me.prettyprint.hector.SystemProperties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -13,36 +15,9 @@
* @author zznate(nate@riptano.com)
*
*/
public final class CassandraHost {
private static Logger log = LoggerFactory.getLogger(CassandraHost.class);

/**
* The default port number to which we will connect
*/
public static final int DEFAULT_PORT = 9160;

public static final int DEFAULT_MAX_ACTIVE = 50;

/**
* By default, we will use TSocket transport on thrift (matches default Cassandra configs)
*/
public static final boolean DEFAULT_USE_FRAMED_THRIFT_TRANSPORT = true;

/**
* The default max wait time when exhausted happens, default value is negative, which means
* it'll block indefinitely.
*/
public static final long DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED = -1;

/**
* The default max idle number determines how many idle connections may reside in the pool.
* If -1 then it's infinity.
*/
public static final int DEFAULT_MAX_IDLE = -1;

public static final boolean DEFAULT_LIFO = true;
public static final long DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 18000000;
public static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = -1;
public class HCassandraHostImpl implements HCassandraHost {

private static Logger log = LoggerFactory.getLogger(HCassandraHostImpl.class);

private final String host, ip, url;
private final int port;
Expand All @@ -62,12 +37,12 @@ public final class CassandraHost {
private boolean useSocketKeepalive;
//TODO(ran): private FailoverPolicy failoverPolicy = DEFAULT_FAILOVER_POLICY;

public CassandraHost(String url) {
this(url, parsePortFromUrl(url));
public HCassandraHostImpl(String url) {
this(url, HostUtils.parsePortFromUrl(url));
}

public CassandraHost(String url2, int port) {
url2 = parseHostFromUrl(url2);
public HCassandraHostImpl(String url2, int port) {
url2 = HostUtils.parseHostFromUrl(url2);
this.port = port;
StringBuilder b = new StringBuilder();
InetAddress address;
Expand Down Expand Up @@ -134,11 +109,11 @@ public String toString() {
*/
@Override
public boolean equals(Object obj) {
if (! (obj instanceof CassandraHost)) {
if (! (obj instanceof HCassandraHost)) {
return false;
}
CassandraHost other = (CassandraHost) obj;
return other.ip.equals(ip) && other.port == port;
HCassandraHost other = (HCassandraHost) obj;
return other.getIp().equals(ip) && other.getPort() == port;
}

@Override
Expand Down Expand Up @@ -194,14 +169,6 @@ public void setUseThriftFramedTransport(boolean useThriftFramedTransport) {
this.useThriftFramedTransport = useThriftFramedTransport;
}

public static String parseHostFromUrl(String urlPort) {
return urlPort.lastIndexOf(':') > 0 ? urlPort.substring(0, urlPort.lastIndexOf(':')) : urlPort;
}

public static int parsePortFromUrl(String urlPort) {
return urlPort.lastIndexOf(':') > 0 ? Integer.valueOf(urlPort.substring(urlPort.lastIndexOf(':')+1, urlPort.length())) : DEFAULT_PORT;
}

public boolean getLifo() {
return lifo;
}
Expand Down

0 comments on commit d6ebd22

Please sign in to comment.