Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into SliceFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
Shane Perry committed Mar 4, 2013
2 parents 520c640 + 21ac921 commit 66a10cc
Show file tree
Hide file tree
Showing 17 changed files with 325 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README
Expand Up @@ -12,7 +12,7 @@ Hector is the greatest warrior in the greek mythology, Troy's builder and brothe
http://en.wikipedia.org/wiki/Hector http://en.wikipedia.org/wiki/Hector
http://en.wikipedia.org/wiki/Cassandra http://en.wikipedia.org/wiki/Cassandra


Hector is currently in use on a number of production systems some of which have node counts into the hundreds. Issues generally are fixed as quickly as possbile and releases done frequently. Hector is currently in use on a number of production systems some of which have node counts into the hundreds. Issues generally are fixed as quickly as possible and releases done frequently.


Some features provided by this client: Some features provided by this client:


Expand Down
4 changes: 4 additions & 0 deletions core/pom.xml
Expand Up @@ -121,6 +121,10 @@
<dependency> <dependency>
<groupId>com.ecyrd.speed4j</groupId> <groupId>com.ecyrd.speed4j</groupId>
<artifactId>speed4j</artifactId> <artifactId>speed4j</artifactId>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency> </dependency>
<!-- Note the optional tag. If you need to use fastinfoset serialization, you must include this dependency in your project! --> <!-- Note the optional tag. If you need to use fastinfoset serialization, you must include this dependency in your project! -->
<dependency> <dependency>
Expand Down
Expand Up @@ -71,7 +71,23 @@ public HClient borrowClient() throws HectorException {
int currentActiveClients = activeClientsCount.incrementAndGet(); int currentActiveClients = activeClientsCount.incrementAndGet();


try { try {

if (cassandraClient != null) {
if (cassandraClient.getCassandraHost().getMaxLastSuccessTimeMillis() > 0
&& cassandraClient.getLastSuccessTime() > 0
&& System.currentTimeMillis() - cassandraClient.getLastSuccessTime() > cassandraClient.getCassandraHost().getMaxLastSuccessTimeMillis()) {
log.info("Closing connection to {} due to too long idle time of {} ms", cassandraClient.getCassandraHost().getHost(),
System.currentTimeMillis() - cassandraClient.getLastSuccessTime());
cassandraClient.close();
cassandraClient = null;
}
if (cassandraClient.getCassandraHost().getMaxConnectTimeMillis() > 0
&& System.currentTimeMillis() - cassandraClient.getCreatedTime() > cassandraClient.getCassandraHost().getMaxConnectTimeMillis()) {
log.info("Closing connection to {} due to too long existence time of {} ms", cassandraClient.getCassandraHost().getHost(),
System.currentTimeMillis() - cassandraClient.getCreatedTime());
cassandraClient.close();
cassandraClient = null;
}
}
if ( cassandraClient == null ) { if ( cassandraClient == null ) {


if (currentActiveClients <= cassandraHost.getMaxActive()) { if (currentActiveClients <= cassandraHost.getMaxActive()) {
Expand Down
Expand Up @@ -235,7 +235,7 @@ public List<String> getStatusPerPool() {




public void operateWithFailover(Operation<?> op) throws HectorException { public void operateWithFailover(Operation<?> op) throws HectorException {
final Object timerToken = timer.start(); final Object timerToken = timer.start(op.stopWatchTagName);
int retries = Math.min(op.failoverPolicy.numRetries, hostPools.size()); int retries = Math.min(op.failoverPolicy.numRetries, hostPools.size());
HClient client = null; HClient client = null;
HClientPool pool = null; HClientPool pool = null;
Expand All @@ -257,6 +257,7 @@ public void operateWithFailover(Operation<?> op) throws HectorException {


op.executeAndSetResult(c, pool.getCassandraHost()); op.executeAndSetResult(c, pool.getCassandraHost());
success = true; success = true;
client.updateLastSuccessTime();
timer.stop(timerToken, op.stopWatchTagName, true); timer.stop(timerToken, op.stopWatchTagName, true);
break; break;


Expand Down
Expand Up @@ -11,7 +11,7 @@ public interface HOpTimer {
* @return - a token that will be returned to the timer when stop(...) in * @return - a token that will be returned to the timer when stop(...) in
* invoked * invoked
*/ */
Object start(); Object start(String tagName);


/** /**
* *
Expand Down
@@ -0,0 +1,46 @@
package me.prettyprint.cassandra.connection;

import java.util.concurrent.TimeUnit;

import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;

public class MetricsOpTimer implements HOpTimer {

private static final String TIMER_TYPE = "hector";
private final MetricsRegistry metricsRegistry;
private final TimeUnit durationUnit;
private final TimeUnit rateUnit;
private final String clusterName;

public MetricsOpTimer(final MetricsRegistry metricsRegistry, final String clusterName,
final TimeUnit durationUnit, final TimeUnit rateUnit) {
this.metricsRegistry = metricsRegistry;
this.clusterName = clusterName;
this.durationUnit = durationUnit;
this.rateUnit = rateUnit;
}

public MetricsOpTimer(final String clusterName) {
this.metricsRegistry = new MetricsRegistry();
this.clusterName = clusterName;
this.durationUnit = TimeUnit.NANOSECONDS;
this.rateUnit = TimeUnit.SECONDS;
}

@Override
public Object start(final String tagName) {
final Timer timer = metricsRegistry.newTimer(new MetricName(clusterName, TIMER_TYPE, tagName),
durationUnit, rateUnit);
return timer.time();
}

@Override
public void stop(final Object token, final String tagName, final boolean success) {
final TimerContext timerContext = (TimerContext) token;
timerContext.stop();
}

}
Expand Up @@ -7,7 +7,7 @@ public class NullOpTimer implements HOpTimer, Serializable {
private static final long serialVersionUID = -4762728985083933452L; private static final long serialVersionUID = -4762728985083933452L;


@Override @Override
public Object start() { public Object start(String tagName) {
return this; return this;
} }


Expand Down
Expand Up @@ -16,7 +16,7 @@ public SpeedForJOpTimer(String clusterName) {
} }


@Override @Override
public Object start() { public Object start(String tagName) {
return stopWatchFactory.getStopWatch(); return stopWatchFactory.getStopWatch();
} }


Expand Down
Expand Up @@ -17,6 +17,12 @@
* *
*/ */
public interface HClient { public interface HClient {
/**
* Returns the time that this HClient was created.
*
* @return the time this client was created
*/
long getCreatedTime();


/** /**
* Returns a new Cassandra.Client on each invocation using the underlying * Returns a new Cassandra.Client on each invocation using the underlying
Expand Down Expand Up @@ -111,4 +117,16 @@ public interface HClient {
*/ */
void clearAuthentication(); void clearAuthentication();


/**
* Retrieves the time of the last success in milliseconds.
*
* @return -1 if no successful operation has already happened, or the time
* of the last success in milliseconds.
*/
long getLastSuccessTime();

/**
* Update the time of the last success with the current time.
*/
void updateLastSuccessTime();
} }
Expand Up @@ -36,6 +36,7 @@
* <p> * <p>
*/ */
public class HThriftClient implements HClient { public class HThriftClient implements HClient {
private long createdTime = System.currentTimeMillis();


private static Logger log = LoggerFactory.getLogger(HThriftClient.class); private static Logger log = LoggerFactory.getLogger(HThriftClient.class);


Expand All @@ -53,6 +54,8 @@ public class HThriftClient implements HClient {
protected TTransport transport; protected TTransport transport;
protected Cassandra.Client cassandraClient; protected Cassandra.Client cassandraClient;
private TSSLTransportParameters params; private TSSLTransportParameters params;

private volatile long lastSuccessTime;


private final Map<String, String> credentials = new HashMap<String, String>(); private final Map<String, String> credentials = new HashMap<String, String>();


Expand Down Expand Up @@ -287,4 +290,27 @@ public void setAuthenticated(Map<String, String> credentials) {
clearAuthentication(); clearAuthentication();
this.credentials.putAll(credentials); this.credentials.putAll(credentials);
} }

/**
* {@inheritDoc}
*/
public long getCreatedTime() {
return createdTime;
}

/**
* {@inheritDoc}
*/
@Override
public long getLastSuccessTime() {
return lastSuccessTime;
}

/**
* {@inheritDoc}
*/
@Override
public void updateLastSuccessTime() {
lastSuccessTime = System.currentTimeMillis();
}
} }
Expand Up @@ -40,6 +40,16 @@ public final class CassandraHost {
public static final long DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED = -1; public static final long DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED = -1;


public static final boolean DEFAULT_LIFO = true; public static final boolean DEFAULT_LIFO = true;
/**
* The default number of milliseconds (since creation time) we'll allow a connection
* to stay open. Default value is negative which means indefinitely.
*/
public static final long DEFAULT_MAX_CONNECT_TIME = -1;
/**
* The default number of milliseconds (since last success) we'll allow a connection
* to stay open. Default value is negative which means indefinitely.
*/
public static final long DEFAULT_MAX_LAST_SUCCESS_TIME = -1;


private final String host, ip, url; private final String host, ip, url;
private final int port; private final int port;
Expand All @@ -54,6 +64,8 @@ public final class CassandraHost {
private boolean useThriftFramedTransport = DEFAULT_USE_FRAMED_THRIFT_TRANSPORT; private boolean useThriftFramedTransport = DEFAULT_USE_FRAMED_THRIFT_TRANSPORT;
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private boolean useSocketKeepalive; private boolean useSocketKeepalive;
private long maxConnectTimeMillis = DEFAULT_MAX_CONNECT_TIME;
private long maxLastSuccessTimeMillis = DEFAULT_MAX_LAST_SUCCESS_TIME;
//TODO(ran): private FailoverPolicy failoverPolicy = DEFAULT_FAILOVER_POLICY; //TODO(ran): private FailoverPolicy failoverPolicy = DEFAULT_FAILOVER_POLICY;


public CassandraHost(String url) { public CassandraHost(String url) {
Expand Down Expand Up @@ -204,5 +216,21 @@ public void setUseSocketKeepalive(boolean useSocketKeepalive) {
this.useSocketKeepalive = useSocketKeepalive; this.useSocketKeepalive = useSocketKeepalive;
} }


public long getMaxConnectTimeMillis() {
return this.maxConnectTimeMillis ;
}

public void setMaxConnectTimeMillis(long maxConnectTimeMillis) {
this.maxConnectTimeMillis = maxConnectTimeMillis;
}

public long getMaxLastSuccessTimeMillis() {
return this.maxLastSuccessTimeMillis;
}

public void setMaxLastSuccessTimeMillis(long maxLastSuccessTimeMillis) {
this.maxLastSuccessTimeMillis = maxLastSuccessTimeMillis;
}



} }
Expand Up @@ -51,12 +51,18 @@ public final class CassandraHostConfigurator implements Serializable {
private boolean useSocketKeepalive = false; private boolean useSocketKeepalive = false;
private HOpTimer opTimer = new NullOpTimer(); private HOpTimer opTimer = new NullOpTimer();
private Class<? extends HClientFactory> clientFactoryClass = HThriftClientFactoryImpl.class; private Class<? extends HClientFactory> clientFactoryClass = HThriftClientFactoryImpl.class;

private long maxConnectTimeMillis = CassandraHost.DEFAULT_MAX_CONNECT_TIME;
private long maxLastSuccessTimeMillis = CassandraHost.DEFAULT_MAX_LAST_SUCCESS_TIME;


public CassandraHostConfigurator() { public CassandraHostConfigurator() {
this.hosts = null; this.hosts = null;
} }


/**
* Creates a new {@code CassandraHostConfigurator} from the specified hosts String, formatted as
* {@code host[:port][,host[:port]...]}.
* @param hosts The hosts to create {@link CassandraHost}s from.
*/
public CassandraHostConfigurator(String hosts) { public CassandraHostConfigurator(String hosts) {
this.hosts = hosts; this.hosts = hosts;
} }
Expand All @@ -83,13 +89,20 @@ public void applyConfig(CassandraHost cassandraHost) {
cassandraHost.setUseThriftFramedTransport(useThriftFramedTransport); cassandraHost.setUseThriftFramedTransport(useThriftFramedTransport);
cassandraHost.setMaxFrameSize(maxFrameSize); cassandraHost.setMaxFrameSize(maxFrameSize);
cassandraHost.setUseSocketKeepalive(useSocketKeepalive); cassandraHost.setUseSocketKeepalive(useSocketKeepalive);
cassandraHost.setMaxConnectTimeMillis(maxConnectTimeMillis);
cassandraHost.setMaxLastSuccessTimeMillis(maxLastSuccessTimeMillis);


// this is special as it can be passed in as a system property // this is special as it can be passed in as a system property
if (cassandraThriftSocketTimeout > 0) { if (cassandraThriftSocketTimeout > 0) {
cassandraHost.setCassandraThriftSocketTimeout(cassandraThriftSocketTimeout); cassandraHost.setCassandraThriftSocketTimeout(cassandraThriftSocketTimeout);
} }
} }


/**
* Specifies the hosts String, formatted as
* {@code host[:port][,host[:port]...]}.
* @param hosts The hosts to create {@link CassandraHost}s from.
*/
public void setHosts(String hosts) { public void setHosts(String hosts) {
this.hosts = hosts; this.hosts = hosts;
} }
Expand Down Expand Up @@ -351,4 +364,25 @@ public void setClientFactoryClass(String cls) {
public Class<? extends HClientFactory> getClientFactoryClass() { public Class<? extends HClientFactory> getClientFactoryClass() {
return clientFactoryClass; return clientFactoryClass;
} }

/**
* The maximum time in milliseconds that we'll allow a connection to stay open to a host. A negative
* value indicates indefinitely (and is the default).
*
* @return the number of milliseconds
*/
public long getMaxConnectTimeMillis() {
return maxConnectTimeMillis;
}

/**
* Set the maximum time in milliseconds that we'll allow a connection to stay open to a host. A negative
* value indicates indefinitely. This setting is useful if you you need to work around a firewall that
* forcefully closes connections after a fixed amount of time regardless of activity.
*
* @param maxConnectTimeMillis the maximum time to use a connection
*/
public void setMaxConnectTimeMillis(long maxConnectTimeMillis) {
this.maxConnectTimeMillis = maxConnectTimeMillis;
}
} }
@@ -0,0 +1,70 @@
/**
* This class will instill 'normal' iterator behavior to a ColumnFamilyResult.
* Simply instantiate this class while passing your ColumnFamilyResult as a
* constructor argument.
*
* Ex.
*
* ColumnFamilyResultIterator myResultsInterator =
* new ColumnFamilyResultIterator(someColumnFamilyResult);
*
* You can then use myResultsInterator with for loops or iterate with a while loop
* just as with any standard java iterator.
*
*/
package me.prettyprint.cassandra.service.template;

import java.util.Iterator;

import me.prettyprint.cassandra.service.template.ColumnFamilyResult;

public class ColumnFamilyResultIterator implements Iterator<ColumnFamilyResult<?,?>> {
private ColumnFamilyResult<?, ?> res;
private boolean isStart = true;

public ColumnFamilyResultIterator(ColumnFamilyResult<?, ?> res) {
this.res = res;
}

public boolean hasNext()
{
boolean retval = false;
if (isStart)
{
retval = res.hasResults();
}
else
{
retval = res.hasNext();
}
return retval;
}

public ColumnFamilyResult<?, ?> getRes()
{
return res;
}

public void setRes(ColumnFamilyResult<?, ?> res)
{
this.res = res;
}

public ColumnFamilyResult<?, ?> next()
{
if (isStart)
{
isStart = false;
return res;
}
else
{
return (ColumnFamilyResult<?, ?>) res.next();
}
}

public void remove()
{
res.remove();
}
}

0 comments on commit 66a10cc

Please sign in to comment.