diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index d190e00f44f..ea3350dbb30 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -376,8 +376,10 @@ public void scanResponseTable() { while (it.hasNext()) { Entry next = it.next(); ResponseFuture rep = next.getValue(); - - if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) { + if (rep.isSyncInvoke()) { + continue; + } + if ((rep.getBeginTimestamp() + rep.getTimeoutMillis()) <= System.currentTimeMillis()) { rep.release(); it.remove(); rfList.add(rep); @@ -451,6 +453,7 @@ public void invokeAsyncImpl(final Channel channel, final RemotingCommand request } final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); + responseFuture.setSyncInvoke(false); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java index 5f4c8c69502..068f864a4e5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java @@ -38,6 +38,7 @@ public class ResponseFuture { private volatile RemotingCommand responseCommand; private volatile boolean sendRequestOK = true; private volatile Throwable cause; + private volatile boolean syncInvoke = true; public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback, SemaphoreReleaseOnlyOnce once) { @@ -121,6 +122,14 @@ public Channel getProcessChannel() { return processChannel; } + public boolean isSyncInvoke() { + return syncInvoke; + } + + public void setSyncInvoke(boolean syncInvoke) { + this.syncInvoke = syncInvoke; + } + @Override public String toString() { return "ResponseFuture [responseCommand=" + responseCommand