Skip to content

Commit

Permalink
Merge branch 'issue-296-kerberos'
Browse files Browse the repository at this point in the history
Conflicts:
	core/src/main/java/me/prettyprint/cassandra/connection/CassandraHostRetryService.java
  • Loading branch information
patricioe committed Oct 27, 2011
2 parents 14bf13a + 4d91e44 commit 08149a0
Show file tree
Hide file tree
Showing 24 changed files with 762 additions and 85 deletions.
Expand Up @@ -8,6 +8,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCluster;
Expand All @@ -26,11 +28,16 @@ public class CassandraHostRetryService extends BackgroundCassandraHostService {

public static final int DEF_QUEUE_SIZE = -1;
public static final int DEF_RETRY_DELAY = 10;

private final HClientFactory clientFactory;
private final LinkedBlockingQueue<CassandraHost> downedHostQueue;

public CassandraHostRetryService(HConnectionManager connectionManager,
public CassandraHostRetryService(HConnectionManager connectionManager, HClientFactory clientFactory,
CassandraHostConfigurator cassandraHostConfigurator) {

super(connectionManager, cassandraHostConfigurator);
this.clientFactory = clientFactory;

this.retryDelayInSeconds = cassandraHostConfigurator.getRetryDownedHostsDelayInSeconds();
downedHostQueue = new LinkedBlockingQueue<CassandraHost>(cassandraHostConfigurator.getRetryDownedHostsQueueSize() < 1
? Integer.MAX_VALUE : cassandraHostConfigurator.getRetryDownedHostsQueueSize());
Expand Down Expand Up @@ -170,7 +177,7 @@ private boolean verifyConnection(CassandraHost cassandraHost) {
return false;
}
boolean found = false;
HThriftClient client = new HThriftClient(cassandraHost);
HClient client = clientFactory.createClient(cassandraHost);
try {
client.open();
found = client.getCassandra().describe_cluster_name() != null;
Expand Down
Expand Up @@ -7,11 +7,14 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.connection.client.HThriftClient;
import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.exceptions.HInactivePoolException;
import me.prettyprint.hector.api.exceptions.HPoolExhaustedException;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.exceptions.HectorTransportException;
import me.prettyprint.hector.api.exceptions.HPoolExhaustedException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,7 +23,7 @@ public class ConcurrentHClientPool implements HClientPool {

private static final Logger log = LoggerFactory.getLogger(ConcurrentHClientPool.class);

private final ArrayBlockingQueue<HThriftClient> availableClientQueue;
private final ArrayBlockingQueue<HClient> availableClientQueue;
private final AtomicInteger activeClientsCount;
private final AtomicInteger realActiveClientsCount;

Expand All @@ -32,10 +35,13 @@ public class ConcurrentHClientPool implements HClientPool {

private final long maxWaitTimeWhenExhausted;

public ConcurrentHClientPool(CassandraHost host) {
private final HClientFactory clientFactory;

public ConcurrentHClientPool(HClientFactory clientFactory, CassandraHost host) {
this.clientFactory = clientFactory;
this.cassandraHost = host;

availableClientQueue = new ArrayBlockingQueue<HThriftClient>(cassandraHost.getMaxActive(), true);
availableClientQueue = new ArrayBlockingQueue<HClient>(cassandraHost.getMaxActive(), true);
// This counter can be offset by as much as the number of threads.
activeClientsCount = new AtomicInteger(0);
realActiveClientsCount = new AtomicInteger(0);
Expand All @@ -45,7 +51,7 @@ public ConcurrentHClientPool(CassandraHost host) {
maxWaitTimeWhenExhausted = cassandraHost.getMaxWaitTimeWhenExhausted() < 0 ? 0 : cassandraHost.getMaxWaitTimeWhenExhausted();

for (int i = 0; i < cassandraHost.getMaxActive() / 3; i++) {
availableClientQueue.add(new HThriftClient(cassandraHost).open());
availableClientQueue.add(createClient());
}

if ( log.isDebugEnabled() ) {
Expand All @@ -58,12 +64,12 @@ public ConcurrentHClientPool(CassandraHost host) {


@Override
public HThriftClient borrowClient() throws HectorException {
public HClient borrowClient() throws HectorException {
if ( !active.get() ) {
throw new HInactivePoolException("Attempt to borrow on in-active pool: " + getName());
}

HThriftClient cassandraClient = availableClientQueue.poll();
HClient cassandraClient = availableClientQueue.poll();
int currentActiveClients = activeClientsCount.incrementAndGet();

try {
Expand Down Expand Up @@ -92,8 +98,8 @@ public HThriftClient borrowClient() throws HectorException {
}


private HThriftClient waitForConnection() {
HThriftClient cassandraClient = null;
private HClient waitForConnection() {
HClient cassandraClient = null;
numBlocked.incrementAndGet();

// blocked take on the queue if we are configured to wait forever
Expand Down Expand Up @@ -138,16 +144,13 @@ private HThriftClient waitForConnection() {
* having to wait on polling logic. (But still increment all the counters)
* @return
*/
private HThriftClient createClient() {
if ( log.isDebugEnabled() ) {
log.debug("Creation of new client");
}
return new HThriftClient(cassandraHost).open();
private HClient createClient() {
return clientFactory.createClient(cassandraHost).open();
}

/**
* Controlled shutdown of pool. Go through the list of available clients
* in the queue and call {@link HThriftClient#close()} on each. Toggles
* in the queue and call {@link HClient#close()} on each. Toggles
* a flag to indicate we are going into shutdown mode. Any subsequent calls
* will throw an IllegalArgumentException.
*
Expand All @@ -159,11 +162,11 @@ public void shutdown() {
throw new IllegalArgumentException("shutdown() called for inactive pool: " + getName());
}
log.info("Shutdown triggered on {}", getName());
Set<HThriftClient> clients = new HashSet<HThriftClient>();
Set<HClient> clients = new HashSet<HClient>();
availableClientQueue.drainTo(clients);
if ( clients.size() > 0 ) {
for (HThriftClient hThriftClient : clients) {
hThriftClient.close();
for (HClient hClient : clients) {
hClient.close();
}
}
log.info("Shutdown complete on {}", getName());
Expand Down Expand Up @@ -225,7 +228,7 @@ public String getStatusAsString() {
}

@Override
public void releaseClient(HThriftClient client) throws HectorException {
public void releaseClient(HClient client) throws HectorException {
boolean open = client.isOpen();
if ( open ) {
if ( active.get() ) {
Expand Down Expand Up @@ -257,7 +260,7 @@ public void releaseClient(HThriftClient client) throws HectorException {
* are releasing at the same time).
* @param client
*/
private void addClientToPoolGently(HThriftClient client) {
private void addClientToPoolGently(HClient client) {
try {
availableClientQueue.add(client);
} catch (IllegalStateException ise) {
Expand Down
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.utils.DaemonThreadPoolFactory;

Expand Down Expand Up @@ -118,8 +119,8 @@ public int compare(HClientPool p1, HClientPool p2) {
}

@Override
public HClientPool createConnection(CassandraHost host) {
LatencyAwareHClientPool pool = new LatencyAwareHClientPool(host);
public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host) {
LatencyAwareHClientPool pool = new LatencyAwareHClientPool(clientFactory, host);
add(pool);
return pool;
}
Expand Down
@@ -1,15 +1,16 @@
package me.prettyprint.cassandra.connection;

import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.exceptions.HectorException;

public interface HClientPool extends PoolMetric {
public HThriftClient borrowClient() throws HectorException;
public HClient borrowClient() throws HectorException;
public CassandraHost getCassandraHost();
public int getNumBeforeExhausted();
public boolean isExhausted();
public int getMaxActive();
public String getStatusAsString();
public void releaseClient(HThriftClient client) throws HectorException;
public void releaseClient(HClient client) throws HectorException;
void shutdown();
}
Expand Up @@ -9,6 +9,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.connection.client.HThriftClient;
import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.connection.factory.HClientFactoryProvider;
import me.prettyprint.cassandra.connection.factory.HThriftClientFactoryImpl;
import me.prettyprint.cassandra.service.CassandraClientMonitor;
import me.prettyprint.cassandra.service.CassandraClientMonitor.Counter;
import me.prettyprint.cassandra.service.CassandraHost;
Expand Down Expand Up @@ -44,6 +49,7 @@ public class HConnectionManager {
private NodeAutoDiscoverService nodeAutoDiscoverService;
private final LoadBalancingPolicy loadBalancingPolicy;
private final CassandraHostConfigurator cassandraHostConfigurator;
private final HClientFactory clientFactory;
private HostTimeoutTracker hostTimeoutTracker;
private final ClockResolution clock;

Expand All @@ -52,17 +58,20 @@ public class HConnectionManager {
private HOpTimer timer;

public HConnectionManager(String clusterName, CassandraHostConfigurator cassandraHostConfigurator) {

clientFactory = HClientFactoryProvider.createFactory(cassandraHostConfigurator);

loadBalancingPolicy = cassandraHostConfigurator.getLoadBalancingPolicy();
clock = cassandraHostConfigurator.getClockResolution();
hostPools = new ConcurrentHashMap<CassandraHost, HClientPool>();
suspendedHostPools = new ConcurrentHashMap<CassandraHost, HClientPool>();
this.clusterName = clusterName;
if ( cassandraHostConfigurator.getRetryDownedHosts() ) {
cassandraHostRetryService = new CassandraHostRetryService(this, cassandraHostConfigurator);
cassandraHostRetryService = new CassandraHostRetryService(this, clientFactory, cassandraHostConfigurator);
}
for ( CassandraHost host : cassandraHostConfigurator.buildCassandraHosts()) {
try {
HClientPool hcp = loadBalancingPolicy.createConnection(host);
HClientPool hcp = loadBalancingPolicy.createConnection(clientFactory, host);
hostPools.put(host,hcp);
} catch (HectorTransportException hte) {
log.error("Could not start connection pool for host {}", host);
Expand Down Expand Up @@ -101,7 +110,7 @@ public boolean addCassandraHost(CassandraHost cassandraHost) {
HClientPool pool = null;
try {
cassandraHostConfigurator.applyConfig(cassandraHost);
pool = cassandraHostConfigurator.getLoadBalancingPolicy().createConnection(cassandraHost);
pool = cassandraHostConfigurator.getLoadBalancingPolicy().createConnection(clientFactory, cassandraHost);
hostPools.putIfAbsent(cassandraHost, pool);
log.info("Added host {} to pool", cassandraHost.getName());
return true;
Expand Down Expand Up @@ -213,7 +222,7 @@ public List<String> getStatusPerPool() {
public void operateWithFailover(Operation<?> op) throws HectorException {
final Object timerToken = timer.start();
int retries = Math.min(op.failoverPolicy.numRetries, hostPools.size());
HThriftClient client = null;
HClient client = null;
HClientPool pool = null;
boolean success = false;
boolean retryable = false;
Expand Down Expand Up @@ -293,7 +302,7 @@ public void operateWithFailover(Operation<?> op) throws HectorException {
}
}

private void closeClient(HThriftClient client) {
private void closeClient(HClient client) {
if ( client != null ) {
client.close();
}
Expand Down Expand Up @@ -345,11 +354,11 @@ private HClientPool getClientFromLBPolicy(Set<CassandraHost> excludeHosts) {
return loadBalancingPolicy.getPool(hostPoolValues, excludeHosts);
}

void releaseClient(HThriftClient client) {
void releaseClient(HClient client) {
if ( client == null ) return;
HClientPool pool = hostPools.get(client.cassandraHost);
HClientPool pool = hostPools.get(client.getCassandraHost());
if ( pool == null ) {
pool = suspendedHostPools.get(client.cassandraHost);
pool = suspendedHostPools.get(client.getCassandraHost());
}
if ( pool != null ) {
pool.releaseClient(client);
Expand All @@ -359,7 +368,7 @@ void releaseClient(HThriftClient client) {
}
}

HThriftClient borrowClient() {
HClient borrowClient() {
HClientPool pool = getClientFromLBPolicy(null);
if ( pool != null ) {
return pool.borrowClient();
Expand Down
Expand Up @@ -3,6 +3,9 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.connection.client.HThriftClient;
import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.exceptions.HectorException;

Expand All @@ -20,20 +23,20 @@ public class LatencyAwareHClientPool extends ConcurrentHClientPool {
private static final double SENTINEL_COMPARE = 0.768;
private final LinkedBlockingDeque<Double> latencies;

public LatencyAwareHClientPool(CassandraHost host) {
super(host);
public LatencyAwareHClientPool(HClientFactory clientFactory, CassandraHost host) {
super(clientFactory, host);
latencies = new LinkedBlockingDeque<Double>(WINDOW_QUEUE_SIZE);
}

@Override
public HThriftClient borrowClient() throws HectorException {
HThriftClient client = super.borrowClient();
public HClient borrowClient() throws HectorException {
HClient client = super.borrowClient();
client.startToUse();
return client;
}

@Override
public void releaseClient(HThriftClient client) throws HectorException {
public void releaseClient(HClient client) throws HectorException {
add(client.getSinceLastUsed());
super.releaseClient(client);
}
Expand Down
@@ -1,5 +1,6 @@
package me.prettyprint.cassandra.connection;

import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost;

import java.util.*;
Expand Down Expand Up @@ -55,7 +56,7 @@ public int compare(HClientPool o1, HClientPool o2) {
}

@Override
public HClientPool createConnection(CassandraHost host) {
return new ConcurrentHClientPool(host);
public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host) {
return new ConcurrentHClientPool(clientFactory, host);
}
}
Expand Up @@ -2,12 +2,32 @@

import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost;

/**
* Default interface for all load balancing policies.
*
*/
public interface LoadBalancingPolicy extends Serializable {

/**
* Retrieves a pool from the collection of <code>pools</code> excluding <code>excludeHosts</code>.
*
* @param pools collection of all available pools
* @param excludeHosts excluded pools
* @return a pool based on this load balancing policy
*/
HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> excludeHosts);
HClientPool createConnection(CassandraHost host);

/**
* Creates a connection pool for <code>host</code>.
*
* @param clientFactory an instance of {@link HClientFactory}
* @param host an instance of {@link CassandraHost} representing the host this pool will represent
* @return a connection pool
*/
HClientPool createConnection(HClientFactory clientFactory, CassandraHost host);
}

0 comments on commit 08149a0

Please sign in to comment.