Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void testSendMessageAsync_Success() throws RemotingException, Interrupted
public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null);
ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSuccessResponse(request));
callback.operationComplete(responseFuture);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingComma
final int opaque = request.getOpaque();

try {
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
Expand Down Expand Up @@ -400,8 +400,7 @@ public void invokeAsyncImpl(final Channel channel, final RemotingCommand request
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
Expand All @@ -410,20 +409,8 @@ public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}

responseFuture.putResponse(null);
responseTable.remove(opaque);
try {
executeInvokeCallback(responseFuture);
} catch (Throwable e) {
log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally {
responseFuture.release();
}

requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
Expand All @@ -448,6 +435,38 @@ public void operationComplete(ChannelFuture f) throws Exception {
}
}

private void requestFail(final int opaque) {
ResponseFuture responseFuture = responseTable.remove(opaque);
if (responseFuture != null) {
responseFuture.setSendRequestOK(false);
responseFuture.putResponse(null);
try {
executeInvokeCallback(responseFuture);
} catch (Throwable e) {
log.warn("execute callback in requestFail, and callback throw", e);
} finally {
responseFuture.release();
}
}
}

/**
* mark the request of the specified channel as fail and to invoke fail callback immediately
* @param channel the channel which is close already
*/
protected void failFast(final Channel channel) {
Iterator<Entry<Integer, ResponseFuture>> it = responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> entry = it.next();
if (entry.getValue().getProcessChannel() == channel) {
Integer opaque = entry.getKey();
if (opaque != null) {
requestFail(opaque);
}
}
}
}

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
request.markOnewayRPC();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
closeChannel(ctx.channel());
super.close(ctx, promise);

NettyRemotingClient.this.failFast(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.remoting.netty;

import io.netty.channel.Channel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -25,6 +26,7 @@

public class ResponseFuture {
private final int opaque;
private final Channel processChannel;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
Expand All @@ -37,9 +39,10 @@ public class ResponseFuture {
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;

public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback,
SemaphoreReleaseOnlyOnce once) {
this.opaque = opaque;
this.processChannel = channel;
this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
this.once = once;
Expand Down Expand Up @@ -114,11 +117,20 @@ public int getOpaque() {
return opaque;
}

public Channel getProcessChannel() {
return processChannel;
}

@Override
public String toString() {
return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK
+ ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis
+ ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp
return "ResponseFuture [responseCommand=" + responseCommand
+ ", sendRequestOK=" + sendRequestOK
+ ", cause=" + cause
+ ", opaque=" + opaque
+ ", processChannel=" + processChannel
+ ", timeoutMillis=" + timeoutMillis
+ ", invokeCallback=" + invokeCallback
+ ", beginTimestamp=" + beginTimestamp
+ ", countDownLatch=" + countDownLatch + "]";
}
}