Permalink
Browse files

Add host selector strategy configuration

  • Loading branch information...
1 parent 33426a1 commit ac34f4c6bd51f53bd694310ca7a46f2d62feaa59 @rschildmeijer rschildmeijer committed Feb 28, 2013
@@ -19,6 +19,7 @@
import java.util.concurrent.ScheduledExecutorService;
import com.netflix.astyanax.AuthenticationCredentials;
+import com.netflix.astyanax.connectionpool.impl.HostSelectorStrategy;
import com.netflix.astyanax.partitioner.Partitioner;
public interface ConnectionPoolConfiguration {
@@ -79,6 +80,13 @@
*/
RetryBackoffStrategy getRetryBackoffStrategy();
+ /**
+ * @return Return the host selector strategy to use.
+ *
+ * @see com.netflix.astyanax.connectionpool.impl.HostSelectorStrategy
+ */
+ HostSelectorStrategy getHostSelectorStrategy();
+
/**
* @return List of comma delimited host:port combinations. If port is not provided
* then getPort() will be used by default. This list must contain at least
@@ -103,6 +103,7 @@
private String seeds = null;
private RetryBackoffStrategy hostRetryBackoffStrategy = null;
+ private HostSelectorStrategy hostSelectorStrategy = HostSelectorStrategy.ROUND_ROBIN;
private LatencyScoreStrategy latencyScoreStrategy = new EmptyLatencyScoreStrategyImpl();
private BadHostDetector badHostDetector = DEFAULT_BAD_HOST_DETECTOR;
private AuthenticationCredentials credentials = null;
@@ -308,6 +309,16 @@ public ConnectionPoolConfigurationImpl setRetryBackoffStrategy(RetryBackoffStrat
return this;
}
+ @Override
+ public HostSelectorStrategy getHostSelectorStrategy() {
+ return this.hostSelectorStrategy;
+ }
+
+ public ConnectionPoolConfigurationImpl setHostSelectorStrategy(HostSelectorStrategy hostSelectorStrategy) {
+ this.hostSelectorStrategy = hostSelectorStrategy;
+ return this;
+ }
+
/*
* (non-Javadoc)
*
@@ -0,0 +1,20 @@
+/*******************************************************************************
+ * Copyright 2011 Netflix
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package com.netflix.astyanax.connectionpool.impl;
+
+public enum HostSelectorStrategy {
+ ROUND_ROBIN, LEAST_OUTSTANDING
+}
@@ -0,0 +1,65 @@
+package com.netflix.astyanax.connectionpool.impl;
+
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import com.netflix.astyanax.connectionpool.Connection;
+import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
+import com.netflix.astyanax.connectionpool.ConnectionPoolMonitor;
+import com.netflix.astyanax.connectionpool.HostConnectionPool;
+import com.netflix.astyanax.connectionpool.Operation;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.connectionpool.exceptions.NoAvailableHostsException;
+
+public class LeastOutstandingExecuteWithFailover<CL, R> extends AbstractExecuteWithFailoverImpl<CL, R> {
+ protected HostConnectionPool<CL> pool;
+ private int retryCountdown;
+ protected final List<HostConnectionPool<CL>> pools;
+ protected int waitDelta;
+ protected int waitMultiplier = 1;
+
+ public LeastOutstandingExecuteWithFailover(ConnectionPoolConfiguration config, ConnectionPoolMonitor monitor,
+ List<HostConnectionPool<CL>> pools) throws ConnectionException {
+ super(config, monitor);
+
+ this.pools = Lists.newArrayList(pools);
+
+ if (this.pools == null || this.pools.isEmpty()) {
+ throw new NoAvailableHostsException("No hosts to borrow from");
+ }
+
+ int size = this.pools.size();
+ retryCountdown = Math.min(config.getMaxFailoverCount(), size);
+ if (retryCountdown < 0)
+ retryCountdown = size;
+ else if (retryCountdown == 0)
+ retryCountdown = 1;
+
+ waitDelta = config.getMaxTimeoutWhenExhausted() / retryCountdown;
+ }
+
+ public boolean canRetry() {
+ return --retryCountdown > 0;
+ }
+
+ @Override
+ public HostConnectionPool<CL> getCurrentHostConnectionPool() {
+ return pool;
+ }
+
+ @Override
+ public Connection<CL> borrowConnection(Operation<CL, R> operation) throws ConnectionException {
+ // find the pool with the least outstanding (i.e most idle) active connections
+ Iterator<HostConnectionPool<CL>> iterator = this.pools.iterator();
+ HostConnectionPool eligible = iterator.next();
+ while (iterator.hasNext()) {
+ HostConnectionPool<CL> candidate = iterator.next();
+ if (candidate.getIdleConnectionCount() > eligible.getIdleConnectionCount()) {
+ eligible = candidate;
+ }
+ }
+ return eligible.borrowConnection(waitDelta * waitMultiplier);
+ }
+
+}
@@ -76,8 +76,21 @@ public TokenAwareConnectionPoolImpl(ConnectionPoolConfiguration configuration, C
if (index > MAX_RR_COUNTER) {
roundRobinCounter.set(0);
}
-
- return new RoundRobinExecuteWithFailover<CL, R>(config, monitor, pools, isSorted ? 0 : index);
+
+ AbstractExecuteWithFailoverImpl executeWithFailover = null;
+ switch (config.getHostSelectorStrategy()) {
+ case ROUND_ROBIN:
+ executeWithFailover = new RoundRobinExecuteWithFailover<CL, R>(config, monitor, pools, isSorted ? 0 : index);
+ break;
+ case LEAST_OUTSTANDING:
+ executeWithFailover = new LeastOutstandingExecuteWithFailover<CL, R>(config, monitor, pools);
+ break;
+ default:
+ executeWithFailover = new RoundRobinExecuteWithFailover<CL, R>(config, monitor, pools, isSorted ? 0 : index);
+ break;
+
+ }
+ return executeWithFailover;
}
catch (ConnectionException e) {
monitor.incOperationFailure(e.getHost(), e);

0 comments on commit ac34f4c

Please sign in to comment.