Skip to content

Commit

Permalink
MASTER [ROCKETMQ-14] invoke callback shoule be invoked in an executor…
Browse files Browse the repository at this point in the history
… rather than in current thread, closes #2
  • Loading branch information
Jaskey authored and zhouxinyu committed Dec 27, 2016
1 parent 0c022e0 commit 1356e35
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,35 +198,7 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm
responseTable.remove(opaque);

if (responseFuture.getInvokeCallback() != null) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
PLOG.warn("execute callback in executor exception, and callback throw", e);
}
}
});
} catch (Exception e) {
runInThisThread = true;
PLOG.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}

if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
PLOG.warn("executeInvokeCallback Exception", e);
}
}
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
}
Expand All @@ -236,6 +208,39 @@ public void run() {
}
}

//execute callback in callback executor. If callback executor is null, run directly in current thread
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
PLOG.warn("execute callback in executor exception, and callback throw", e);
}
}
});
} catch (Exception e) {
runInThisThread = true;
PLOG.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}

if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
PLOG.warn("executeInvokeCallback Exception", e);
}
}
}

public abstract RPCHook getRPCHook();

abstract public ExecutorService getCallbackExecutor();
Expand All @@ -257,7 +262,7 @@ public void scanResponseTable() {

for (ResponseFuture rf : rfList) {
try {
rf.executeInvokeCallback();
executeInvokeCallback(rf);
} catch (Throwable e) {
PLOG.warn("scanResponseTable, operationComplete Exception", e);
}
Expand Down Expand Up @@ -329,7 +334,7 @@ public void operationComplete(ChannelFuture f) throws Exception {
responseFuture.putResponse(null);
responseTable.remove(opaque);
try {
responseFuture.executeInvokeCallback();
executeInvokeCallback(responseFuture);
} catch (Throwable e) {
PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


/**
Expand All @@ -51,6 +57,52 @@ public void test_connect_timeout() throws InterruptedException, RemotingConnectE
System.out.println("-----------------------------------------------------------------");
}


@Test
public void test_async_timeout() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
RemotingClient client = createRemotingClient();
final AtomicInteger ai = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(100);
for(int i=0;i<100;i++) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
client.invokeAsync("localhost:8888", request, 5, new InvokeCallback() {//very easy to timeout
@Override
public void operationComplete(ResponseFuture responseFuture) {
if (responseFuture.isTimeout()) {
if(ai.getAndIncrement()==4) {
try {
System.out.println("First try timeout, blocking 10s" + Thread.currentThread().getName());
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else{
System.out.println("Timeout callback execute,very short."+Thread.currentThread().getName());
}
}
else{
System.out.println("Success."+Thread.currentThread().getName());
}
latch.countDown();

}
});
} catch (Exception e) {
e.printStackTrace();
}
}



latch.await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, latch.getCount());//only one should be blocked
client.shutdown();
System.out.println("-----------------------------------------------------------------");
}

public static RemotingClient createRemotingClient() {
NettyClientConfig config = new NettyClientConfig();
config.setClientChannelMaxIdleTimeSeconds(15);
Expand Down

0 comments on commit 1356e35

Please sign in to comment.