From 4c7cce27e4f31316b9da8d55afaead94440432c8 Mon Sep 17 00:00:00 2001 From: Bernd Gutjahr Date: Fri, 15 Apr 2016 08:20:05 +0200 Subject: [PATCH 1/3] ARTEMIS-485 Allow configuring an unbounded cached global client thread pool Adapted code to handle -1 correctly to configure an unbounded thread pool. In addition, I removed the capability to reconfigure the max pool size of existing thread pools, because the global thread pool can either be an unbounded cached pool, or a bounded fixed size pool. These 2 kinds of pool also differ in the used blocking queue, therefore cannot be converted into each other. --- .../api/core/client/ActiveMQClient.java | 32 +++++++------------ 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java index b4c83f1b5a0..1674a53a716 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java @@ -242,48 +242,38 @@ public ThreadFactory run() { /** - * (Re)Initializes the global thread pools properties from System properties. This method will update the global - * thread pool configuration based on defined System properties (or defaults if they are not set) notifying - * all globalThreadPoolListeners. The System properties key names are as follow: + * Initializes the global thread pools properties from System properties. This method will update the global + * thread pool configuration based on defined System properties (or defaults if they are not set). + * The System properties key names are as follow: * * ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY="activemq.artemis.client.global.thread.pool.max.size" * ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY="activemq.artemis.client.global.scheduled.thread.pool.core.size * - * The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will defaul to 2. + * The min value for max thread pool size is 2. If the value is not -1, but lower than 2, it will be ignored and will default to 2. + * A value of -1 configures an unbounded thread pool. * - * Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global - * thread pools have already been created, they will be updated with these new values. + * Note: If global thread pools have already been created, they will not be updated with these new values. */ public static void initializeGlobalThreadPoolProperties() { - setGlobalThreadPoolProperties(Integer.valueOf(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE))), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE))); + setGlobalThreadPoolProperties(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE)), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE))); } /** * Allows programatically configuration of global thread pools properties. This method will update the global * thread pool configuration based on the provided values notifying all globalThreadPoolListeners. * - * Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global - * thread pools have already been created, they will be updated with these new values. + * Note: If global thread pools have already been created, they will not be updated with these new values. * - * The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will default to 2. + * The min value for globalThreadMaxPoolSize is 2. If the value is not -1, but lower than 2, it will be ignored and will default to 2. + * A value of -1 configures an unbounded thread pool. */ public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize) { - if (globalThreadMaxPoolSize < 2) globalThreadMaxPoolSize = 2; + if (globalThreadMaxPoolSize < 2 && globalThreadMaxPoolSize != -1) globalThreadMaxPoolSize = 2; ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize; ActiveMQClient.globalThreadMaxPoolSize = globalThreadMaxPoolSize; - - // if injected, we won't do anything with the pool as they're not ours - if (!injectedPools) { - // Right now I'm ignoring the corePool size on purpose as there's no way to have two values for the number of threads - // this is basically a fixed size thread pool (although the pool grows on demand) - getGlobalThreadPool().setCorePoolSize(globalThreadMaxPoolSize); - getGlobalThreadPool().setMaximumPoolSize(globalThreadMaxPoolSize); - - getGlobalScheduledThreadPool().setCorePoolSize(globalScheduledThreadPoolSize); - } } /** From daf27cdefe486fc1d3f8e103a10bea3f960db25b Mon Sep 17 00:00:00 2001 From: Bernd Gutjahr Date: Fri, 15 Apr 2016 09:14:30 +0200 Subject: [PATCH 2/3] Protected ActiveMQClient API against misuse. 1. Changed public fields in ActiveMQClient to private and added getters. Exposing fields for thread pool sized allow to modify them in undesired ways. I made these fields private and added corresponding getter methods. In addition, I renamed the field 'globalThreadMaxPoolSize' to 'globalThreadPoolSize' to be more consistent with the 'globalScheduledThreadPoolSize' field name. I also adapted some tests to always call clearThreadPools after the thread pool size configuration has been changed. 2. Protect against injecting null as thread pools ActiveMQClient.injectPools allowed null as injected thread pools. The effect was that internal threads pools were created, but not shutdown correctly. --- .../api/core/client/ActiveMQClient.java | 23 ++++++++++++------- .../artemis/ClientThreadPoolsTest.java | 8 +++++-- .../remoting/impl/invm/InVMConnector.java | 4 ++-- .../integration/client/CoreClientTest.java | 5 ++-- 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java index 1674a53a716..abc3a5d99da 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java @@ -46,9 +46,9 @@ */ public final class ActiveMQClient { - public static int globalThreadMaxPoolSize; + private static int globalThreadPoolSize; - public static int globalScheduledThreadPoolSize; + private static int globalScheduledThreadPoolSize; public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName(); @@ -195,13 +195,15 @@ public static synchronized void clearThreadPools(long time, TimeUnit unit) { } /** Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call. */ - public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) { + public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor scheduledThreadPool) { + if (globalThreadPool == null || scheduledThreadPool == null) + throw new IllegalArgumentException("thread pools must not be null"); // We call clearThreadPools as that will shutdown any previously used executor clearThreadPools(); ActiveMQClient.globalThreadPool = globalThreadPool; - ActiveMQClient.globalScheduledThreadPool = scheduledThreadPoolExecutor; + ActiveMQClient.globalScheduledThreadPool = scheduledThreadPool; injectedPools = true; } @@ -214,11 +216,11 @@ public ThreadFactory run() { } }); - if (globalThreadMaxPoolSize == -1) { + if (globalThreadPoolSize == -1) { globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), factory); } else { - globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadMaxPoolSize, ActiveMQClient.globalThreadMaxPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue(), factory); + globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadPoolSize, ActiveMQClient.globalThreadPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue(), factory); } } return globalThreadPool; @@ -238,8 +240,13 @@ public ThreadFactory run() { return globalScheduledThreadPool; } + public static int getGlobalThreadPoolSize() { + return globalThreadPoolSize; + } - + public static int getGlobalScheduledThreadPoolSize() { + return globalScheduledThreadPoolSize; + } /** * Initializes the global thread pools properties from System properties. This method will update the global @@ -273,7 +280,7 @@ public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, in if (globalThreadMaxPoolSize < 2 && globalThreadMaxPoolSize != -1) globalThreadMaxPoolSize = 2; ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize; - ActiveMQClient.globalThreadMaxPoolSize = globalThreadMaxPoolSize; + ActiveMQClient.globalThreadPoolSize = globalThreadMaxPoolSize; } /** diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java index 57399cd4c0f..a3dc213d2c8 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java @@ -54,7 +54,7 @@ public static void tearDown() { System.clearProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY); ActiveMQClient.initializeGlobalThreadPoolProperties(); ActiveMQClient.clearThreadPools(); - Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.globalThreadMaxPoolSize); + Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.getGlobalThreadPoolSize()); } @Test @@ -65,14 +65,15 @@ public void testSystemPropertyThreadPoolSettings() throws Exception { System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize); System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize); ActiveMQClient.initializeGlobalThreadPoolProperties(); + ActiveMQClient.clearThreadPools(); testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize); } @Test public void testShutdownPoolInUse() throws Exception { - ActiveMQClient.clearThreadPools(); ActiveMQClient.setGlobalThreadPoolProperties(10, 1); + ActiveMQClient.clearThreadPools(); final CountDownLatch inUse = new CountDownLatch(1); final CountDownLatch neverLeave = new CountDownLatch(1); @@ -146,6 +147,7 @@ public void testStaticPropertiesThreadPoolSettings() throws Exception { int testScheduleSize = 9; ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize); + ActiveMQClient.clearThreadPools(); testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize); } @@ -156,6 +158,7 @@ public void testSmallPool() throws Exception { int testScheduleSize = 9; ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize); + ActiveMQClient.clearThreadPools(); testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize); } @@ -255,6 +258,7 @@ public void cleanup() { // Resets the global thread pool properties back to default. System.setProperties(systemProperties); ActiveMQClient.initializeGlobalThreadPoolProperties(); + ActiveMQClient.clearThreadPools(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java index c222d0d8c24..8c2c13030cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java @@ -103,11 +103,11 @@ public static void resetThreadPool() { private static ExecutorService getInVMExecutor() { if (threadPoolExecutor == null) { - if (ActiveMQClient.globalThreadMaxPoolSize <= -1) { + if (ActiveMQClient.getGlobalThreadPoolSize() <= -1) { threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), Executors.defaultThreadFactory()); } else { - threadPoolExecutor = Executors.newFixedThreadPool(ActiveMQClient.globalThreadMaxPoolSize); + threadPoolExecutor = Executors.newFixedThreadPool(ActiveMQClient.getGlobalThreadPoolSize()); } } return threadPoolExecutor; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java index a4ef71a7b3a..c9b8944b142 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java @@ -74,11 +74,12 @@ public void testCoreClientWithInjectedThreadPools() throws Exception { @Test public void testCoreClientWithGlobalThreadPoolParamtersChanged() throws Exception { - int originalScheduled = ActiveMQClient.globalScheduledThreadPoolSize; - int originalGlobal = ActiveMQClient.globalThreadMaxPoolSize; + int originalScheduled = ActiveMQClient.getGlobalScheduledThreadPoolSize(); + int originalGlobal = ActiveMQClient.getGlobalThreadPoolSize(); try { ActiveMQClient.setGlobalThreadPoolProperties(2, 1); + ActiveMQClient.clearThreadPools(); ServerLocator locator = createNonHALocator(false); testCoreClient(true, locator); } From b1aad3790729b7ac6d2adb02abab4128fad5fc1d Mon Sep 17 00:00:00 2001 From: Bernd Gutjahr Date: Tue, 19 Apr 2016 13:55:30 +0200 Subject: [PATCH 3/3] abstracted global client thread pools from ThreadPoolExecutor as implementation Changed the ActiveMQClient interface to expose global thread pools as ExecutorService and ScheduledExecutorService interface. This is necessary to allow injecting thread pool implementations that are not based on ThreadPoolExecutor or ScheduledThreadPoolExecutor. --- .../artemis/api/core/client/ActiveMQClient.java | 12 +++++++----- .../activemq/artemis/ClientThreadPoolsTest.java | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java index abc3a5d99da..3f5dcb9ec48 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java @@ -19,7 +19,9 @@ import java.net.URI; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -136,11 +138,11 @@ public final class ActiveMQClient { public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size"; - private static ThreadPoolExecutor globalThreadPool; + private static ExecutorService globalThreadPool; private static boolean injectedPools = false; - private static ScheduledThreadPoolExecutor globalScheduledThreadPool; + private static ScheduledExecutorService globalScheduledThreadPool; static { @@ -195,7 +197,7 @@ public static synchronized void clearThreadPools(long time, TimeUnit unit) { } /** Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call. */ - public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor scheduledThreadPool) { + public static synchronized void injectPools(ExecutorService globalThreadPool, ScheduledExecutorService scheduledThreadPool) { if (globalThreadPool == null || scheduledThreadPool == null) throw new IllegalArgumentException("thread pools must not be null"); @@ -207,7 +209,7 @@ public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, injectedPools = true; } - public static synchronized ThreadPoolExecutor getGlobalThreadPool() { + public static synchronized ExecutorService getGlobalThreadPool() { if (globalThreadPool == null) { ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction() { @Override @@ -226,7 +228,7 @@ public ThreadFactory run() { return globalThreadPool; } - public static synchronized ScheduledThreadPoolExecutor getGlobalScheduledThreadPool() { + public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() { if (globalScheduledThreadPool == null) { ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction() { @Override diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java index a3dc213d2c8..f9cd852e856 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java @@ -177,7 +177,7 @@ private void testSystemPropertiesThreadPoolSettings(int expectedMax, int expecte threadPoolField.setAccessible(true); scheduledThreadPoolField.setAccessible(true); - ThreadPoolExecutor threadPool = ActiveMQClient.getGlobalThreadPool(); + ThreadPoolExecutor threadPool = (ThreadPoolExecutor) ActiveMQClient.getGlobalThreadPool(); final CountDownLatch doneMax = new CountDownLatch(expectedMax); final CountDownLatch latch = new CountDownLatch(1);