Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@
*/
package org.apache.rocketmq.broker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
Expand Down Expand Up @@ -85,21 +100,6 @@
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
Expand Down Expand Up @@ -129,6 +129,7 @@ public class BrokerController {
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
Expand All @@ -143,6 +144,7 @@ public class BrokerController {
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor;
private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
Expand Down Expand Up @@ -186,6 +188,7 @@ public BrokerController(
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());

this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
Expand Down Expand Up @@ -280,6 +283,15 @@ public boolean initialize() throws CloneNotSupportedException {
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));

this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_",true));


this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
Expand Down Expand Up @@ -501,11 +513,11 @@ public void registerProcessor() {
* ClientManageProcessor
*/
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

Expand Down Expand Up @@ -729,10 +741,6 @@ public void shutdown() {
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}

if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(false);
}
}

private void unregisterBrokerAll() {
Expand Down Expand Up @@ -990,6 +998,10 @@ public Configuration getConfiguration() {
return this.configuration;
}

public BlockingQueue<Runnable> getHeartbeatThreadPoolQueue() {
return heartbeatThreadPoolQueue;
}

public TransactionalMessageCheckService getTransactionalMessageCheckService() {
return transactionalMessageCheckService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ private void cleanExpiredRequest() {

cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());

cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
}

void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
Expand Down
27 changes: 27 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class BrokerConfig {
private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors());

private int flushConsumerOffsetInterval = 1000 * 5;

Expand All @@ -77,6 +78,7 @@ public class BrokerConfig {
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;

private int filterServerNums = 0;

Expand Down Expand Up @@ -108,6 +110,7 @@ public class BrokerConfig {
private boolean brokerFastFailureEnable = true;
private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000;
private long waitTimeMillsInHeartbeatQueue = 31 * 1000;

private long startAcceptSendRequestTimeStamp = 0L;

Expand Down Expand Up @@ -643,6 +646,30 @@ public void setForceRegister(boolean forceRegister) {
this.forceRegister = forceRegister;
}

public int getHeartbeatThreadPoolQueueCapacity() {
return heartbeatThreadPoolQueueCapacity;
}

public void setHeartbeatThreadPoolQueueCapacity(int heartbeatThreadPoolQueueCapacity) {
this.heartbeatThreadPoolQueueCapacity = heartbeatThreadPoolQueueCapacity;
}

public int getHeartbeatThreadPoolNums() {
return heartbeatThreadPoolNums;
}

public void setHeartbeatThreadPoolNums(int heartbeatThreadPoolNums) {
this.heartbeatThreadPoolNums = heartbeatThreadPoolNums;
}

public long getWaitTimeMillsInHeartbeatQueue() {
return waitTimeMillsInHeartbeatQueue;
}

public void setWaitTimeMillsInHeartbeatQueue(long waitTimeMillsInHeartbeatQueue) {
this.waitTimeMillsInHeartbeatQueue = waitTimeMillsInHeartbeatQueue;
}

public int getRegisterNameServerPeriod() {
return registerNameServerPeriod;
}
Expand Down