Skip to content

Commit

Permalink
ARTEMIS-4522 Dedicated thread pool for flow-control-executor
Browse files Browse the repository at this point in the history
  • Loading branch information
Todor Neykov authored and jbertram committed Dec 21, 2023
1 parent d4d4b06 commit ff2b76c
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class ActiveMQClient {

private static int globalScheduledThreadPoolSize;

private static int flowControlThreadPoolSize;

public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();

public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = ActiveMQDefaultConfiguration.getDefaultClientFailureCheckPeriod();
Expand Down Expand Up @@ -130,6 +132,8 @@ public final class ActiveMQClient {

public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;

public static final int DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE = 10;

public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;

public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
Expand All @@ -148,6 +152,8 @@ public final class ActiveMQClient {

private static ExecutorService globalThreadPool;

private static ExecutorService flowControlThreadPool;

private static boolean injectedPools = false;

private static ScheduledExecutorService globalScheduledThreadPool;
Expand All @@ -165,6 +171,7 @@ public static synchronized void clearThreadPools(long time, TimeUnit unit) {
if (injectedPools) {
globalThreadPool = null;
globalScheduledThreadPool = null;
flowControlThreadPool = null;
injectedPools = false;
return;
}
Expand Down Expand Up @@ -196,6 +203,20 @@ public static synchronized void clearThreadPools(long time, TimeUnit unit) {
globalScheduledThreadPool = null;
}
}

if (flowControlThreadPool != null) {
flowControlThreadPool.shutdownNow();
try {
if (!flowControlThreadPool.awaitTermination(time, unit)) {
flowControlThreadPool.shutdownNow();
ActiveMQClientLogger.LOGGER.unableToProcessScheduledlIn10Sec();
}
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
} finally {
flowControlThreadPool = null;
}
}
}

/**
Expand Down Expand Up @@ -232,6 +253,25 @@ public ThreadFactory run() {
return globalThreadPool;
}


public static synchronized ExecutorService getFlowControlThreadPool() {
if (flowControlThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-flow-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
});

if (flowControlThreadPoolSize == -1) {
flowControlThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
} else {
flowControlThreadPool = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.flowControlThreadPoolSize, 60L, TimeUnit.SECONDS, factory);
}
}
return flowControlThreadPool;
}

public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() {
if (globalScheduledThreadPool == null) {
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
Expand Down Expand Up @@ -288,6 +328,7 @@ public static void setGlobalThreadPoolProperties(int globalThreadMaxPoolSize, in

ActiveMQClient.globalScheduledThreadPoolSize = globalScheduledThreadPoolSize;
ActiveMQClient.globalThreadPoolSize = globalThreadMaxPoolSize;
ActiveMQClient.flowControlThreadPoolSize = DEFAULT_FLOW_CONTROL_THREAD_POOL_MAX_SIZE;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C

private final Executor closeExecutor;

private final Executor flowControlExecutor;

private RemotingConnection connection;

private final long retryInterval;
Expand Down Expand Up @@ -173,24 +175,27 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final Executor flowControlPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
this(serverLocator, new Pair<>(connectorConfig, null),
locatorConfig, reconnectAttempts, threadPool,
scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
scheduledThreadPool,flowControlPool, incomingInterceptors, outgoingInterceptors);
}


ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final Pair<TransportConfiguration, TransportConfiguration> connectorConfig,
final ServerLocatorConfig locatorConfig,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
final Pair<TransportConfiguration, TransportConfiguration> connectorConfig,
final ServerLocatorConfig locatorConfig,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final Executor flowControlPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
this(serverLocator, connectorConfig,
locatorConfig, reconnectAttempts, threadPool,
scheduledThreadPool, incomingInterceptors, outgoingInterceptors, null);
locatorConfig, reconnectAttempts, threadPool,
scheduledThreadPool, flowControlPool,incomingInterceptors, outgoingInterceptors, null);
}

ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
Expand All @@ -199,6 +204,7 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final Executor flowControlPoll,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors,
final TransportConfiguration[] connectorConfigs) {
Expand Down Expand Up @@ -250,6 +256,8 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,

orderedExecutorFactory = new OrderedExecutorFactory(threadPool);

flowControlExecutor = new OrderedExecutorFactory(flowControlPoll).getExecutor();

closeExecutor = orderedExecutorFactory.getExecutor();

this.incomingInterceptors = incomingInterceptors;
Expand Down Expand Up @@ -836,7 +844,7 @@ private ClientSession createSessionInternal(final String rawUsername,

SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);

ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), flowControlExecutor, orderedExecutorFactory.getExecutor());

synchronized (sessions) {
if (closed || !clientProtocolManager.isAlive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ private enum STATE {

private transient Executor threadPool;

private transient Executor flowControlPool;
private transient ScheduledExecutorService scheduledThreadPool;

private transient DiscoveryGroup discoveryGroup;
Expand Down Expand Up @@ -188,20 +189,24 @@ public static synchronized void clearThreadPools() {
ActiveMQClient.clearThreadPools();
}


private synchronized void setThreadPools() {
if (threadPool != null) {
return;
} else if (config.useGlobalPools) {
threadPool = ActiveMQClient.getGlobalThreadPool();

flowControlPool = ActiveMQClient.getFlowControlThreadPool(); //TODO add option for config

scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool();
} else {
this.shutdownPool = true;

ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, ServerLocatorImpl.class.getClassLoader());
return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true,
ServerLocatorImpl.class.getClassLoader());
}
});

Expand All @@ -214,10 +219,12 @@ public ThreadFactory run() {
factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-factory-pinger-threads-" + System.identityHashCode(this), true, ClientSessionFactoryImpl.class.getClassLoader());
return new ActiveMQThreadFactory("ActiveMQ-client-factory-pinger-threads-" + System.identityHashCode(this), true,
ClientSessionFactoryImpl.class.getClassLoader());
}
});

flowControlPool = ActiveMQClient.getFlowControlThreadPool(); //TODO add option for config;
scheduledThreadPool = Executors.newScheduledThreadPool(config.scheduledThreadPoolMaxSize, factory);
}
this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
Expand Down Expand Up @@ -625,7 +632,7 @@ public ClientSessionFactory createSessionFactory(final TransportConfiguration tr

initialize();

ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, config, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, config, reconnectAttempts, threadPool, scheduledThreadPool, flowControlPool,incomingInterceptors, outgoingInterceptors);

addToConnecting(factory);
try {
Expand Down Expand Up @@ -701,7 +708,7 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException {
// try each factory in the list until we find one which works

try {
factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors, initialConnectors);
factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, flowControlPool, incomingInterceptors, outgoingInterceptors, initialConnectors);
try {
addToConnecting(factory);
// We always try to connect here with only one attempt,
Expand Down Expand Up @@ -1804,7 +1811,7 @@ private synchronized void createConnectors() {
connectors = new ArrayList<>();
if (initialConnectors != null) {
for (TransportConfiguration initialConnector : initialConnectors) {
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, config, config.reconnectAttempts, threadPool, scheduledThreadPool,flowControlPool, incomingInterceptors, outgoingInterceptors);

connectors.add(new Connector(initialConnector, factory));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,15 @@ protected void verifySingleAttemptToLocateLive(ActiveMQServerImpl server, Activa
final ServerLocatorConfig locatorConfig = Mockito.mock(ServerLocatorConfig.class);
final int reconnectAttempts = 1;
final Executor threadPool = Mockito.mock(Executor.class);
final Executor flowControlPool = Mockito.mock(Executor.class);
final ScheduledExecutorService scheduledThreadPool = Mockito.mock(ScheduledExecutorService.class);
final ClientProtocolManager clientProtocolManager = Mockito.mock(ClientProtocolManager.class);
when(serverLocator.newProtocolManager()).thenReturn(clientProtocolManager);
when(connectorConfig.getFactoryClassName()).thenReturn(NettyConnectorFactory.class.getName());
Map<String, Object> urlParams = new HashMap<>();
urlParams.put("port", serverSocket.getLocalPort());
when(connectorConfig.getCombinedParams()).thenReturn(urlParams);
ClientSessionFactoryImpl sessionFactory = new ClientSessionFactoryImpl(serverLocator, connectorConfig, locatorConfig, reconnectAttempts, threadPool, scheduledThreadPool, null, null);
ClientSessionFactoryImpl sessionFactory = new ClientSessionFactoryImpl(serverLocator, connectorConfig, locatorConfig, reconnectAttempts, threadPool, scheduledThreadPool, flowControlPool,null, null);
when(clusterControl.getSessionFactory()).thenReturn(sessionFactory);
when(clientProtocolManager.isAlive()).thenReturn(true);

Expand Down Expand Up @@ -270,4 +271,4 @@ public void connectionFailed(ActiveMQException exception, boolean failedOver, St
assertFalse(reconnectWorkOnRetry.get());
}
}
}
}

0 comments on commit ff2b76c

Please sign in to comment.