Skip to content

Commit

Permalink
Merge pull request #1399 from mattrjacobs/add-property-to-control-max…
Browse files Browse the repository at this point in the history
…imum-thread-pool-size

Add property to control maximum thread pool size
  • Loading branch information
mattrjacobs committed Oct 20, 2016
2 parents 4e08f1a + f68bd2f commit 8b5a075
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 36 deletions.
Expand Up @@ -120,6 +120,7 @@ private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadP
json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize());
json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold());
json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes());
json.writeBooleanField("allowMaximumSizeToDivergeFromCoreSize", threadPoolConfig.getAllowMaximumSizeToDivergeFromCoreSize());
json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds());
json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets());
json.writeEndObject();
Expand Down
Expand Up @@ -173,11 +173,18 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
this.queue = concurrencyStrategy.getBlockingQueue(queueSize);
this.metrics = HystrixThreadPoolMetrics.getInstance(
threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = metrics.getThreadPool();

if (properties.getAllowMaximumSizeToDivergeFromCoreSize()) {
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = this.metrics.getThreadPool();
} else {
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = this.metrics.getThreadPool();
}

/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
Expand Down Expand Up @@ -210,16 +217,21 @@ public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
private void touchConfig() {
final int dynamicCoreSize = properties.coreSize().get();
int dynamicMaximumSize = properties.maximumSize().get();
final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize();
boolean maxTooLow = false;

if (dynamicMaximumSize < dynamicCoreSize) {
logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is using coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
if (allowSizesToDiverge && dynamicMaximumSize < dynamicCoreSize) {
dynamicMaximumSize = dynamicCoreSize;
maxTooLow = true;
}

// In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
if (threadPool.getCorePoolSize() != dynamicCoreSize || threadPool.getMaximumPoolSize() != dynamicMaximumSize) {
if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
if (maxTooLow) {
logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
}
threadPool.setCorePoolSize(dynamicCoreSize);
threadPool.setMaximumPoolSize(dynamicMaximumSize);
}
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.hystrix;

import static com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedProperty.forBoolean;
import static com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedProperty.forInteger;

import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -44,13 +45,15 @@
*/
public abstract class HystrixThreadPoolProperties {



/* defaults */
static int default_coreSize = 10; // core size of thread pool
static int default_coreSize = 10; // core size of thread pool
static int default_maximumSize = 10; // maximum size of thread pool
static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive
static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
// -1 turns if off and makes us use SynchronousQueue
static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
// -1 turns if off and makes us use SynchronousQueue
static boolean default_allow_maximum_size_to_diverge_from_core_size = false; //should the maximumSize config value get read and used in configuring the threadPool
//turning this on should be a conscious decision by the user, so we default it to false

static int default_queueSizeRejectionThreshold = 5; // number of items in queue
static int default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number
static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets)
Expand All @@ -60,6 +63,8 @@ public abstract class HystrixThreadPoolProperties {
private final HystrixProperty<Integer> keepAliveTime;
private final HystrixProperty<Integer> maxQueueSize;
private final HystrixProperty<Integer> queueSizeRejectionThreshold;
private final boolean allowMaximumSizeToDivergeFromCoreSize;

private final HystrixProperty<Integer> threadPoolRollingNumberStatisticalWindowInMilliseconds;
private final HystrixProperty<Integer> threadPoolRollingNumberStatisticalWindowBuckets;

Expand All @@ -72,12 +77,13 @@ protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder)
}

protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder, String propertyPrefix) {
//we allow maximum pool size to be configured lower than core size here
//however, at runtime, if this configuration gets applied, we will always ensure that maximumSize >= coreSize
this.corePoolSize = getProperty(propertyPrefix, key, "coreSize", builder.getCoreSize(), default_coreSize);
this.allowMaximumSizeToDivergeFromCoreSize = getValueOnce(propertyPrefix, key, "allowMaximumSizeToDivergeFromCoreSize",
builder.getAllowMaximumSizeToDivergeFromCoreSize(), default_allow_maximum_size_to_diverge_from_core_size);

//if left unset, maxiumumSize will default to coreSize
this.maximumPoolSize = getProperty(propertyPrefix, key, "maximumSize", builder.getMaximumSize(), corePoolSize.get());
this.corePoolSize = getProperty(propertyPrefix, key, "coreSize", builder.getCoreSize(), default_coreSize);
//this object always contains a reference to the configuration value for the maximumSize of the threadpool
//it only gets applied if .threadpool
this.maximumPoolSize = getProperty(propertyPrefix, key, "maximumSize", builder.getMaximumSize(), default_maximumSize);

this.keepAliveTime = getProperty(propertyPrefix, key, "keepAliveTimeMinutes", builder.getKeepAliveTimeMinutes(), default_keepAliveTimeMinutes);
this.maxQueueSize = getProperty(propertyPrefix, key, "maxQueueSize", builder.getMaxQueueSize(), default_maxQueueSize);
Expand All @@ -93,6 +99,14 @@ private static HystrixProperty<Integer> getProperty(String propertyPrefix, Hystr
.build();
}

private static boolean getValueOnce(String propertyPrefix, HystrixThreadPoolKey key, String instanceProperty, boolean builderOverrideValue, boolean defaultValue) {
return forBoolean()
.add(propertyPrefix + ".threadpool." + key.name() + "." + instanceProperty, builderOverrideValue)
.add(propertyPrefix + ".threadpool.default." + instanceProperty, defaultValue)
.build()
.get();
}

/**
* Core thread-pool size that gets passed to {@link ThreadPoolExecutor#setCorePoolSize(int)}
*
Expand Down Expand Up @@ -144,6 +158,10 @@ public HystrixProperty<Integer> queueSizeRejectionThreshold() {
return queueSizeRejectionThreshold;
}

public boolean getAllowMaximumSizeToDivergeFromCoreSize() {
return allowMaximumSizeToDivergeFromCoreSize;
}

/**
* Duration of statistical rolling window in milliseconds. This is passed into {@link HystrixRollingNumber} inside each {@link HystrixThreadPoolMetrics} instance.
*
Expand Down Expand Up @@ -200,6 +218,7 @@ public static class Setter {
private Integer keepAliveTimeMinutes = null;
private Integer maxQueueSize = null;
private Integer queueSizeRejectionThreshold = null;
private boolean allowMaximumSizeToDivergeFromCoreSize = false;
private Integer rollingStatisticalWindowInMilliseconds = null;
private Integer rollingStatisticalWindowBuckets = null;

Expand All @@ -226,6 +245,10 @@ public Integer getQueueSizeRejectionThreshold() {
return queueSizeRejectionThreshold;
}

public boolean getAllowMaximumSizeToDivergeFromCoreSize() {
return allowMaximumSizeToDivergeFromCoreSize;
}

public Integer getMetricsRollingStatisticalWindowInMilliseconds() {
return rollingStatisticalWindowInMilliseconds;
}
Expand Down Expand Up @@ -259,6 +282,11 @@ public Setter withQueueSizeRejectionThreshold(int value) {
return this;
}

public Setter withAllowMaximumSizeToDivergeFromCoreSize(boolean value) {
this.allowMaximumSizeToDivergeFromCoreSize = value;
return this;
}

public Setter withMetricsRollingStatisticalWindowInMilliseconds(int value) {
this.rollingStatisticalWindowInMilliseconds = value;
return this;
Expand Down
Expand Up @@ -27,18 +27,20 @@ public class HystrixThreadPoolConfiguration {
private final int maxQueueSize;
private final int queueRejectionThreshold;
private final int keepAliveTimeInMinutes;
private final boolean allowMaximumSizeToDivergeFromCoreSize;
private final int rollingCounterNumberOfBuckets;
private final int rollingCounterBucketSizeInMilliseconds;

public HystrixThreadPoolConfiguration(HystrixThreadPoolKey threadPoolKey, int coreSize, int maximumSize, int maxQueueSize, int queueRejectionThreshold,
int keepAliveTimeInMinutes, int rollingCounterNumberOfBuckets,
int keepAliveTimeInMinutes, boolean allowMaximumSizeToDivergeFromCoreSize, int rollingCounterNumberOfBuckets,
int rollingCounterBucketSizeInMilliseconds) {
this.threadPoolKey = threadPoolKey;
this.coreSize = coreSize;
this.maximumSize = maximumSize;
this.maxQueueSize = maxQueueSize;
this.queueRejectionThreshold = queueRejectionThreshold;
this.keepAliveTimeInMinutes = keepAliveTimeInMinutes;
this.allowMaximumSizeToDivergeFromCoreSize = allowMaximumSizeToDivergeFromCoreSize;
this.rollingCounterNumberOfBuckets = rollingCounterNumberOfBuckets;
this.rollingCounterBucketSizeInMilliseconds = rollingCounterBucketSizeInMilliseconds;
}
Expand All @@ -51,6 +53,7 @@ public static HystrixThreadPoolConfiguration sample(HystrixThreadPoolKey threadP
threadPoolProperties.maxQueueSize().get(),
threadPoolProperties.queueSizeRejectionThreshold().get(),
threadPoolProperties.keepAliveTimeMinutes().get(),
threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize(),
threadPoolProperties.metricsRollingStatisticalWindowBuckets().get(),
threadPoolProperties.metricsRollingStatisticalWindowInMilliseconds().get());
}
Expand All @@ -64,7 +67,11 @@ public int getCoreSize() {
}

public int getMaximumSize() {
return maximumSize;
if (allowMaximumSizeToDivergeFromCoreSize) {
return maximumSize;
} else {
return coreSize;
}
}

public int getMaxQueueSize() {
Expand All @@ -79,6 +86,10 @@ public int getKeepAliveTimeInMinutes() {
return keepAliveTimeInMinutes;
}

public boolean getAllowMaximumSizeToDivergeFromCoreSize() {
return allowMaximumSizeToDivergeFromCoreSize;
}

public int getRollingCounterNumberOfBuckets() {
return rollingCounterNumberOfBuckets;
}
Expand Down
Expand Up @@ -21,6 +21,8 @@
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import com.netflix.hystrix.util.PlatformSpecific;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
Expand All @@ -46,6 +48,8 @@
*/
public abstract class HystrixConcurrencyStrategy {

private final static Logger logger = LoggerFactory.getLogger(HystrixConcurrencyStrategy.class);

/**
* Factory method to provide {@link ThreadPoolExecutor} instances as desired.
* <p>
Expand Down Expand Up @@ -88,7 +92,17 @@ public Thread newThread(Runnable r) {
threadFactory = PlatformSpecific.getAppEngineThreadFactory();
}

return new ThreadPoolExecutor(corePoolSize.get(), maximumPoolSize.get(), keepAliveTime.get(), unit, workQueue, threadFactory);
final int dynamicCoreSize = corePoolSize.get();
final int dynamicMaximumSize = maximumPoolSize.get();

if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
}
}

/**
Expand Down
Expand Up @@ -104,7 +104,7 @@ public void testSetNeitherCoreNorMaximumSize() {
};

assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.coreSize().get().intValue());
assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.maximumSize().get().intValue());
assertEquals(HystrixThreadPoolProperties.default_maximumSize, properties.maximumSize().get().intValue());
}

@Test
Expand All @@ -115,7 +115,7 @@ public void testSetCoreSizeOnly() {
};

assertEquals(14, properties.coreSize().get().intValue());
assertEquals(14, properties.maximumSize().get().intValue());
assertEquals(HystrixThreadPoolProperties.default_maximumSize, properties.maximumSize().get().intValue());
}

@Test
Expand All @@ -129,17 +129,6 @@ public void testSetMaximumSizeOnlyLowerThanDefaultCoreSize() {
assertEquals(3, properties.maximumSize().get().intValue());
}

@Test
public void testSetMaximumSizeOnlyEqualToDefaultCoreSize() {
HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST,
HystrixThreadPoolProperties.Setter().withMaximumSize(HystrixThreadPoolProperties.default_coreSize)) {

};

assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.coreSize().get().intValue());
assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.maximumSize().get().intValue());
}

@Test
public void testSetMaximumSizeOnlyGreaterThanDefaultCoreSize() {
HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST,
Expand Down
Expand Up @@ -122,6 +122,7 @@ public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(Hystri
//Now the HystrixThreadPool ALWAYS has the same reference to the ThreadPoolExecutor so that it no longer matters which
//wins to be inserted into the HystrixThreadPool.Factory.threadPools cache.
}

@Test(timeout = 2500)
public void testUnsubscribeHystrixThreadPool() throws InterruptedException {
// methods are package-private so can't test it somewhere else
Expand Down
Expand Up @@ -181,6 +181,7 @@ public static HystrixConfiguration fromByteBuffer(ByteBuffer bb) {
threadPool.getValue().path("maxQueueSize").asInt(),
threadPool.getValue().path("queueRejectionThreshold").asInt(),
threadPool.getValue().path("keepAliveTimeInMinutes").asInt(),
threadPool.getValue().path("allowMaximumSizeToDivergeFromCoreSize").asBoolean(),
threadPool.getValue().path("counterBucketCount").asInt(),
threadPool.getValue().path("counterBucketSizeInMilliseconds").asInt()
);
Expand Down Expand Up @@ -257,6 +258,7 @@ private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadP
json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize());
json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold());
json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes());
json.writeBooleanField("allowMaximumSizeToDivergeFromCoreSize", threadPoolConfig.getAllowMaximumSizeToDivergeFromCoreSize());
json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds());
json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets());
json.writeEndObject();
Expand Down

0 comments on commit 8b5a075

Please sign in to comment.