Skip to content

Commit

Permalink
Merge pull request #1371 from mattrjacobs/allow-excess-threads-to-be-…
Browse files Browse the repository at this point in the history
…terminated

Add threadPool maximumSize configuration
  • Loading branch information
mattrjacobs committed Oct 5, 2016
2 parents 069249c + c6fb4e1 commit ae725f5
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ private static void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey
private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolConfiguration threadPoolConfig) throws IOException {
json.writeObjectFieldStart(threadPoolKey.name());
json.writeNumberField("coreSize", threadPoolConfig.getCoreSize());
json.writeNumberField("maximumSize", threadPoolConfig.getMaximumSize());
json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize());
json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold());
json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Scheduler;
import rx.functions.Func0;

Expand Down Expand Up @@ -158,6 +160,8 @@ public interface HystrixThreadPool {
* @ThreadSafe
*/
/* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {
private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

private final HystrixThreadPoolProperties properties;
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor threadPool;
Expand All @@ -171,7 +175,7 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea
this.queue = concurrencyStrategy.getBlockingQueue(queueSize);
this.metrics = HystrixThreadPoolMetrics.getInstance(
threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = metrics.getThreadPool();

Expand Down Expand Up @@ -205,11 +209,19 @@ public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
// allow us to change things via fast-properties by setting it each time
private void touchConfig() {
final int dynamicCoreSize = properties.coreSize().get();
int dynamicMaximumSize = properties.maximumSize().get();

if (dynamicMaximumSize < dynamicCoreSize) {
logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is using coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
dynamicMaximumSize = dynamicCoreSize;
}

// In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
if (threadPool.getCorePoolSize() != dynamicCoreSize) {
if (threadPool.getCorePoolSize() != dynamicCoreSize || threadPool.getMaximumPoolSize() != dynamicMaximumSize) {
threadPool.setCorePoolSize(dynamicCoreSize);
threadPool.setMaximumPoolSize(dynamicCoreSize); // we always want maxSize the same as coreSize, we are not using a dynamically resizing pool
threadPool.setMaximumPoolSize(dynamicMaximumSize);
}

threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES); // this doesn't really matter since we're not resizing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package com.netflix.hystrix;

import static com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedProperty.forBoolean;
import static com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedProperty.forInteger;
import static com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedProperty.forString;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -32,19 +30,33 @@
* Properties for instances of {@link HystrixThreadPool}.
* <p>
* Default implementation of methods uses Archaius (https://github.com/Netflix/archaius)
*
* Note a change in behavior in 1.5.7. Prior to that version, the configuration for 'coreSize' was used to control
* both coreSize and maximumSize. This is a fixed-size threadpool that can never give up an unused thread. In 1.5.7+,
* the values can diverge, and if you set coreSize < maximumSize, threads can be given up (subject to the keep-alive
* time)
*
* It is OK to leave maximumSize unset using any version of Hystrix. If you do, then maximum size will default to
* core size and you'll have a fixed-size threadpool.
*
* If you accidentally set maximumSize < coreSize, then maximum will be raised to coreSize
* (this prioritizes keeping extra threads around rather than inducing threadpool rejections)
*/
public abstract class HystrixThreadPoolProperties {



/* defaults */
private Integer default_coreSize = 10; // size of thread pool
private Integer default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive (though in practice this doesn't get used as by default we set a fixed size)
private Integer default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
static int default_coreSize = 10; // core size of thread pool
static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive (though in practice this doesn't get used as by default we set a fixed size)
static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
// -1 turns if off and makes us use SynchronousQueue
private Integer default_queueSizeRejectionThreshold = 5; // number of items in queue
private Integer default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number
private Integer default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets)
static int default_queueSizeRejectionThreshold = 5; // number of items in queue
static int default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number
static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets)

private final HystrixProperty<Integer> corePoolSize;
private final HystrixProperty<Integer> maximumPoolSize;
private final HystrixProperty<Integer> keepAliveTime;
private final HystrixProperty<Integer> maxQueueSize;
private final HystrixProperty<Integer> queueSizeRejectionThreshold;
Expand All @@ -60,7 +72,13 @@ protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder)
}

protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder, String propertyPrefix) {
//we allow maximum pool size to be configured lower than core size here
//however, at runtime, if this configuration gets applied, we will always ensure that maximumSize >= coreSize
this.corePoolSize = getProperty(propertyPrefix, key, "coreSize", builder.getCoreSize(), default_coreSize);

//if left unset, maxiumumSize will default to coreSize
this.maximumPoolSize = getProperty(propertyPrefix, key, "maximumSize", builder.getMaximumSize(), corePoolSize.get());

this.keepAliveTime = getProperty(propertyPrefix, key, "keepAliveTimeMinutes", builder.getKeepAliveTimeMinutes(), default_keepAliveTimeMinutes);
this.maxQueueSize = getProperty(propertyPrefix, key, "maxQueueSize", builder.getMaxQueueSize(), default_maxQueueSize);
this.queueSizeRejectionThreshold = getProperty(propertyPrefix, key, "queueSizeRejectionThreshold", builder.getQueueSizeRejectionThreshold(), default_queueSizeRejectionThreshold);
Expand All @@ -84,6 +102,15 @@ public HystrixProperty<Integer> coreSize() {
return corePoolSize;
}

/**
* Maximum thread-pool size that gets passed to {@link ThreadPoolExecutor#setMaximumPoolSize(int)}
*
* @return {@code HystrixProperty<Integer>}
*/
public HystrixProperty<Integer> maximumSize() {
return maximumPoolSize;
}

/**
* Keep-alive time in minutes that gets passed to {@link ThreadPoolExecutor#setKeepAliveTime(long, TimeUnit)}
*
Expand Down Expand Up @@ -169,6 +196,7 @@ public static Setter defaultSetter() {
*/
public static class Setter {
private Integer coreSize = null;
private Integer maximumSize = null;
private Integer keepAliveTimeMinutes = null;
private Integer maxQueueSize = null;
private Integer queueSizeRejectionThreshold = null;
Expand All @@ -182,6 +210,10 @@ public Integer getCoreSize() {
return coreSize;
}

public Integer getMaximumSize() {
return maximumSize;
}

public Integer getKeepAliveTimeMinutes() {
return keepAliveTimeMinutes;
}
Expand All @@ -207,6 +239,11 @@ public Setter withCoreSize(int value) {
return this;
}

public Setter withMaximumSize(int value) {
this.maximumSize = value;
return this;
}

public Setter withKeepAliveTimeMinutes(int value) {
this.keepAliveTimeMinutes = value;
return this;
Expand All @@ -232,65 +269,8 @@ public Setter withMetricsRollingStatisticalWindowBuckets(int value) {
return this;
}

/**
* Base properties for unit testing.
*/
/* package */static Setter getUnitTestPropertiesBuilder() {
return new Setter()
.withCoreSize(10)// size of thread pool
.withKeepAliveTimeMinutes(1)// minutes to keep a thread alive (though in practice this doesn't get used as by default we set a fixed size)
.withMaxQueueSize(100)// size of queue (but we never allow it to grow this big ... this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
.withQueueSizeRejectionThreshold(10)// number of items in queue at which point we reject (this can be dyamically changed)
.withMetricsRollingStatisticalWindowInMilliseconds(10000)// milliseconds for rolling number
.withMetricsRollingStatisticalWindowBuckets(10);// number of buckets in rolling number (10 1-second buckets)
}

/**
* Return a static representation of the properties with values from the Builder so that UnitTests can create properties that are not affected by the actual implementations which pick up their
* values dynamically.
*
* @param builder builder for a {@link HystrixThreadPoolProperties}
* @return HystrixThreadPoolProperties
*/
/* package */static HystrixThreadPoolProperties asMock(final Setter builder) {
return new HystrixThreadPoolProperties(TestThreadPoolKey.TEST) {

@Override
public HystrixProperty<Integer> coreSize() {
return HystrixProperty.Factory.asProperty(builder.coreSize);
}

@Override
public HystrixProperty<Integer> keepAliveTimeMinutes() {
return HystrixProperty.Factory.asProperty(builder.keepAliveTimeMinutes);
}

@Override
public HystrixProperty<Integer> maxQueueSize() {
return HystrixProperty.Factory.asProperty(builder.maxQueueSize);
}

@Override
public HystrixProperty<Integer> queueSizeRejectionThreshold() {
return HystrixProperty.Factory.asProperty(builder.queueSizeRejectionThreshold);
}

@Override
public HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds() {
return HystrixProperty.Factory.asProperty(builder.rollingStatisticalWindowInMilliseconds);
}

@Override
public HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets() {
return HystrixProperty.Factory.asProperty(builder.rollingStatisticalWindowBuckets);
}

};

}

private static enum TestThreadPoolKey implements HystrixThreadPoolKey {
TEST
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@ public class HystrixThreadPoolConfiguration {
private static final String VERSION = "1";
private final HystrixThreadPoolKey threadPoolKey;
private final int coreSize;
private final int maximumSize;
private final int maxQueueSize;
private final int queueRejectionThreshold;
private final int keepAliveTimeInMinutes;
private final int rollingCounterNumberOfBuckets;
private final int rollingCounterBucketSizeInMilliseconds;

public HystrixThreadPoolConfiguration(HystrixThreadPoolKey threadPoolKey, int coreSize, int maxQueueSize, int queueRejectionThreshold,
public HystrixThreadPoolConfiguration(HystrixThreadPoolKey threadPoolKey, int coreSize, int maximumSize, int maxQueueSize, int queueRejectionThreshold,
int keepAliveTimeInMinutes, int rollingCounterNumberOfBuckets,
int rollingCounterBucketSizeInMilliseconds) {
this.threadPoolKey = threadPoolKey;
this.coreSize = coreSize;
this.maximumSize = maximumSize;
this.maxQueueSize = maxQueueSize;
this.queueRejectionThreshold = queueRejectionThreshold;
this.keepAliveTimeInMinutes = keepAliveTimeInMinutes;
Expand All @@ -45,6 +47,7 @@ public static HystrixThreadPoolConfiguration sample(HystrixThreadPoolKey threadP
return new HystrixThreadPoolConfiguration(
threadPoolKey,
threadPoolProperties.coreSize().get(),
threadPoolProperties.maximumSize().get(),
threadPoolProperties.maxQueueSize().get(),
threadPoolProperties.queueSizeRejectionThreshold().get(),
threadPoolProperties.keepAliveTimeMinutes().get(),
Expand All @@ -60,6 +63,10 @@ public int getCoreSize() {
return coreSize;
}

public int getMaximumSize() {
return maximumSize;
}

public int getMaxQueueSize() {
return maxQueueSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public HystrixCommandProperties getCommandProperties(HystrixCommandKey commandKe
@Override
public HystrixThreadPoolProperties getThreadPoolProperties(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter builder) {
if (builder == null) {
builder = HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder();
builder = HystrixThreadPoolPropertiesTest.getUnitTestPropertiesBuilder();
}
return HystrixThreadPoolProperties.Setter.asMock(builder);
return HystrixThreadPoolPropertiesTest.asMock(builder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5335,8 +5335,8 @@ private static class TestThreadIsolationWithSemaphoreSetSmallCommand extends Tes
private TestThreadIsolationWithSemaphoreSetSmallCommand(TestCircuitBreaker circuitBreaker, int poolSize, Action0 action) {
super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)
.setThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(TestThreadIsolationWithSemaphoreSetSmallCommand.class.getSimpleName()))
.setThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder()
.withCoreSize(poolSize).withMaxQueueSize(0))
.setThreadPoolPropertiesDefaults(HystrixThreadPoolPropertiesTest.getUnitTestPropertiesBuilder()
.withCoreSize(poolSize).withMaximumSize(poolSize).withMaxQueueSize(0))
.setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
.withExecutionIsolationSemaphoreMaxConcurrentRequests(1)));
Expand Down

0 comments on commit ae725f5

Please sign in to comment.