From 9c9c714d357a4f30320f7a83846f1e5246ffc105 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E8=81=AA=E6=98=8E?= Date: Tue, 28 Dec 2021 22:20:00 +0800 Subject: [PATCH 1/2] =?UTF-8?q?issues#3676:=E8=A7=A3=E5=86=B3request()?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=90=8E=E7=9A=84=E9=92=A9=E5=AD=90=E5=A4=84?= =?UTF-8?q?=E7=90=86=201.=E4=BD=BF=E7=94=A8CountDownLatch.await()=E6=97=B6?= =?UTF-8?q?=EF=BC=8C=E5=8F=AA=E6=9C=89onException()=E4=BC=9AcountDown(),?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E5=8F=91=E9=80=81=E6=88=90=E5=8A=9F=E5=90=8E?= =?UTF-8?q?=E8=BF=98=E6=98=AF=E4=BC=9A=E8=B6=85=E6=97=B6=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=202.=E8=A7=A3=E5=86=B3=E4=BC=A0=E5=85=A5=E7=9A=84RequestCallBa?= =?UTF-8?q?ck=E7=9A=84onSuccess()=E6=96=B9=E6=B3=95=E5=9C=A8=E5=8F=91?= =?UTF-8?q?=E9=80=81=E6=88=90=E5=8A=9F=E5=90=8E=E6=B2=A1=E6=9C=89=E8=B0=83?= =?UTF-8?q?=E7=94=A8=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/producer/DefaultMQProducerImpl.java | 42 +++++++++---------- .../producer/RequestResponseFuture.java | 8 ++-- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index cedbbdb6ca3..35f08b6147f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1361,14 +1361,13 @@ public Message request(Message msg, this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId,true); } @Override public void onException(Throwable e) { - requestResponseFuture.setSendRequestOk(false); - requestResponseFuture.putResponseMessage(null); requestResponseFuture.setCause(e); + executeRequestCallback(correlationId,false); } }, timeout - cost); @@ -1391,13 +1390,13 @@ public void request(Message msg, final RequestCallback requestCallback, long tim this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId, true); } @Override public void onException(Throwable e) { requestResponseFuture.setCause(e); - requestFail(correlationId); + executeRequestCallback(correlationId,false); } }, timeout - cost); } @@ -1417,14 +1416,13 @@ public Message request(final Message msg, final MessageQueueSelector selector, f this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId, true); } @Override public void onException(Throwable e) { - requestResponseFuture.setSendRequestOk(false); - requestResponseFuture.putResponseMessage(null); requestResponseFuture.setCause(e); + executeRequestCallback(correlationId,false); } }, timeout - cost); @@ -1448,13 +1446,13 @@ public void request(final Message msg, final MessageQueueSelector selector, fina this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId, true); } @Override public void onException(Throwable e) { requestResponseFuture.setCause(e); - requestFail(correlationId); + executeRequestCallback(correlationId,false); } }, timeout - cost); @@ -1474,14 +1472,13 @@ public Message request(final Message msg, final MessageQueue mq, final long time this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId, true); } @Override public void onException(Throwable e) { - requestResponseFuture.setSendRequestOk(false); - requestResponseFuture.putResponseMessage(null); requestResponseFuture.setCause(e); + executeRequestCallback(correlationId,false); } }, null, timeout - cost); @@ -1492,16 +1489,17 @@ public void onException(Throwable e) { } private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException { - Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost); - if (responseMessage == null) { - if (requestResponseFuture.isSendRequestOk()) { + boolean responseSuccess = requestResponseFuture.waitResponseMessage(timeout - cost); + if (responseSuccess&&requestResponseFuture.isSendRequestOk()) { + return msg; + }else { + if (!responseSuccess) { throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, - "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); + "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms."); } else { throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause()); } } - return responseMessage; } public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) @@ -1517,21 +1515,21 @@ public void request(final Message msg, final MessageQueue mq, final RequestCallb this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - requestResponseFuture.setSendRequestOk(true); + executeRequestCallback(correlationId, true); } @Override public void onException(Throwable e) { requestResponseFuture.setCause(e); - requestFail(correlationId); + executeRequestCallback(correlationId,false); } }, null, timeout - cost); } - private void requestFail(final String correlationId) { + private void executeRequestCallback(final String correlationId,boolean isSuccess) { RequestResponseFuture responseFuture = RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId); if (responseFuture != null) { - responseFuture.setSendRequestOk(false); + responseFuture.setSendRequestOk(isSuccess); responseFuture.putResponseMessage(null); try { responseFuture.executeRequestCallback(); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java index e66c08fdc53..f03c8c9e1c5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java @@ -29,7 +29,7 @@ public class RequestResponseFuture { private long timeoutMillis; private CountDownLatch countDownLatch = new CountDownLatch(1); private volatile Message responseMsg = null; - private volatile boolean sendRequestOk = true; + private volatile boolean sendRequestOk = false; private volatile Throwable cause = null; public RequestResponseFuture(String correlationId, long timeoutMillis, RequestCallback requestCallback) { @@ -53,9 +53,8 @@ public boolean isTimeout() { return diff > this.timeoutMillis; } - public Message waitResponseMessage(final long timeout) throws InterruptedException { - this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - return this.responseMsg; + public boolean waitResponseMessage(final long timeout) throws InterruptedException { + return this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); } public void putResponseMessage(final Message responseMsg) { @@ -105,6 +104,7 @@ public boolean isSendRequestOk() { public void setSendRequestOk(boolean sendRequestOk) { this.sendRequestOk = sendRequestOk; + this.countDownLatch.countDown(); } public Message getRequestMsg() { From 75a9c1a95aba4696830fa654e60b85edfa35573e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E8=81=AA=E6=98=8E?= Date: Tue, 28 Dec 2021 22:55:20 +0800 Subject: [PATCH 2/2] =?UTF-8?q?issues#3676:=E8=A7=A3=E5=86=B3request()?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=90=8E=E7=9A=84=E9=92=A9=E5=AD=90=E5=A4=84?= =?UTF-8?q?=E7=90=86=201.=E4=BD=BF=E7=94=A8CountDownLatch.await()=E6=97=B6?= =?UTF-8?q?=EF=BC=8C=E5=8F=AA=E6=9C=89onException()=E4=BC=9AcountDown(),?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E5=8F=91=E9=80=81=E6=88=90=E5=8A=9F=E5=90=8E?= =?UTF-8?q?=E8=BF=98=E6=98=AF=E4=BC=9A=E8=B6=85=E6=97=B6=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=202.=E8=A7=A3=E5=86=B3=E4=BC=A0=E5=85=A5=E7=9A=84RequestCallBa?= =?UTF-8?q?ck=E7=9A=84onSuccess()=E6=96=B9=E6=B3=95=E5=9C=A8=E5=8F=91?= =?UTF-8?q?=E9=80=81=E6=88=90=E5=8A=9F=E5=90=8E=E6=B2=A1=E6=9C=89=E8=B0=83?= =?UTF-8?q?=E7=94=A8=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq/client/impl/producer/DefaultMQProducerImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 35f08b6147f..8641ed2bd1c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1530,7 +1530,6 @@ private void executeRequestCallback(final String correlationId,boolean isSuccess RequestResponseFuture responseFuture = RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId); if (responseFuture != null) { responseFuture.setSendRequestOk(isSuccess); - responseFuture.putResponseMessage(null); try { responseFuture.executeRequestCallback(); } catch (Exception e) {