diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index cedbbdb6ca3..8641ed2bd1c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1361,14 +1361,13 @@ public Message request(Message msg, this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId,true); } @Override public void onException(Throwable e) { - requestResponseFuture.setSendRequestOk(false); - requestResponseFuture.putResponseMessage(null); requestResponseFuture.setCause(e); + executeRequestCallback(correlationId,false); } }, timeout - cost); @@ -1391,13 +1390,13 @@ public void request(Message msg, final RequestCallback requestCallback, long tim this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId, true); } @Override public void onException(Throwable e) { requestResponseFuture.setCause(e); - requestFail(correlationId); + executeRequestCallback(correlationId,false); } }, timeout - cost); } @@ -1417,14 +1416,13 @@ public Message request(final Message msg, final MessageQueueSelector selector, f this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId, true); } @Override public void onException(Throwable e) { - requestResponseFuture.setSendRequestOk(false); - requestResponseFuture.putResponseMessage(null); requestResponseFuture.setCause(e); + executeRequestCallback(correlationId,false); } }, timeout - cost); @@ -1448,13 +1446,13 @@ public void request(final Message msg, final MessageQueueSelector selector, fina this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId, true); } @Override public void onException(Throwable e) { requestResponseFuture.setCause(e); - requestFail(correlationId); + executeRequestCallback(correlationId,false); } }, timeout - cost); @@ -1474,14 +1472,13 @@ public Message request(final Message msg, final MessageQueue mq, final long time this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId, true); } @Override public void onException(Throwable e) { - requestResponseFuture.setSendRequestOk(false); - requestResponseFuture.putResponseMessage(null); requestResponseFuture.setCause(e); + executeRequestCallback(correlationId,false); } }, null, timeout - cost); @@ -1492,16 +1489,17 @@ public void onException(Throwable e) { } private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException { - Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); - if (responseMessage == null) { - if (requestResponseFuture.isSendRequestOk()) { + boolean responseSuccess = requestResponseFuture.waitResponseMessage(timeout - cost); + if (responseSuccess&&requestResponseFuture.isSendRequestOk()) { + return msg; + }else { + if (!responseSuccess) { throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); + "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); } else { throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); } } - return responseMessage; } public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) @@ -1517,22 +1515,21 @@ public void request(final Message msg, final MessageQueue mq, final RequestCallb this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId, true); } @Override public void onException(Throwable e) { requestResponseFuture.setCause(e); - requestFail(correlationId); + executeRequestCallback(correlationId,false); } }, null, timeout - cost); } - private void requestFail(final String correlationId) { + private void executeRequestCallback(final String correlationId,boolean isSuccess) { RequestResponseFuture responseFuture = RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId); if (responseFuture != null) { - responseFuture.setSendRequestOk(false); - responseFuture.putResponseMessage(null); + responseFuture.setSendRequestOk(isSuccess); try { responseFuture.executeRequestCallback(); } catch (Exception e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java index e66c08fdc53..f03c8c9e1c5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java @@ -29,7 +29,7 @@ public class RequestResponseFuture { private long timeoutMillis; private CountDownLatch countDownLatch = new CountDownLatch(1); private volatile Message responseMsg = null; - private volatile boolean sendRequestOk = true; + private volatile boolean sendRequestOk = false; private volatile Throwable cause = null; public RequestResponseFuture(String correlationId, long timeoutMillis, RequestCallback requestCallback) { @@ -53,9 +53,8 @@ public boolean isTimeout() { return diff > this.timeoutMillis; } - public Message waitResponseMessage(final long timeout) throws InterruptedException { - this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - return this.responseMsg; + public boolean waitResponseMessage(final long timeout) throws InterruptedException { + return this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); } public void putResponseMessage(final Message responseMsg) { @@ -105,6 +104,7 @@ public boolean isSendRequestOk() { public void setSendRequestOk(boolean sendRequestOk) { this.sendRequestOk = sendRequestOk; + this.countDownLatch.countDown(); } public Message getRequestMsg() {