Skip to content

Commit

Permalink
ARTEMIS-4522 simplify test & complete implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Dec 7, 2023
1 parent 54f06a0 commit 0b3f36e
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ServerLocatorConfig {
public boolean useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS;
public int threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
public int scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
public int flowControlThreadPoolMaxSize = ActiveMQClient.DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE;
public long retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
public double retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
public long maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL;
Expand Down Expand Up @@ -76,6 +77,7 @@ public ServerLocatorConfig(final ServerLocatorConfig locator) {
ackBatchSize = locator.ackBatchSize;
useGlobalPools = locator.useGlobalPools;
scheduledThreadPoolMaxSize = locator.scheduledThreadPoolMaxSize;
flowControlThreadPoolMaxSize = locator.flowControlThreadPoolMaxSize;
threadPoolMaxSize = locator.threadPoolMaxSize;
retryInterval = locator.retryInterval;
retryIntervalMultiplier = locator.retryIntervalMultiplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public final class ActiveMQClient {

private static int globalScheduledThreadPoolSize;

private static int flowControlThreadPoolSize;
private static int globalFlowControlThreadPoolSize;

public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();

Expand Down Expand Up @@ -150,9 +150,11 @@ public final class ActiveMQClient {

public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";

public static final String FLOW_CONTROL_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.flowcontrol.thread.pool.core.size";

private static ExecutorService globalThreadPool;

private static ExecutorService flowControlThreadPool;
private static ExecutorService globalFlowControlThreadPool;

private static boolean injectedPools = false;

Expand All @@ -171,12 +173,13 @@ public static synchronized void clearThreadPools(long time, TimeUnit unit) {
if (injectedPools) {
globalThreadPool = null;
globalScheduledThreadPool = null;
flowControlThreadPool = null;
globalFlowControlThreadPool = null;
injectedPools = false;
return;
}

if (globalThreadPool != null) {
System.out.println("Shutting down main pool...");
globalThreadPool.shutdownNow();
try {
if (!globalThreadPool.awaitTermination(time, unit)) {
Expand All @@ -191,6 +194,7 @@ public static synchronized void clearThreadPools(long time, TimeUnit unit) {
}

if (globalScheduledThreadPool != null) {
System.out.println("Shutting down scheduled pool...");
globalScheduledThreadPool.shutdownNow();
try {
if (!globalScheduledThreadPool.awaitTermination(time, unit)) {
Expand All @@ -204,17 +208,18 @@ public static synchronized void clearThreadPools(long time, TimeUnit unit) {
}
}

if (flowControlThreadPool != null) {
flowControlThreadPool.shutdownNow();
if (globalFlowControlThreadPool != null) {
System.out.println("Shutting down flow-control pool...");
globalFlowControlThreadPool.shutdownNow();
try {
if (!flowControlThreadPool.awaitTermination(time, unit)) {
flowControlThreadPool.shutdownNow();
ActiveMQClientLogger.LOGGER.unableToProcessScheduledlIn10Sec();
if (!globalFlowControlThreadPool.awaitTermination(time, unit)) {
globalFlowControlThreadPool.shutdownNow();
ActiveMQClientLogger.LOGGER.unableToProcessGlobalFlowControlThreadPoolIn10Sec();
}
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
} finally {
flowControlThreadPool = null;
globalFlowControlThreadPool = null;
}
}
}
Expand All @@ -223,53 +228,41 @@ public static synchronized void clearThreadPools(long time, TimeUnit unit) {
* Warning: This method has to be called before any clients or servers is started on the JVM otherwise previous ServerLocator would be broken after this call.
*/
public static synchronized void injectPools(ExecutorService globalThreadPool,
ScheduledExecutorService scheduledThreadPool) {
if (globalThreadPool == null || scheduledThreadPool == null)
ScheduledExecutorService scheduledThreadPool,
ExecutorService flowControlThreadPool) {
if (globalThreadPool == null || scheduledThreadPool == null || flowControlThreadPool == null)
throw new IllegalArgumentException("thread pools must not be null");

// We call clearThreadPools as that will shutdown any previously used executor
clearThreadPools();

ActiveMQClient.globalThreadPool = globalThreadPool;
ActiveMQClient.globalScheduledThreadPool = scheduledThreadPool;
ActiveMQClient.globalFlowControlThreadPool = flowControlThreadPool;
injectedPools = true;
}

public static synchronized ExecutorService getGlobalThreadPool() {
if (globalThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
});

if (globalThreadPoolSize == -1) {
globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
} else {
globalThreadPool = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.globalThreadPoolSize, 60L, TimeUnit.SECONDS, factory);
}
}
globalThreadPool = internalGetGlobalThreadPool(globalThreadPool, "ActiveMQ-client-global-threads", ActiveMQClient.globalThreadPoolSize);
return globalThreadPool;
}

public static synchronized ExecutorService getGlobalFlowControlThreadPool() {
globalFlowControlThreadPool = internalGetGlobalThreadPool(globalFlowControlThreadPool, "ActiveMQ-client-global-flow-control-threads", ActiveMQClient.globalFlowControlThreadPoolSize);
return globalFlowControlThreadPool;
}

public static synchronized ExecutorService getFlowControlThreadPool() {
if (flowControlThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-flow-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
});
private static synchronized ExecutorService internalGetGlobalThreadPool(ExecutorService executorService, String groupName, int poolSize) {
if (executorService == null) {
ThreadFactory factory = AccessController.doPrivileged((PrivilegedAction<ThreadFactory>) () -> new ActiveMQThreadFactory(groupName, true, ClientSessionFactoryImpl.class.getClassLoader()));

if (flowControlThreadPoolSize == -1) {
flowControlThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
if (poolSize == -1) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
} else {
flowControlThreadPool = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.flowControlThreadPoolSize, 60L, TimeUnit.SECONDS, factory);
executorService = new ActiveMQThreadPoolExecutor(0, poolSize, 60L, TimeUnit.SECONDS, factory);
}
}
return flowControlThreadPool;
return executorService;
}

public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
Expand All @@ -294,6 +287,10 @@ public static int getGlobalScheduledThreadPoolSize() {
return globalScheduledThreadPoolSize;
}

public static int getGlobalFlowControlThreadPoolSize() {
return globalFlowControlThreadPoolSize;
}

/**
* Initializes the global thread pools properties from System properties. This method will update the global
* thread pool configuration based on defined System properties (or defaults if they are not set).
Expand All @@ -309,7 +306,7 @@ public static int getGlobalScheduledThreadPoolSize() {
*/
public static void initializeGlobalThreadPoolProperties() {

setGlobalThreadPoolProperties(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE)), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)));
setGlobalThreadPoolProperties(Integer.valueOf(System.getProperty(ActiveMQClient.THREAD_POOL_MAX_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_GLOBAL_THREAD_POOL_MAX_SIZE)), Integer.valueOf(System.getProperty(ActiveMQClient.SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE)), Integer.valueOf(System.getProperty(ActiveMQClient.FLOW_CONTROL_THREAD_POOL_SIZE_PROPERTY_KEY, "" + ActiveMQClient.DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE)));
}

/**
Expand All @@ -321,14 +318,14 @@ public static void initializeGlobalThreadPoolProperties() {
* The min value for globalThreadMaxPoolSize is 2. If the value is not -1, but lower than 2, it will be ignored and will default to 2.
* A value of -1 configures an unbounded thread pool.
*/
public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize) {
public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, int globalScheduledThreadPoolSize, int globalFlowControlThreadPoolSize) {

if (globalThreadMaxPoolSize < 2 && globalThreadMaxPoolSize != -1)
globalThreadMaxPoolSize = 2;

ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
ActiveMQClient.globalThreadPoolSize = globalThreadMaxPoolSize;
ActiveMQClient.flowControlThreadPoolSize = DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE;
ActiveMQClient.globalFlowControlThreadPoolSize = globalFlowControlThreadPoolSize;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,26 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig
*/
ServerLocator setThreadPoolMaxSize(int threadPoolMaxSize);

/**
* Returns the maximum size of the flow-control thread pool.
* <p>
* Default value is {@link ActiveMQClient#DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE}.
*
* @return the maximum size of the flow-control thread pool.
*/
int getFlowControlThreadPoolMaxSize();

/**
* Sets the maximum size of the flow-control thread pool.
* <p>
* This setting is relevant only if this factory does not use global pools.
* Value must be -1 (for unlimited thread pool) or greater than 0.
*
* @param flowControlThreadPoolMaxSize maximum size of the flow-control thread pool.
* @return this ServerLocator
*/
ServerLocator setFlowControlThreadPoolMaxSize(int flowControlThreadPoolMaxSize);

/**
* Returns the time to retry connections created by this factory after failure.
* <p>
Expand Down Expand Up @@ -847,7 +867,7 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig

String getOutgoingInterceptorList();

boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPoolExecutor);
boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPoolExecutor, Executor flowControlThreadPool);

/** This will only instantiate internal objects such as the topology */
void initialize() throws ActiveMQException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,7 @@ public interface ActiveMQClientLogger {

@LogMessage(id = 214034, value = "{} has negative counts {}\n{}", level = LogMessage.Level.ERROR)
void negativeRefCount(String message, String count, String debugString);

@LogMessage(id = 214035, value = "Couldn't finish the client globalFlowControlThreadPool in less than 10 seconds, interrupting it now", level = LogMessage.Level.WARN)
void unableToProcessGlobalFlowControlThreadPoolIn10Sec();
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final Executor flowControlPool,
final Executor flowControlThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
this(serverLocator, new Pair<>(connectorConfig, null),
locatorConfig, reconnectAttempts, threadPool,
scheduledThreadPool,flowControlPool, incomingInterceptors, outgoingInterceptors);
scheduledThreadPool, flowControlThreadPool, incomingInterceptors, outgoingInterceptors);
}


Expand All @@ -190,12 +190,12 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final Executor flowControlPool,
final Executor flowControlThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
this(serverLocator, connectorConfig,
locatorConfig, reconnectAttempts, threadPool,
scheduledThreadPool, flowControlPool,incomingInterceptors, outgoingInterceptors, null);
scheduledThreadPool, flowControlThreadPool, incomingInterceptors, outgoingInterceptors, null);
}

ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
Expand All @@ -204,7 +204,7 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final Executor flowControlPoll,
final Executor flowControlThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors,
final TransportConfiguration[] connectorConfigs) {
Expand Down Expand Up @@ -256,7 +256,7 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,

orderedExecutorFactory = new OrderedExecutorFactory(threadPool);

flowControlExecutor = new OrderedExecutorFactory(flowControlPoll).getExecutor();
flowControlExecutor = new OrderedExecutorFactory(flowControlThreadPool).getExecutor();

closeExecutor = orderedExecutorFactory.getExecutor();

Expand Down
Loading

0 comments on commit 0b3f36e

Please sign in to comment.