Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1361,14 +1361,13 @@ public Message request(Message msg,
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
executeRequestCallback(correlationId,true);
}

@Override
public void onException(Throwable e) {
requestResponseFuture.setSendRequestOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
executeRequestCallback(correlationId,false);
}
}, timeout - cost);

Expand All @@ -1391,13 +1390,13 @@ public void request(Message msg, final RequestCallback requestCallback, long tim
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
executeRequestCallback(correlationId, true);
}

@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(correlationId);
executeRequestCallback(correlationId,false);
}
}, timeout - cost);
}
Expand All @@ -1417,14 +1416,13 @@ public Message request(final Message msg, final MessageQueueSelector selector, f
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
executeRequestCallback(correlationId, true);
}

@Override
public void onException(Throwable e) {
requestResponseFuture.setSendRequestOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
executeRequestCallback(correlationId,false);
}
}, timeout - cost);

Expand All @@ -1448,13 +1446,13 @@ public void request(final Message msg, final MessageQueueSelector selector, fina
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
executeRequestCallback(correlationId, true);
}

@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(correlationId);
executeRequestCallback(correlationId,false);
}
}, timeout - cost);

Expand All @@ -1474,14 +1472,13 @@ public Message request(final Message msg, final MessageQueue mq, final long time
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
executeRequestCallback(correlationId, true);
}

@Override
public void onException(Throwable e) {
requestResponseFuture.setSendRequestOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
executeRequestCallback(correlationId,false);
}
}, null, timeout - cost);

Expand All @@ -1492,16 +1489,17 @@ public void onException(Throwable e) {
}

private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
boolean responseSuccess = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseSuccess&&requestResponseFuture.isSendRequestOk()) {
return msg;
}else {
if (!responseSuccess) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
}

public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
Expand All @@ -1517,22 +1515,21 @@ public void request(final Message msg, final MessageQueue mq, final RequestCallb
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
executeRequestCallback(correlationId, true);
}

@Override
public void onException(Throwable e) {
requestResponseFuture.setCause(e);
requestFail(correlationId);
executeRequestCallback(correlationId,false);
}
}, null, timeout - cost);
}

private void requestFail(final String correlationId) {
private void executeRequestCallback(final String correlationId,boolean isSuccess) {
RequestResponseFuture responseFuture = RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);
if (responseFuture != null) {
responseFuture.setSendRequestOk(false);
responseFuture.putResponseMessage(null);
responseFuture.setSendRequestOk(isSuccess);
try {
responseFuture.executeRequestCallback();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class RequestResponseFuture {
private long timeoutMillis;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile Message responseMsg = null;
private volatile boolean sendRequestOk = true;
private volatile boolean sendRequestOk = false;
private volatile Throwable cause = null;

public RequestResponseFuture(String correlationId, long timeoutMillis, RequestCallback requestCallback) {
Expand All @@ -53,9 +53,8 @@ public boolean isTimeout() {
return diff > this.timeoutMillis;
}

public Message waitResponseMessage(final long timeout) throws InterruptedException {
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return this.responseMsg;
public boolean waitResponseMessage(final long timeout) throws InterruptedException {
return this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
}

public void putResponseMessage(final Message responseMsg) {
Expand Down Expand Up @@ -105,6 +104,7 @@ public boolean isSendRequestOk() {

public void setSendRequestOk(boolean sendRequestOk) {
this.sendRequestOk = sendRequestOk;
this.countDownLatch.countDown();
}

public Message getRequestMsg() {
Expand Down