From 14969aa389f1c87d362504a3899c19ed37a3ee51 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Tue, 24 Mar 2020 12:08:34 +0800 Subject: [PATCH] feat(broker) prevent blocked by group transfer service --- .../java/org/apache/rocketmq/broker/BrokerController.java | 5 +++-- .../rocketmq/broker/processor/SendMessageProcessor.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 85009d620f57..194f2850fa57 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1240,6 +1240,7 @@ private void shutdownProcessorByHa() { } } - - + public ExecutorService getSendMessageExecutor() { + return sendMessageExecutor; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 801d886c43be..4dc311db913b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -79,7 +79,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, @Override public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { - asyncProcessRequest(ctx, request).thenAccept(responseCallback::callback); + asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor()); } public CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx,