From fcdf17db5f90aee219c6d7e76dd18dcc80eeeb35 Mon Sep 17 00:00:00 2001 From: lindzh Date: Tue, 16 Jan 2018 16:32:49 +0800 Subject: [PATCH 1/4] make client asyncSend fully async send --- .../impl/producer/DefaultMQProducerImpl.java | 107 +++++++++++------- .../client/producer/DefaultMQProducer.java | 18 +-- .../rocketmq/client/producer/MQProducer.java | 18 +-- .../producer/DefaultMQProducerTest.java | 52 +++++++++ .../rocketmq/remoting/RemotingClient.java | 2 + 5 files changed, 132 insertions(+), 65 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 7c169796741..8dad24215c4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; @@ -249,7 +250,7 @@ public TransactionCheckListener checkListener() { @Override public void checkTransactionState(final String addr, final MessageExt msg, - final CheckTransactionStateRequestHeader header) { + final CheckTransactionStateRequestHeader header) { Runnable request = new Runnable() { private final String brokerAddr = addr; private final MessageExt message = msg; @@ -409,17 +410,32 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey) /** * DEFAULT ASYNC ------------------------------------------------------- */ - public void send(Message msg, - SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, SendCallback sendCallback) { send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } - public void send(Message msg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - try { - this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout); - } catch (MQBrokerException e) { - throw new MQClientException("unknownn exception", e); + public void send(final Message msg, final SendCallback sendCallback, final long timeout) { + this.getCallbackExecutor().submit(new Runnable() { + @Override + public void run() { + try { + sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout); + } catch (Exception e) { + handleCallbackException(e, sendCallback); + } + } + }); + } + + private void handleCallbackException(Exception e, SendCallback sendCallback) { + if (sendCallback != null) { + if (e instanceof MQBrokerException) { + sendCallback.onException(new MQClientException("unknown exception", e)); + } else { + sendCallback.onException(e); + } + } else { + log.warn("asyncSend message callback null real exception is " + e.getMessage(), e); } } @@ -583,11 +599,11 @@ private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { } private SendResult sendKernelImpl(final Message msg, - final MessageQueue mq, - final CommunicationMode communicationMode, - final SendCallback sendCallback, - final TopicPublishInfo topicPublishInfo, - final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + final MessageQueue mq, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); @@ -842,32 +858,34 @@ public SendResult send(Message msg, MessageQueue mq, long timeout) /** * KERNEL ASYNC ------------------------------------------------------- */ - public void send(Message msg, MessageQueue mq, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) { send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } - public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - - if (!msg.getTopic().equals(mq.getTopic())) { - throw new MQClientException("message's topic not equal mq's topic", null); - } + public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) { + this.getCallbackExecutor().submit(new Runnable() { + @Override + public void run() { + try { + makeSureStateOK(); + Validators.checkMessage(msg, defaultMQProducer); - try { - this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout); - } catch (MQBrokerException e) { - throw new MQClientException("unknown exception", e); - } + if (!msg.getTopic().equals(mq.getTopic())) { + throw new MQClientException("message's topic not equal mq's topic", null); + } + sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout); + } catch (Exception e) { + handleCallbackException(e, sendCallback); + } + } + }); } /** * KERNEL ONEWAY ------------------------------------------------------- */ public void sendOneway(Message msg, - MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { + MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -923,18 +941,21 @@ private SendResult sendSelectImpl( /** * SELECT ASYNC ------------------------------------------------------- */ - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) { send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - try { - this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout); - } catch (MQBrokerException e) { - throw new MQClientException("unknownn exception", e); - } + public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) { + this.getCallbackExecutor().submit(new Runnable() { + @Override + public void run() { + try { + sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout); + } catch (Exception e) { + handleCallbackException(e, sendCallback); + } + } + }); } /** @@ -950,7 +971,7 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) } public TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter tranExecuter, final Object arg) + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { if (null == tranExecuter) { throw new MQClientException("tranExecutor is null", null); @@ -1064,8 +1085,12 @@ public void setCallbackExecutor(final ExecutorService callbackExecutor) { this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); } + private ExecutorService getCallbackExecutor() { + return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor(); + } + public SendResult send(Message msg, - long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index a2f25dd0f8f..b91c1b994f5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -250,8 +250,7 @@ public SendResult send(Message msg, * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, - SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, SendCallback sendCallback) { this.defaultMQProducerImpl.send(msg, sendCallback); } @@ -266,8 +265,7 @@ public void send(Message msg, * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, SendCallback sendCallback, long timeout) { this.defaultMQProducerImpl.send(msg, sendCallback, timeout); } @@ -333,8 +331,7 @@ public SendResult send(Message msg, MessageQueue mq, long timeout) * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, MessageQueue mq, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) { this.defaultMQProducerImpl.send(msg, mq, sendCallback); } @@ -350,8 +347,7 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback) * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) { this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout); } @@ -421,8 +417,7 @@ public SendResult send(Message msg, MessageQueueSelector selector, Object arg, l * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) { this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback); } @@ -439,8 +434,7 @@ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCal * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) { this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 14caf6ffac9..46b8c2c660d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -38,11 +38,9 @@ SendResult send(final Message msg) throws MQClientException, RemotingException, SendResult send(final Message msg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - void send(final Message msg, final SendCallback sendCallback) throws MQClientException, - RemotingException, InterruptedException; + void send(final Message msg, final SendCallback sendCallback); - void send(final Message msg, final SendCallback sendCallback, final long timeout) - throws MQClientException, RemotingException, InterruptedException; + void send(final Message msg, final SendCallback sendCallback, final long timeout); void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException; @@ -53,11 +51,9 @@ SendResult send(final Message msg, final MessageQueue mq) throws MQClientExcepti SendResult send(final Message msg, final MessageQueue mq, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException; + void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback); - void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException; + void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout); void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException; @@ -70,12 +66,10 @@ SendResult send(final Message msg, final MessageQueueSelector selector, final Ob InterruptedException; void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback) throws MQClientException, RemotingException, - InterruptedException; + final SendCallback sendCallback); void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, - InterruptedException; + final SendCallback sendCallback, final long timeout); void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException; diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index ded22ada914..465b21a587d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -24,6 +24,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -36,6 +39,7 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; @@ -214,6 +218,54 @@ public void testSetCallbackExecutor() throws MQClientException { assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized); } + @Test + public void testAsyncSend() throws MQClientException, RemotingException, InterruptedException { + String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + producer = new DefaultMQProducer(producerGroupTemp); + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + + final AtomicInteger cc = new AtomicInteger(0); + final CountDownLatch countDownLatch = new CountDownLatch(6); + + SendCallback sendCallback = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + + } + + @Override + public void onException(Throwable e) { + e.printStackTrace(); + countDownLatch.countDown(); + cc.incrementAndGet(); + } + }; + MessageQueueSelector messageQueueSelector = new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + return null; + } + }; + + Message message = new Message(); + message.setTopic("test"); + message.setBody("hello world".getBytes()); + producer.send(new Message(),sendCallback); + producer.send(message,sendCallback,1000); + producer.send(message,new MessageQueue(),sendCallback); + producer.send(new Message(),new MessageQueue(),sendCallback,1000); + producer.send(new Message(),messageQueueSelector,null,sendCallback); + producer.send(message,messageQueueSelector,null,sendCallback,1000); + + countDownLatch.await(1000L, TimeUnit.MILLISECONDS); + + assertThat(cc.get()).isEqualTo(6); +// producer.sendOneway(new Message()); +// producer.sendOneway(new Message(),new MessageQueue()); +// producer.sendOneway(new Message(),messageQueueSelector,null); + } + public static TopicRouteData createTopicRoute() { TopicRouteData topicRouteData = new TopicRouteData(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java index 2aea14cb9d6..c0754db634d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java @@ -48,5 +48,7 @@ void registerProcessor(final int requestCode, final NettyRequestProcessor proces void setCallbackExecutor(final ExecutorService callbackExecutor); + ExecutorService getCallbackExecutor(); + boolean isChannelWritable(final String addr); } From d57f4b126b9254298457be5edf7d9ba74e9e315c Mon Sep 17 00:00:00 2001 From: lindzh Date: Thu, 25 Jan 2018 20:20:55 +0800 Subject: [PATCH 2/4] fix send async semphore --- .../impl/producer/DefaultMQProducerImpl.java | 149 ++++++++++++++---- .../client/producer/DefaultMQProducer.java | 18 ++- .../rocketmq/client/producer/MQProducer.java | 18 ++- 3 files changed, 141 insertions(+), 44 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 8dad24215c4..2f82d82ccee 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -99,6 +101,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); + private final Semaphore asyncSendSemphore = new Semaphore(65536); + public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { this(defaultMQProducer, null); } @@ -407,6 +411,54 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey) return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); } + private static class AsyncSendCallback implements SendCallback { + private SendCallback realSendCallback; + private Semaphore semaphore; + + public AsyncSendCallback(SendCallback sendCallback, Semaphore semaphore) { + this.realSendCallback = sendCallback; + this.semaphore = semaphore; + } + + @Override + public void onSuccess(SendResult sendResult) { + semaphore.release(); + realSendCallback.onSuccess(sendResult); + } + + @Override + public void onException(Throwable e) { + semaphore.release(); + realSendCallback.onException(e); + } + } + + // FIXME: 2018/1/25 + private ExecutorService getAsyncSendExecutor() { + return this.getCallbackExecutor(); + } + + private void doAsyncSend(Runnable runnable, final SendCallback sendCallback) { + try { + getAsyncSendExecutor().submit(runnable); + } catch (RejectedExecutionException e) { + sendCallback.onException(e); + } + } + + private void asyncHandleException(final SendCallback sendCallback, final Throwable e) { + try { + getAsyncSendExecutor().submit(new Runnable() { + @Override + public void run() { + sendCallback.onException(e); + } + }); + } catch (RejectedExecutionException e1) { + sendCallback.onException(e); + } + } + /** * DEFAULT ASYNC ------------------------------------------------------- */ @@ -415,16 +467,26 @@ public void send(Message msg, SendCallback sendCallback) { } public void send(final Message msg, final SendCallback sendCallback, final long timeout) { - this.getCallbackExecutor().submit(new Runnable() { - @Override - public void run() { - try { - sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout); - } catch (Exception e) { - handleCallbackException(e, sendCallback); - } + try { + boolean acquire = asyncSendSemphore.tryAcquire(timeout, TimeUnit.MILLISECONDS); + if (acquire) { + final AsyncSendCallback asyncSendCallback = new AsyncSendCallback(sendCallback, asyncSendSemphore); + doAsyncSend(new Runnable() { + @Override + public void run() { + try { + sendDefaultImpl(msg, CommunicationMode.ASYNC, asyncSendCallback, timeout); + } catch (Exception e) { + handleCallbackException(e, asyncSendCallback); + } + } + }, asyncSendCallback); + } else { + asyncHandleException(sendCallback, new RejectedExecutionException()); } - }); + } catch (InterruptedException e) { + asyncHandleException(sendCallback, e); + } } private void handleCallbackException(Exception e, SendCallback sendCallback) { @@ -863,22 +925,34 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback) { } public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) { - this.getCallbackExecutor().submit(new Runnable() { - @Override - public void run() { - try { - makeSureStateOK(); - Validators.checkMessage(msg, defaultMQProducer); - - if (!msg.getTopic().equals(mq.getTopic())) { - throw new MQClientException("message's topic not equal mq's topic", null); + try { + boolean acquire = asyncSendSemphore.tryAcquire(timeout, TimeUnit.MILLISECONDS); + if (acquire) { + final AsyncSendCallback asyncSendCallback = new AsyncSendCallback(sendCallback, asyncSendSemphore); + doAsyncSend(new Runnable() { + @Override + public void run() { + try { + + makeSureStateOK(); + Validators.checkMessage(msg, defaultMQProducer); + + if (!msg.getTopic().equals(mq.getTopic())) { + throw new MQClientException("message's topic not equal mq's topic", null); + } + sendKernelImpl(msg, mq, CommunicationMode.ASYNC, asyncSendCallback, null, timeout); + } catch (Exception e) { + handleCallbackException(e, asyncSendCallback); + } } - sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout); - } catch (Exception e) { - handleCallbackException(e, sendCallback); - } + }, asyncSendCallback); + + } else { + asyncHandleException(sendCallback, new RejectedExecutionException()); } - }); + } catch (InterruptedException e) { + asyncHandleException(sendCallback, e); + } } /** @@ -946,16 +1020,27 @@ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCal } public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) { - this.getCallbackExecutor().submit(new Runnable() { - @Override - public void run() { - try { - sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout); - } catch (Exception e) { - handleCallbackException(e, sendCallback); - } + try { + boolean acquire = asyncSendSemphore.tryAcquire(timeout, TimeUnit.MILLISECONDS); + if (acquire) { + final AsyncSendCallback asyncSendCallback = new AsyncSendCallback(sendCallback, asyncSendSemphore); + doAsyncSend(new Runnable() { + @Override + public void run() { + try { + sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, asyncSendCallback, timeout); + } catch (Exception e) { + handleCallbackException(e, asyncSendCallback); + } + } + }, asyncSendCallback); + + } else { + asyncHandleException(sendCallback, new RejectedExecutionException()); } - }); + } catch (InterruptedException e) { + asyncHandleException(sendCallback, e); + } } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index b91c1b994f5..d0cae87a14f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -250,7 +250,8 @@ public SendResult send(Message msg, * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, SendCallback sendCallback) { + public void send(Message msg, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback); } @@ -265,7 +266,8 @@ public void send(Message msg, SendCallback sendCallback) { * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, SendCallback sendCallback, long timeout) { + public void send(Message msg, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback, timeout); } @@ -331,7 +333,8 @@ public SendResult send(Message msg, MessageQueue mq, long timeout) * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, MessageQueue mq, SendCallback sendCallback) { + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, mq, sendCallback); } @@ -347,7 +350,8 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback) { * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) { + public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout); } @@ -417,7 +421,8 @@ public SendResult send(Message msg, MessageQueueSelector selector, Object arg, l * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) { + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback); } @@ -434,7 +439,8 @@ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCal * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) { + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 46b8c2c660d..a7367e91992 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -38,9 +38,11 @@ SendResult send(final Message msg) throws MQClientException, RemotingException, SendResult send(final Message msg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - void send(final Message msg, final SendCallback sendCallback); + void send(final Message msg, final SendCallback sendCallback) throws MQClientException, + RemotingException, InterruptedException; - void send(final Message msg, final SendCallback sendCallback, final long timeout); + void send(final Message msg, final SendCallback sendCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException; void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException; @@ -51,9 +53,11 @@ SendResult send(final Message msg, final MessageQueue mq) throws MQClientExcepti SendResult send(final Message msg, final MessageQueue mq, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback); + void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException; - void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout); + void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException; void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException; @@ -66,10 +70,12 @@ SendResult send(final Message msg, final MessageQueueSelector selector, final Ob InterruptedException; void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback); + final SendCallback sendCallback) throws MQClientException, RemotingException, + InterruptedException; void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback, final long timeout); + final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, + InterruptedException; void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException; From f8113ebc08c5f2f0a95a920058534cb8167c31df Mon Sep 17 00:00:00 2001 From: lindzh Date: Thu, 25 Jan 2018 20:42:55 +0800 Subject: [PATCH 3/4] fix client semaphore --- .../java/org/apache/rocketmq/client/ClientConfig.java | 11 +++++++++++ .../client/impl/producer/DefaultMQProducerImpl.java | 8 ++++++-- .../client/producer/DefaultMQProducerTest.java | 3 --- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index a9eabfe6313..66ec1e944db 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -25,6 +25,7 @@ * Client Common configuration */ public class ClientConfig { + public static final String SEND_ASYNC_SEMAPHORE = "com.rocketmq.sendAsyncSemaphore"; public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel"; private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); private String clientIP = RemotingUtil.getLocalAddress(); @@ -46,6 +47,8 @@ public class ClientConfig { private String unitName; private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); + private int asyncSendSemaphore = Integer.parseInt(System.getProperty(SEND_ASYNC_SEMAPHORE, "65536")); + private boolean useTLS = TlsSystemConfig.tlsEnable; public String buildMQClientId() { @@ -186,6 +189,14 @@ public void setUseTLS(boolean useTLS) { this.useTLS = useTLS; } + public int getAsyncSendSemaphore() { + return asyncSendSemaphore; + } + + public void setAsyncSendSemaphore(int asyncSendSemaphore) { + this.asyncSendSemaphore = asyncSendSemaphore; + } + @Override public String toString() { return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 2f82d82ccee..f504cafff8c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -101,7 +101,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); - private final Semaphore asyncSendSemphore = new Semaphore(65536); + private Semaphore asyncSendSemphore = null; public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { this(defaultMQProducer, null); @@ -150,6 +150,10 @@ public void start(final boolean startFactory) throws MQClientException { this.checkConfig(); + if (asyncSendSemphore == null) { + asyncSendSemphore = new Semaphore(defaultMQProducer.getAsyncSendSemaphore()); + } + if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } @@ -254,7 +258,7 @@ public TransactionCheckListener checkListener() { @Override public void checkTransactionState(final String addr, final MessageExt msg, - final CheckTransactionStateRequestHeader header) { + final CheckTransactionStateRequestHeader header) { Runnable request = new Runnable() { private final String brokerAddr = addr; private final MessageExt message = msg; diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 465b21a587d..a3d11a9dead 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -261,9 +261,6 @@ public MessageQueue select(List mqs, Message msg, Object arg) { countDownLatch.await(1000L, TimeUnit.MILLISECONDS); assertThat(cc.get()).isEqualTo(6); -// producer.sendOneway(new Message()); -// producer.sendOneway(new Message(),new MessageQueue()); -// producer.sendOneway(new Message(),messageQueueSelector,null); } public static TopicRouteData createTopicRoute() { From 0e09e265456f81315a3b2d1edd67fc0370638cb5 Mon Sep 17 00:00:00 2001 From: lindzh Date: Fri, 26 Jan 2018 15:12:09 +0800 Subject: [PATCH 4/4] fix asyncSend semaphore --- .../impl/producer/DefaultMQProducerImpl.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index f504cafff8c..5d4c46c58e1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -426,14 +426,20 @@ public AsyncSendCallback(SendCallback sendCallback, Semaphore semaphore) { @Override public void onSuccess(SendResult sendResult) { - semaphore.release(); - realSendCallback.onSuccess(sendResult); + try { + realSendCallback.onSuccess(sendResult); + } finally { + semaphore.release(); + } } @Override public void onException(Throwable e) { - semaphore.release(); - realSendCallback.onException(e); + try { + realSendCallback.onException(e); + } finally { + semaphore.release(); + } } }