Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.net.URI;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
Expand All @@ -46,9 +48,9 @@
*/
public final class ActiveMQClient {

public static int globalThreadMaxPoolSize;
private static int globalThreadPoolSize;

public static int globalScheduledThreadPoolSize;
private static int globalScheduledThreadPoolSize;

public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();

Expand Down Expand Up @@ -136,11 +138,11 @@ public final class ActiveMQClient {

public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";

private static ThreadPoolExecutor globalThreadPool;
private static ExecutorService globalThreadPool;

private static boolean injectedPools = false;

private static ScheduledThreadPoolExecutor globalScheduledThreadPool;
private static ScheduledExecutorService globalScheduledThreadPool;


static {
Expand Down Expand Up @@ -195,17 +197,19 @@ public static synchronized void clearThreadPools(long time, TimeUnit unit) {
}

/** Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call. */
public static synchronized void injectPools(ThreadPoolExecutor globalThreadPool, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
public static synchronized void injectPools(ExecutorService globalThreadPool, ScheduledExecutorService scheduledThreadPool) {
if (globalThreadPool == null || scheduledThreadPool == null)
throw new IllegalArgumentException("thread pools must not be null");

// We call clearThreadPools as that will shutdown any previously used executor
clearThreadPools();

ActiveMQClient.globalThreadPool = globalThreadPool;
ActiveMQClient.globalScheduledThreadPool = scheduledThreadPoolExecutor;
ActiveMQClient.globalScheduledThreadPool = scheduledThreadPool;
injectedPools = true;
}

public static synchronized ThreadPoolExecutor getGlobalThreadPool() {
public static synchronized ExecutorService getGlobalThreadPool() {
if (globalThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
Expand All @@ -214,17 +218,17 @@ public ThreadFactory run() {
}
});

if (globalThreadMaxPoolSize == -1) {
if (globalThreadPoolSize == -1) {
globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
}
else {
globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadMaxPoolSize, ActiveMQClient.globalThreadMaxPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadPoolSize, ActiveMQClient.globalThreadPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
}
}
return globalThreadPool;
}

public static synchronized ScheduledThreadPoolExecutor getGlobalScheduledThreadPool() {
public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
if (globalScheduledThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
Expand All @@ -238,52 +242,47 @@ public ThreadFactory run() {
return globalScheduledThreadPool;
}

public static int getGlobalThreadPoolSize() {
return globalThreadPoolSize;
}


public static int getGlobalScheduledThreadPoolSize() {
return globalScheduledThreadPoolSize;
}

/**
* (Re)Initializes the global thread pools properties from System properties. This method will update the global
* thread pool configuration based on defined System properties (or defaults if they are not set) notifying
* all globalThreadPoolListeners. The System properties key names are as follow:
* Initializes the global thread pools properties from System properties. This method will update the global
* thread pool configuration based on defined System properties (or defaults if they are not set).
* The System properties key names are as follow:
*
* ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY="activemq.artemis.client.global.thread.pool.max.size"
* ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY="activemq.artemis.client.global.scheduled.thread.pool.core.size
*
* The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will defaul to 2.
* The min value for max thread pool size is 2. If the value is not -1, but lower than 2, it will be ignored and will default to 2.
* A value of -1 configures an unbounded thread pool.
*
* Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global
* thread pools have already been created, they will be updated with these new values.
* Note: If global thread pools have already been created, they will not be updated with these new values.
*/
public static void initializeGlobalThreadPoolProperties() {

setGlobalThreadPoolProperties(Integer.valueOf(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE))), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)));
setGlobalThreadPoolProperties(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE)), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)));
}

/**
* Allows programatically configuration of global thread pools properties. This method will update the global
* thread pool configuration based on the provided values notifying all globalThreadPoolListeners.
*
* Note. The ServerLocatorImpl registers a listener and uses it to configure it's global thread pools. If global
* thread pools have already been created, they will be updated with these new values.
* Note: If global thread pools have already been created, they will not be updated with these new values.
*
* The min value for max thread pool size is 2. Providing a value lower than 2 will be ignored and will default to 2.
* The min value for globalThreadMaxPoolSize is 2. If the value is not -1, but lower than 2, it will be ignored and will default to 2.
* A value of -1 configures an unbounded thread pool.
*/
public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize) {

if (globalThreadMaxPoolSize < 2) globalThreadMaxPoolSize = 2;
if (globalThreadMaxPoolSize < 2 && globalThreadMaxPoolSize != -1) globalThreadMaxPoolSize = 2;

ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
ActiveMQClient.globalThreadMaxPoolSize = globalThreadMaxPoolSize;

// if injected, we won't do anything with the pool as they're not ours
if (!injectedPools) {
// Right now I'm ignoring the corePool size on purpose as there's no way to have two values for the number of threads
// this is basically a fixed size thread pool (although the pool grows on demand)
getGlobalThreadPool().setCorePoolSize(globalThreadMaxPoolSize);
getGlobalThreadPool().setMaximumPoolSize(globalThreadMaxPoolSize);

getGlobalScheduledThreadPool().setCorePoolSize(globalScheduledThreadPoolSize);
}
ActiveMQClient.globalThreadPoolSize = globalThreadMaxPoolSize;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static void tearDown() {
System.clearProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY);
ActiveMQClient.initializeGlobalThreadPoolProperties();
ActiveMQClient.clearThreadPools();
Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.globalThreadMaxPoolSize);
Assert.assertEquals(ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE, ActiveMQClient.getGlobalThreadPoolSize());
}

@Test
Expand All @@ -65,14 +65,15 @@ public void testSystemPropertyThreadPoolSettings() throws Exception {
System.setProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + threadPoolMaxSize);
System.setProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + scheduledThreadPoolSize);
ActiveMQClient.initializeGlobalThreadPoolProperties();
ActiveMQClient.clearThreadPools();

testSystemPropertiesThreadPoolSettings(threadPoolMaxSize, scheduledThreadPoolSize);
}

@Test
public void testShutdownPoolInUse() throws Exception {
ActiveMQClient.clearThreadPools();
ActiveMQClient.setGlobalThreadPoolProperties(10, 1);
ActiveMQClient.clearThreadPools();

final CountDownLatch inUse = new CountDownLatch(1);
final CountDownLatch neverLeave = new CountDownLatch(1);
Expand Down Expand Up @@ -146,6 +147,7 @@ public void testStaticPropertiesThreadPoolSettings() throws Exception {
int testScheduleSize = 9;

ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
ActiveMQClient.clearThreadPools();
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
}

Expand All @@ -156,6 +158,7 @@ public void testSmallPool() throws Exception {
int testScheduleSize = 9;

ActiveMQClient.setGlobalThreadPoolProperties(testMaxSize, testScheduleSize);
ActiveMQClient.clearThreadPools();
testSystemPropertiesThreadPoolSettings(testMaxSize, testScheduleSize);
}

Expand All @@ -174,7 +177,7 @@ private void testSystemPropertiesThreadPoolSettings(int expectedMax, int expecte
threadPoolField.setAccessible(true);
scheduledThreadPoolField.setAccessible(true);

ThreadPoolExecutor threadPool = ActiveMQClient.getGlobalThreadPool();
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) ActiveMQClient.getGlobalThreadPool();

final CountDownLatch doneMax = new CountDownLatch(expectedMax);
final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -255,6 +258,7 @@ public void cleanup() {
// Resets the global thread pool properties back to default.
System.setProperties(systemProperties);
ActiveMQClient.initializeGlobalThreadPoolProperties();
ActiveMQClient.clearThreadPools();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ public static void resetThreadPool() {

private static ExecutorService getInVMExecutor() {
if (threadPoolExecutor == null) {
if (ActiveMQClient.globalThreadMaxPoolSize <= -1) {
if (ActiveMQClient.getGlobalThreadPoolSize() <= -1) {
threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory());
}
else {
threadPoolExecutor = Executors.newFixedThreadPool(ActiveMQClient.globalThreadMaxPoolSize);
threadPoolExecutor = Executors.newFixedThreadPool(ActiveMQClient.getGlobalThreadPoolSize());
}
}
return threadPoolExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ public void testCoreClientWithInjectedThreadPools() throws Exception {
@Test
public void testCoreClientWithGlobalThreadPoolParamtersChanged() throws Exception {

int originalScheduled = ActiveMQClient.globalScheduledThreadPoolSize;
int originalGlobal = ActiveMQClient.globalThreadMaxPoolSize;
int originalScheduled = ActiveMQClient.getGlobalScheduledThreadPoolSize();
int originalGlobal = ActiveMQClient.getGlobalThreadPoolSize();

try {
ActiveMQClient.setGlobalThreadPoolProperties(2, 1);
ActiveMQClient.clearThreadPools();
ServerLocator locator = createNonHALocator(false);
testCoreClient(true, locator);
}
Expand Down