Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class BrokerController {
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final FilterServerManager filterServerManager;
Expand All @@ -126,6 +127,7 @@ public class BrokerController {
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
private ExecutorService consumerManageExecutor;
Expand Down Expand Up @@ -163,8 +165,8 @@ public BrokerController(
this.slaveSynchronize = new SlaveSynchronize(this);

this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());

this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());

Expand All @@ -191,6 +193,10 @@ public BlockingQueue<Runnable> getPullThreadPoolQueue() {
return pullThreadPoolQueue;
}

public BlockingQueue<Runnable> getQueryThreadPoolQueue() {
return queryThreadPoolQueue;
}

public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load();

Expand Down Expand Up @@ -237,6 +243,14 @@ public boolean initialize() throws CloneNotSupportedException {
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));

this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));

this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));
Expand Down Expand Up @@ -404,11 +418,11 @@ public void registerProcessor() {
* QueryMessageProcessor
*/
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

/**
* ClientManageProcessor
Expand Down Expand Up @@ -494,9 +508,14 @@ public long headSlowTimeMills4PullThreadPoolQueue() {
return this.headSlowTimeMills(this.pullThreadPoolQueue);
}

public long headSlowTimeMills4QueryThreadPoolQueue() {
return this.headSlowTimeMills(this.queryThreadPoolQueue);
}

public void printWaterMark() {
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
}

public MessageStore getMessageStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1205,11 +1205,17 @@ private HashMap<String, String> prepareRuntimeInfo() {
runtimeInfo.put("pullThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));

runtimeInfo.put("queryThreadPoolQueueSize", String.valueOf(this.brokerController.getQueryThreadPoolQueue().size()));
runtimeInfo.put("queryThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));

runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));

runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue()));
runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue()));
runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue()));

runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
Expand Down
19 changes: 19 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class BrokerConfig {
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();

private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
Expand All @@ -73,6 +75,7 @@ public class BrokerConfig {
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;

Expand Down Expand Up @@ -306,6 +309,14 @@ public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) {
this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
}

public int getQueryMessageThreadPoolNums() {
return queryMessageThreadPoolNums;
}

public void setQueryMessageThreadPoolNums(final int queryMessageThreadPoolNums) {
this.queryMessageThreadPoolNums = queryMessageThreadPoolNums;
}

public int getAdminBrokerThreadPoolNums() {
return adminBrokerThreadPoolNums;
}
Expand Down Expand Up @@ -394,6 +405,14 @@ public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) {
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
}

public int getQueryThreadPoolQueueCapacity() {
return queryThreadPoolQueueCapacity;
}

public void setQueryThreadPoolQueueCapacity(final int queryThreadPoolQueueCapacity) {
this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
}

public boolean isBrokerTopicEnable() {
return brokerTopicEnable;
}
Expand Down