diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java index 57e983dfb..faebbe453 100644 --- a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigurationJsonStream.java @@ -120,6 +120,7 @@ private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadP json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); + json.writeBooleanField("allowMaximumSizeToDivergeFromCoreSize", threadPoolConfig.getAllowMaximumSizeToDivergeFromCoreSize()); json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); json.writeEndObject(); diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java index 1e0d399da..e812b46ee 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPool.java @@ -173,11 +173,18 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.queueSize = properties.maxQueueSize().get(); this.queue = concurrencyStrategy.getBlockingQueue(queueSize); - this.metrics = HystrixThreadPoolMetrics.getInstance( - threadPoolKey, - concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), - properties); - this.threadPool = metrics.getThreadPool(); + + if (properties.getAllowMaximumSizeToDivergeFromCoreSize()) { + this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, + concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.maximumSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), + properties); + this.threadPool = this.metrics.getThreadPool(); + } else { + this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, + concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue), + properties); + this.threadPool = this.metrics.getThreadPool(); + } /* strategy: HystrixMetricsPublisherThreadPool */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties); @@ -210,16 +217,21 @@ public Scheduler getScheduler(Func0 shouldInterruptThread) { private void touchConfig() { final int dynamicCoreSize = properties.coreSize().get(); int dynamicMaximumSize = properties.maximumSize().get(); + final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize(); + boolean maxTooLow = false; - if (dynamicMaximumSize < dynamicCoreSize) { - logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is using coreSize = " + - dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + - dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); + if (allowSizesToDiverge && dynamicMaximumSize < dynamicCoreSize) { dynamicMaximumSize = dynamicCoreSize; + maxTooLow = true; } // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed. - if (threadPool.getCorePoolSize() != dynamicCoreSize || threadPool.getMaximumPoolSize() != dynamicMaximumSize) { + if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) { + if (maxTooLow) { + logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " + + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); + } threadPool.setCorePoolSize(dynamicCoreSize); threadPool.setMaximumPoolSize(dynamicMaximumSize); } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolProperties.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolProperties.java index 3a054033b..369564f93 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolProperties.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixThreadPoolProperties.java @@ -15,6 +15,7 @@ */ package com.netflix.hystrix; +import static com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedProperty.forBoolean; import static com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedProperty.forInteger; import java.util.concurrent.BlockingQueue; @@ -44,13 +45,15 @@ */ public abstract class HystrixThreadPoolProperties { - - /* defaults */ - static int default_coreSize = 10; // core size of thread pool + static int default_coreSize = 10; // core size of thread pool + static int default_maximumSize = 10; // maximum size of thread pool static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive - static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject) - // -1 turns if off and makes us use SynchronousQueue + static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject) + // -1 turns if off and makes us use SynchronousQueue + static boolean default_allow_maximum_size_to_diverge_from_core_size = false; //should the maximumSize config value get read and used in configuring the threadPool + //turning this on should be a conscious decision by the user, so we default it to false + static int default_queueSizeRejectionThreshold = 5; // number of items in queue static int default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets) @@ -60,6 +63,8 @@ public abstract class HystrixThreadPoolProperties { private final HystrixProperty keepAliveTime; private final HystrixProperty maxQueueSize; private final HystrixProperty queueSizeRejectionThreshold; + private final boolean allowMaximumSizeToDivergeFromCoreSize; + private final HystrixProperty threadPoolRollingNumberStatisticalWindowInMilliseconds; private final HystrixProperty threadPoolRollingNumberStatisticalWindowBuckets; @@ -72,12 +77,13 @@ protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder) } protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder, String propertyPrefix) { - //we allow maximum pool size to be configured lower than core size here - //however, at runtime, if this configuration gets applied, we will always ensure that maximumSize >= coreSize - this.corePoolSize = getProperty(propertyPrefix, key, "coreSize", builder.getCoreSize(), default_coreSize); + this.allowMaximumSizeToDivergeFromCoreSize = getValueOnce(propertyPrefix, key, "allowMaximumSizeToDivergeFromCoreSize", + builder.getAllowMaximumSizeToDivergeFromCoreSize(), default_allow_maximum_size_to_diverge_from_core_size); - //if left unset, maxiumumSize will default to coreSize - this.maximumPoolSize = getProperty(propertyPrefix, key, "maximumSize", builder.getMaximumSize(), corePoolSize.get()); + this.corePoolSize = getProperty(propertyPrefix, key, "coreSize", builder.getCoreSize(), default_coreSize); + //this object always contains a reference to the configuration value for the maximumSize of the threadpool + //it only gets applied if .threadpool + this.maximumPoolSize = getProperty(propertyPrefix, key, "maximumSize", builder.getMaximumSize(), default_maximumSize); this.keepAliveTime = getProperty(propertyPrefix, key, "keepAliveTimeMinutes", builder.getKeepAliveTimeMinutes(), default_keepAliveTimeMinutes); this.maxQueueSize = getProperty(propertyPrefix, key, "maxQueueSize", builder.getMaxQueueSize(), default_maxQueueSize); @@ -93,6 +99,14 @@ private static HystrixProperty getProperty(String propertyPrefix, Hystr .build(); } + private static boolean getValueOnce(String propertyPrefix, HystrixThreadPoolKey key, String instanceProperty, boolean builderOverrideValue, boolean defaultValue) { + return forBoolean() + .add(propertyPrefix + ".threadpool." + key.name() + "." + instanceProperty, builderOverrideValue) + .add(propertyPrefix + ".threadpool.default." + instanceProperty, defaultValue) + .build() + .get(); + } + /** * Core thread-pool size that gets passed to {@link ThreadPoolExecutor#setCorePoolSize(int)} * @@ -144,6 +158,10 @@ public HystrixProperty queueSizeRejectionThreshold() { return queueSizeRejectionThreshold; } + public boolean getAllowMaximumSizeToDivergeFromCoreSize() { + return allowMaximumSizeToDivergeFromCoreSize; + } + /** * Duration of statistical rolling window in milliseconds. This is passed into {@link HystrixRollingNumber} inside each {@link HystrixThreadPoolMetrics} instance. * @@ -200,6 +218,7 @@ public static class Setter { private Integer keepAliveTimeMinutes = null; private Integer maxQueueSize = null; private Integer queueSizeRejectionThreshold = null; + private boolean allowMaximumSizeToDivergeFromCoreSize = false; private Integer rollingStatisticalWindowInMilliseconds = null; private Integer rollingStatisticalWindowBuckets = null; @@ -226,6 +245,10 @@ public Integer getQueueSizeRejectionThreshold() { return queueSizeRejectionThreshold; } + public boolean getAllowMaximumSizeToDivergeFromCoreSize() { + return allowMaximumSizeToDivergeFromCoreSize; + } + public Integer getMetricsRollingStatisticalWindowInMilliseconds() { return rollingStatisticalWindowInMilliseconds; } @@ -259,6 +282,11 @@ public Setter withQueueSizeRejectionThreshold(int value) { return this; } + public Setter withAllowMaximumSizeToDivergeFromCoreSize(boolean value) { + this.allowMaximumSizeToDivergeFromCoreSize = value; + return this; + } + public Setter withMetricsRollingStatisticalWindowInMilliseconds(int value) { this.rollingStatisticalWindowInMilliseconds = value; return this; diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java index ca19ed2a2..19c8c3996 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java @@ -27,11 +27,12 @@ public class HystrixThreadPoolConfiguration { private final int maxQueueSize; private final int queueRejectionThreshold; private final int keepAliveTimeInMinutes; + private final boolean allowMaximumSizeToDivergeFromCoreSize; private final int rollingCounterNumberOfBuckets; private final int rollingCounterBucketSizeInMilliseconds; public HystrixThreadPoolConfiguration(HystrixThreadPoolKey threadPoolKey, int coreSize, int maximumSize, int maxQueueSize, int queueRejectionThreshold, - int keepAliveTimeInMinutes, int rollingCounterNumberOfBuckets, + int keepAliveTimeInMinutes, boolean allowMaximumSizeToDivergeFromCoreSize, int rollingCounterNumberOfBuckets, int rollingCounterBucketSizeInMilliseconds) { this.threadPoolKey = threadPoolKey; this.coreSize = coreSize; @@ -39,6 +40,7 @@ public HystrixThreadPoolConfiguration(HystrixThreadPoolKey threadPoolKey, int co this.maxQueueSize = maxQueueSize; this.queueRejectionThreshold = queueRejectionThreshold; this.keepAliveTimeInMinutes = keepAliveTimeInMinutes; + this.allowMaximumSizeToDivergeFromCoreSize = allowMaximumSizeToDivergeFromCoreSize; this.rollingCounterNumberOfBuckets = rollingCounterNumberOfBuckets; this.rollingCounterBucketSizeInMilliseconds = rollingCounterBucketSizeInMilliseconds; } @@ -51,6 +53,7 @@ public static HystrixThreadPoolConfiguration sample(HystrixThreadPoolKey threadP threadPoolProperties.maxQueueSize().get(), threadPoolProperties.queueSizeRejectionThreshold().get(), threadPoolProperties.keepAliveTimeMinutes().get(), + threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize(), threadPoolProperties.metricsRollingStatisticalWindowBuckets().get(), threadPoolProperties.metricsRollingStatisticalWindowInMilliseconds().get()); } @@ -64,7 +67,11 @@ public int getCoreSize() { } public int getMaximumSize() { - return maximumSize; + if (allowMaximumSizeToDivergeFromCoreSize) { + return maximumSize; + } else { + return coreSize; + } } public int getMaxQueueSize() { @@ -79,6 +86,10 @@ public int getKeepAliveTimeInMinutes() { return keepAliveTimeInMinutes; } + public boolean getAllowMaximumSizeToDivergeFromCoreSize() { + return allowMaximumSizeToDivergeFromCoreSize; + } + public int getRollingCounterNumberOfBuckets() { return rollingCounterNumberOfBuckets; } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java index 257559b40..3405cec68 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java @@ -21,6 +21,8 @@ import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.properties.HystrixProperty; import com.netflix.hystrix.util.PlatformSpecific; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -46,6 +48,8 @@ */ public abstract class HystrixConcurrencyStrategy { + private final static Logger logger = LoggerFactory.getLogger(HystrixConcurrencyStrategy.class); + /** * Factory method to provide {@link ThreadPoolExecutor} instances as desired. *

@@ -88,7 +92,17 @@ public Thread newThread(Runnable r) { threadFactory = PlatformSpecific.getAppEngineThreadFactory(); } - return new ThreadPoolExecutor(corePoolSize.get(), maximumPoolSize.get(), keepAliveTime.get(), unit, workQueue, threadFactory); + final int dynamicCoreSize = corePoolSize.get(); + final int dynamicMaximumSize = maximumPoolSize.get(); + + if (dynamicCoreSize > dynamicMaximumSize) { + logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); + return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); + } else { + return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); + } } /** diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolPropertiesTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolPropertiesTest.java index 5fc6dc63a..912caf4b2 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolPropertiesTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolPropertiesTest.java @@ -104,7 +104,7 @@ public void testSetNeitherCoreNorMaximumSize() { }; assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.coreSize().get().intValue()); - assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.maximumSize().get().intValue()); + assertEquals(HystrixThreadPoolProperties.default_maximumSize, properties.maximumSize().get().intValue()); } @Test @@ -115,7 +115,7 @@ public void testSetCoreSizeOnly() { }; assertEquals(14, properties.coreSize().get().intValue()); - assertEquals(14, properties.maximumSize().get().intValue()); + assertEquals(HystrixThreadPoolProperties.default_maximumSize, properties.maximumSize().get().intValue()); } @Test @@ -129,17 +129,6 @@ public void testSetMaximumSizeOnlyLowerThanDefaultCoreSize() { assertEquals(3, properties.maximumSize().get().intValue()); } - @Test - public void testSetMaximumSizeOnlyEqualToDefaultCoreSize() { - HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST, - HystrixThreadPoolProperties.Setter().withMaximumSize(HystrixThreadPoolProperties.default_coreSize)) { - - }; - - assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.coreSize().get().intValue()); - assertEquals(HystrixThreadPoolProperties.default_coreSize, properties.maximumSize().get().intValue()); - } - @Test public void testSetMaximumSizeOnlyGreaterThanDefaultCoreSize() { HystrixThreadPoolProperties properties = new HystrixThreadPoolProperties(TestThreadPoolKey.TEST, diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java index c8cf60455..faf22f9be 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixThreadPoolTest.java @@ -122,6 +122,7 @@ public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(Hystri //Now the HystrixThreadPool ALWAYS has the same reference to the ThreadPoolExecutor so that it no longer matters which //wins to be inserted into the HystrixThreadPool.Factory.threadPools cache. } + @Test(timeout = 2500) public void testUnsubscribeHystrixThreadPool() throws InterruptedException { // methods are package-private so can't test it somewhere else diff --git a/hystrix-serialization/src/main/java/com/netflix/hystrix/serial/SerialHystrixConfiguration.java b/hystrix-serialization/src/main/java/com/netflix/hystrix/serial/SerialHystrixConfiguration.java index 162bfc5e1..03c8dd82b 100644 --- a/hystrix-serialization/src/main/java/com/netflix/hystrix/serial/SerialHystrixConfiguration.java +++ b/hystrix-serialization/src/main/java/com/netflix/hystrix/serial/SerialHystrixConfiguration.java @@ -181,6 +181,7 @@ public static HystrixConfiguration fromByteBuffer(ByteBuffer bb) { threadPool.getValue().path("maxQueueSize").asInt(), threadPool.getValue().path("queueRejectionThreshold").asInt(), threadPool.getValue().path("keepAliveTimeInMinutes").asInt(), + threadPool.getValue().path("allowMaximumSizeToDivergeFromCoreSize").asBoolean(), threadPool.getValue().path("counterBucketCount").asInt(), threadPool.getValue().path("counterBucketSizeInMilliseconds").asInt() ); @@ -257,6 +258,7 @@ private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadP json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); + json.writeBooleanField("allowMaximumSizeToDivergeFromCoreSize", threadPoolConfig.getAllowMaximumSizeToDivergeFromCoreSize()); json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); json.writeEndObject();