From f966ee258125cef108a53ef53c5bca60a997b1ac Mon Sep 17 00:00:00 2001 From: liyiwen <915071628@qq.com> Date: Sun, 1 Dec 2019 16:17:29 +0800 Subject: [PATCH] fix async invoke timeout is not accurate --- .../rocketmq/remoting/netty/NettyRemotingAbstract.java | 7 +++++-- .../apache/rocketmq/remoting/netty/ResponseFuture.java | 9 +++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index d190e00f44f..ea3350dbb30 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -376,8 +376,10 @@ public void scanResponseTable() { while (it.hasNext()) { Entry next = it.next(); ResponseFuture rep = next.getValue(); - - if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) { + if (rep.isSyncInvoke()) { + continue; + } + if ((rep.getBeginTimestamp() + rep.getTimeoutMillis()) <= System.currentTimeMillis()) { rep.release(); it.remove(); rfList.add(rep); @@ -451,6 +453,7 @@ public void invokeAsyncImpl(final Channel channel, final RemotingCommand request } final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); + responseFuture.setSyncInvoke(false); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java index 5f4c8c69502..068f864a4e5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java @@ -38,6 +38,7 @@ public class ResponseFuture { private volatile RemotingCommand responseCommand; private volatile boolean sendRequestOK = true; private volatile Throwable cause; + private volatile boolean syncInvoke = true; public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback, SemaphoreReleaseOnlyOnce once) { @@ -121,6 +122,14 @@ public Channel getProcessChannel() { return processChannel; } + public boolean isSyncInvoke() { + return syncInvoke; + } + + public void setSyncInvoke(boolean syncInvoke) { + this.syncInvoke = syncInvoke; + } + @Override public String toString() { return "ResponseFuture [responseCommand=" + responseCommand