From 80f4e6b24daf0cf3e7b7cdc75298665adbea4d4c Mon Sep 17 00:00:00 2001 From: Jaskey Date: Wed, 19 Apr 2017 17:13:09 +0800 Subject: [PATCH] invoke callback at once when channel is close --- .../client/impl/MQClientAPIImplTest.java | 2 +- .../remoting/netty/NettyRemotingAbstract.java | 51 +++++++++++++------ .../remoting/netty/NettyRemotingClient.java | 2 +- .../remoting/netty/ResponseFuture.java | 20 ++++++-- 4 files changed, 53 insertions(+), 22 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index bf019618d9c..c13e75c206c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -162,7 +162,7 @@ public void testSendMessageAsync_Success() throws RemotingException, Interrupted public Object answer(InvocationOnMock mock) throws Throwable { InvokeCallback callback = mock.getArgument(3); RemotingCommand request = mock.getArgument(1); - ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null); + ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null); responseFuture.setResponseCommand(createSuccessResponse(request)); callback.operationComplete(responseFuture); return null; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 06918086ff1..664c5fd8651 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -357,7 +357,7 @@ public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingComma final int opaque = request.getOpaque(); try { - final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); + final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @@ -400,8 +400,7 @@ public void invokeAsyncImpl(final Channel channel, final RemotingCommand request boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); - - final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once); + final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @@ -410,20 +409,8 @@ public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; - } else { - responseFuture.setSendRequestOK(false); - } - - responseFuture.putResponse(null); - responseTable.remove(opaque); - try { - executeInvokeCallback(responseFuture); - } catch (Throwable e) { - log.warn("excute callback in writeAndFlush addListener, and callback throw", e); - } finally { - responseFuture.release(); } - + requestFail(opaque); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); @@ -448,6 +435,38 @@ public void operationComplete(ChannelFuture f) throws Exception { } } + private void requestFail(final int opaque) { + ResponseFuture responseFuture = responseTable.remove(opaque); + if (responseFuture != null) { + responseFuture.setSendRequestOK(false); + responseFuture.putResponse(null); + try { + executeInvokeCallback(responseFuture); + } catch (Throwable e) { + log.warn("execute callback in requestFail, and callback throw", e); + } finally { + responseFuture.release(); + } + } + } + + /** + * mark the request of the specified channel as fail and to invoke fail callback immediately + * @param channel the channel which is close already + */ + protected void failFast(final Channel channel) { + Iterator> it = responseTable.entrySet().iterator(); + while (it.hasNext()) { + Entry entry = it.next(); + if (entry.getValue().getProcessChannel() == channel) { + Integer opaque = entry.getKey(); + if (opaque != null) { + requestFail(opaque); + } + } + } + } + public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { request.markOnewayRPC(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index a96423c1fce..d08bdd86dfc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -650,7 +650,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress); closeChannel(ctx.channel()); super.close(ctx, promise); - + NettyRemotingClient.this.failFast(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java index 1157c450283..5f4c8c69502 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.remoting.netty; +import io.netty.channel.Channel; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,6 +26,7 @@ public class ResponseFuture { private final int opaque; + private final Channel processChannel; private final long timeoutMillis; private final InvokeCallback invokeCallback; private final long beginTimestamp = System.currentTimeMillis(); @@ -37,9 +39,10 @@ public class ResponseFuture { private volatile boolean sendRequestOK = true; private volatile Throwable cause; - public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback, + public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback, SemaphoreReleaseOnlyOnce once) { this.opaque = opaque; + this.processChannel = channel; this.timeoutMillis = timeoutMillis; this.invokeCallback = invokeCallback; this.once = once; @@ -114,11 +117,20 @@ public int getOpaque() { return opaque; } + public Channel getProcessChannel() { + return processChannel; + } + @Override public String toString() { - return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK - + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis - + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp + return "ResponseFuture [responseCommand=" + responseCommand + + ", sendRequestOK=" + sendRequestOK + + ", cause=" + cause + + ", opaque=" + opaque + + ", processChannel=" + processChannel + + ", timeoutMillis=" + timeoutMillis + + ", invokeCallback=" + invokeCallback + + ", beginTimestamp=" + beginTimestamp + ", countDownLatch=" + countDownLatch + "]"; } }