Description
BUG REPORT
- Please describe the issue you observed:
When a send-message request stays in the sendThreadPoolQueue for too long, the broker may respond with "[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while". The relevant code is as follows:
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= maxWaitTimeMillsInQueue) {
    if (blockingQueue.remove(runnable)) {
        rt.setStopRun(true);
        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
    }
}
The default value of maxWaitTimeMillsInQueue is 200ms.
We have set its value to 1000ms in the production environment, but this problem still happens occasionally.
We use rocketmq-exporter + prometheus + grafana to monitor the value of sendThreadPoolQueueHeadWaitTimeMills, but the value is almost always 0 (occasionally a very high value appears). This does not make sense.
When I debugged the broker's source code, I found that there are two types of elements in the sendThreadPoolQueue:
java.util.concurrent.CompletableFuture$UniAccept
org.apache.rocketmq.broker.latency.FutureTaskExt
If the head element of sendThreadPoolQueue is an org.apache.rocketmq.broker.latency.FutureTaskExt, the value of sendThreadPoolQueueHeadWaitTimeMills is computed from it. Otherwise, 0 is returned. See the source code below:
// BrokerController.java
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
    long slowTimeMills = 0;
    final Runnable peek = q.peek();
    if (peek != null) {
        RequestTask rt = BrokerFastFailure.castRunnable(peek);
        slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
    }
    if (slowTimeMills < 0) {
        slowTimeMills = 0;
    }
    return slowTimeMills;
}
Note the call BrokerFastFailure.castRunnable(peek), which returns null for anything that is not a FutureTaskExt:
public static RequestTask castRunnable(final Runnable runnable) {
    try {
        if (runnable instanceof FutureTaskExt) {
            FutureTaskExt object = (FutureTaskExt) runnable;
            return (RequestTask) object.getRunnable();
        }
    } catch (Throwable e) {
        log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
    }
    return null;
}
The java.util.concurrent.CompletableFuture$UniAccept elements come from SendMessageProcessor.java:
@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
    asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
}
Both kinds of tasks share the sendThreadPoolQueue, and the head element of sendThreadPoolQueue is a java.util.concurrent.CompletableFuture$UniAccept most of the time, so the reported head wait time is 0 most of the time.
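The mixing of task types can be reproduced with plain JDK classes, without a broker. The following is a minimal sketch, not RocketMQ code: the class name MixedQueueDemo and the method queuedTaskTypes are made up for illustration, and a plain java.util.concurrent.FutureTask (created by submit()) stands in for FutureTaskExt. It blocks the only worker thread, schedules one callback with thenAcceptAsync and one request with submit() on the same executor, and then lists what is waiting in the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MixedQueueDemo {

    /** Returns the class names of the tasks waiting in the queue, head first. */
    static List<String> queuedTaskTypes() throws Exception {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100);
        ThreadPoolExecutor executor =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue);
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the only worker thread so that later tasks stay queued.
            executor.execute(() -> {
                workerBusy.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workerBusy.await();

            // Response callback, scheduled the same way as in asyncProcessRequest.
            CompletableFuture.completedFuture("response")
                .thenAcceptAsync(r -> { }, executor);
            // Incoming request: submit() wraps the Runnable in a FutureTask.
            executor.submit(() -> { });

            List<String> types = new ArrayList<>();
            for (Runnable r : queue) {
                types.add(r.getClass().getName());
            }
            return types;
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        queuedTaskTypes().forEach(System.out::println);
    }
}
```

In this sketch the head of the queue is the CompletableFuture callback, which carries no creation timestamp, so a peek-based metric like headSlowTimeMills would see nothing to measure even though a request is waiting right behind it.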
- Please tell us about your environment:
linux
mac os
rocketmq 4.7.1 release
- Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
How I found out what is in the sendThreadPoolQueue:
I printed its contents, using code like the following:
// BrokerController#headSlowTimeMills(BlockingQueue<Runnable> q)
........
if (q == this.sendThreadPoolQueue) {
    System.out.println("send queue foreach size: " + q.size());
    q.stream().forEach(r -> {
        long tmpSlowTime = 0L;
        RequestTask rt = BrokerFastFailure.castRunnable(r);
        System.out.println(r.getClass());
        tmpSlowTime = rt == null ? -1 : this.messageStore.now() - rt.getCreateTimestamp();
        System.out.println(tmpSlowTime);
    });
    //System.out.println("Send queue slow time mills: " + slowTimeMills);
}
.......
This is the printed output:
send queue foreach size: 4
class java.util.concurrent.CompletableFuture$UniAccept
-1
class org.apache.rocketmq.broker.latency.FutureTaskExt
387
class java.util.concurrent.CompletableFuture$UniAccept
-1
class org.apache.rocketmq.broker.latency.FutureTaskExt
80
I also printed the stack trace of every offer to the queue:
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()) {
    @Override
    public void put(Runnable runnable) throws InterruptedException {
        System.out.println("queue put: " + runnable.getClass());
        super.put(runnable);
    }

    @Override
    public boolean offer(Runnable runnable) {
        System.out.println("queue offer: " + runnable.getClass() + ", current thread: " + Thread.currentThread().getName() + ", thread id: " + Thread.currentThread().getId());
        Throwable throwable = new Throwable();
        StackTraceElement[] stackTraceElements = throwable.getStackTrace();
        if (stackTraceElements != null) {
            Arrays.stream(stackTraceElements).forEach(stackTraceElement -> {
                System.out.println(stackTraceElement.getClassName() + "#"
                    + stackTraceElement.getMethodName() + "#" + stackTraceElement.getLineNumber());
            });
        }
        System.out.println("---------------------------end------------------------------");
        return super.offer(runnable);
    }

    @Override
    public boolean offer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
        System.out.println("queue timeoutoffer: " + runnable.getClass());
        return super.offer(runnable, timeout, unit);
    }
};
The output is as follows:
queue offer: class java.util.concurrent.CompletableFuture$UniAccept, current thread: SendMessageThread_1, thread id: 81
org.apache.rocketmq.broker.BrokerController$1#offer#205
org.apache.rocketmq.broker.BrokerController$1#offer#195
java.util.concurrent.ThreadPoolExecutor#execute#1371
java.util.concurrent.CompletableFuture$UniCompletion#claim#543
java.util.concurrent.CompletableFuture#uniAccept#667
java.util.concurrent.CompletableFuture$UniAccept#tryFire$$$capture#646
java.util.concurrent.CompletableFuture$UniAccept#tryFire#-1
java.util.concurrent.CompletableFuture#uniAcceptStage#686
java.util.concurrent.CompletableFuture#thenAcceptAsync#2019
org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest#82
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1#run#226
org.apache.rocketmq.remoting.netty.RequestTask#run#80
java.util.concurrent.Executors$RunnableAdapter#call#511
java.util.concurrent.FutureTask#run$$$capture#266
java.util.concurrent.FutureTask#run#-1
java.util.concurrent.ThreadPoolExecutor#runWorker#1149
java.util.concurrent.ThreadPoolExecutor$Worker#run#624
java.lang.Thread#run#748
queue offer: class org.apache.rocketmq.broker.latency.FutureTaskExt, current thread: NettyServerCodecThread_5, thread id: 56
org.apache.rocketmq.broker.BrokerController$1#offer#205
org.apache.rocketmq.broker.BrokerController$1#offer#195
java.util.concurrent.ThreadPoolExecutor#execute#1371
java.util.concurrent.AbstractExecutorService#submit#112
org.apache.rocketmq.broker.BrokerController$2#submit#304
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand#256
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived#158
org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler#channelRead0#420
org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler#channelRead0#415
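As a suggestion for how to fix the metric, one possible approach is to skip entries that carry no timestamp and report the wait time of the first queued request instead of only peeking at the head. The following is a rough sketch under stated assumptions, not a patch against the broker: HeadWaitSketch, TimedTask and firstTimedWaitMillis are hypothetical names, and TimedTask merely stands in for a FutureTaskExt wrapping a RequestTask.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

public class HeadWaitSketch {

    /** Hypothetical stand-in for a queued request that records its creation time. */
    static final class TimedTask extends FutureTask<Void> {
        final long createTimestamp;

        TimedTask(long createTimestamp) {
            super(() -> { }, null);
            this.createTimestamp = createTimestamp;
        }
    }

    /**
     * Wait time of the first timestamped task in the queue, or 0 if none is queued.
     * The queue is FIFO, so the first timestamped task found is the oldest one.
     */
    static long firstTimedWaitMillis(BlockingQueue<Runnable> q, long now) {
        for (Runnable r : q) { // read-only iteration, nothing is removed
            if (r instanceof TimedTask) {
                return Math.max(0L, now - ((TimedTask) r).createTimestamp);
            }
        }
        return 0L;
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
        q.add(() -> { });            // plays the role of a UniAccept callback
        q.add(new TimedTask(1000L)); // oldest request
        q.add(new TimedTask(1300L));
        System.out.println(firstTimedWaitMillis(q, 1387L));
    }
}
```

Iterating the queue costs more than a single peek(), so whether this is acceptable on the metrics path would need to be checked. Another option would be to run the response callbacks on a different executor so that the send queue only contains requests.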