diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index a9eabfe6313..66ec1e944db 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -25,6 +25,7 @@ * Client Common configuration */ public class ClientConfig { + public static final String SEND_ASYNC_SEMAPHORE = "com.rocketmq.sendAsyncSemaphore"; public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel"; private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); private String clientIP = RemotingUtil.getLocalAddress(); @@ -46,6 +47,8 @@ public class ClientConfig { private String unitName; private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); + private int asyncSendSemaphore = Integer.parseInt(System.getProperty(SEND_ASYNC_SEMAPHORE, "65536")); + private boolean useTLS = TlsSystemConfig.tlsEnable; public String buildMQClientId() { @@ -186,6 +189,14 @@ public void setUseTLS(boolean useTLS) { this.useTLS = useTLS; } + public int getAsyncSendSemaphore() { + return asyncSendSemaphore; + } + + public void setAsyncSendSemaphore(int asyncSendSemaphore) { + this.asyncSendSemaphore = asyncSendSemaphore; + } + @Override public String toString() { return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 7c169796741..5d4c46c58e1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -29,8 +29,11 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; @@ -98,6 +101,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); + private Semaphore asyncSendSemphore = null; + public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { this(defaultMQProducer, null); } @@ -145,6 +150,10 @@ public void start(final boolean startFactory) throws MQClientException { this.checkConfig(); + if (asyncSendSemphore == null) { + asyncSendSemphore = new Semaphore(defaultMQProducer.getAsyncSendSemaphore()); + } + if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } @@ -406,20 +415,99 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey) return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); } + private static class AsyncSendCallback implements SendCallback { + private SendCallback realSendCallback; + private Semaphore semaphore; + + public AsyncSendCallback(SendCallback sendCallback, Semaphore semaphore) { + this.realSendCallback = sendCallback; + this.semaphore = semaphore; + } + + @Override + public void onSuccess(SendResult sendResult) { + try { + realSendCallback.onSuccess(sendResult); + } finally { + semaphore.release(); + } + } + + @Override + public void onException(Throwable e) { + try { + realSendCallback.onException(e); + } finally { + semaphore.release(); + } + } + } + + // FIXME: 2018/1/25 + private ExecutorService getAsyncSendExecutor() { + return this.getCallbackExecutor(); + } + + private void doAsyncSend(Runnable runnable, final SendCallback sendCallback) { + try { + getAsyncSendExecutor().submit(runnable); + } catch (RejectedExecutionException e) { + sendCallback.onException(e); + } + } + + private void asyncHandleException(final SendCallback sendCallback, final Throwable e) { + try { + getAsyncSendExecutor().submit(new Runnable() { + @Override + public void run() { + sendCallback.onException(e); + } + }); + } catch (RejectedExecutionException e1) { + sendCallback.onException(e); + } + } + /** * DEFAULT ASYNC ------------------------------------------------------- */ - public void send(Message msg, - SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, SendCallback sendCallback) { send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } - public void send(Message msg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + public void send(final Message msg, final SendCallback sendCallback, final long timeout) { try { - this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout); - } catch (MQBrokerException e) { - throw new MQClientException("unknownn exception", e); + boolean acquire = asyncSendSemphore.tryAcquire(timeout, TimeUnit.MILLISECONDS); + if (acquire) { + final AsyncSendCallback asyncSendCallback = new AsyncSendCallback(sendCallback, asyncSendSemphore); + doAsyncSend(new Runnable() { + @Override + public void run() { + try { + sendDefaultImpl(msg, CommunicationMode.ASYNC, asyncSendCallback, timeout); + } catch (Exception e) { + handleCallbackException(e, asyncSendCallback); + } + } + }, asyncSendCallback); + } else { + asyncHandleException(sendCallback, new RejectedExecutionException()); + } + } catch (InterruptedException e) { + asyncHandleException(sendCallback, e); + } + } + + private void handleCallbackException(Exception e, SendCallback sendCallback) { + if (sendCallback != null) { + if (e instanceof MQBrokerException) { + sendCallback.onException(new MQClientException("unknown exception", e)); + } else { + sendCallback.onException(e); + } + } else { + log.warn("asyncSend message callback null real exception is " + e.getMessage(), e); } } @@ -583,11 +671,11 @@ private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { } private SendResult sendKernelImpl(final Message msg, - final MessageQueue mq, - final CommunicationMode communicationMode, - final SendCallback sendCallback, - final TopicPublishInfo topicPublishInfo, - final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + final MessageQueue mq, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); @@ -842,24 +930,38 @@ public SendResult send(Message msg, MessageQueue mq, long timeout) /** * KERNEL ASYNC ------------------------------------------------------- */ - public void send(Message msg, MessageQueue mq, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) { send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } - public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - this.makeSureStateOK(); - Validators.checkMessage(msg, this.defaultMQProducer); - - if (!msg.getTopic().equals(mq.getTopic())) { - throw new MQClientException("message's topic not equal mq's topic", null); - } - + public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) { try { - this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout); - } catch (MQBrokerException e) { - throw new MQClientException("unknown exception", e); + boolean acquire = asyncSendSemphore.tryAcquire(timeout, TimeUnit.MILLISECONDS); + if (acquire) { + final AsyncSendCallback asyncSendCallback = new AsyncSendCallback(sendCallback, asyncSendSemphore); + doAsyncSend(new Runnable() { + @Override + public void run() { + try { + + makeSureStateOK(); + Validators.checkMessage(msg, defaultMQProducer); + + if (!msg.getTopic().equals(mq.getTopic())) { + throw new MQClientException("message's topic not equal mq's topic", null); + } + sendKernelImpl(msg, mq, CommunicationMode.ASYNC, asyncSendCallback, null, timeout); + } catch (Exception e) { + handleCallbackException(e, asyncSendCallback); + } + } + }, asyncSendCallback); + + } else { + asyncHandleException(sendCallback, new RejectedExecutionException()); + } + } catch (InterruptedException e) { + asyncHandleException(sendCallback, e); } } @@ -867,7 +969,7 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long t * KERNEL ONEWAY ------------------------------------------------------- */ public void sendOneway(Message msg, - MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { + MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -923,17 +1025,31 @@ private SendResult sendSelectImpl( /** * SELECT ASYNC ------------------------------------------------------- */ - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) { send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { + public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) { try { - this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout); - } catch (MQBrokerException e) { - throw new MQClientException("unknownn exception", e); + boolean acquire = asyncSendSemphore.tryAcquire(timeout, TimeUnit.MILLISECONDS); + if (acquire) { + final AsyncSendCallback asyncSendCallback = new AsyncSendCallback(sendCallback, asyncSendSemphore); + doAsyncSend(new Runnable() { + @Override + public void run() { + try { + sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, asyncSendCallback, timeout); + } catch (Exception e) { + handleCallbackException(e, asyncSendCallback); + } + } + }, asyncSendCallback); + + } else { + asyncHandleException(sendCallback, new RejectedExecutionException()); + } + } catch (InterruptedException e) { + asyncHandleException(sendCallback, e); } } @@ -950,7 +1066,7 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) } public TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter tranExecuter, final Object arg) + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { if (null == tranExecuter) { throw new MQClientException("tranExecutor is null", null); @@ -1064,8 +1180,12 @@ public void setCallbackExecutor(final ExecutorService callbackExecutor) { this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); } + private ExecutorService getCallbackExecutor() { + return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor(); + } + public SendResult send(Message msg, - long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index a2f25dd0f8f..d0cae87a14f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -250,8 +250,8 @@ public SendResult send(Message msg, * @throws InterruptedException if the sending thread is interrupted. */ @Override - public void send(Message msg, - SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + public void send(Message msg, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 14caf6ffac9..a7367e91992 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -70,11 +70,11 @@ SendResult send(final Message msg, final MessageQueueSelector selector, final Ob InterruptedException; void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback) throws MQClientException, RemotingException, + final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, + final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException; void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index ded22ada914..a3d11a9dead 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -24,6 +24,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -36,6 +39,7 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; @@ -214,6 +218,51 @@ public void testSetCallbackExecutor() throws MQClientException { assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized); } + @Test + public void testAsyncSend() throws MQClientException, RemotingException, InterruptedException { + String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + producer = new DefaultMQProducer(producerGroupTemp); + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + + final AtomicInteger cc = new AtomicInteger(0); + final CountDownLatch countDownLatch = new CountDownLatch(6); + + SendCallback sendCallback = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + + } + + @Override + public void onException(Throwable e) { + e.printStackTrace(); + countDownLatch.countDown(); + cc.incrementAndGet(); + } + }; + MessageQueueSelector messageQueueSelector = new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + return null; + } + }; + + Message message = new Message(); + message.setTopic("test"); + message.setBody("hello world".getBytes()); + producer.send(new Message(),sendCallback); + producer.send(message,sendCallback,1000); + producer.send(message,new MessageQueue(),sendCallback); + producer.send(new Message(),new MessageQueue(),sendCallback,1000); + producer.send(new Message(),messageQueueSelector,null,sendCallback); + producer.send(message,messageQueueSelector,null,sendCallback,1000); + + countDownLatch.await(1000L, TimeUnit.MILLISECONDS); + + assertThat(cc.get()).isEqualTo(6); + } + public static TopicRouteData createTopicRoute() { TopicRouteData topicRouteData = new TopicRouteData(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java index 2aea14cb9d6..c0754db634d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java @@ -48,5 +48,7 @@ void registerProcessor(final int requestCode, final NettyRequestProcessor proces void setCallbackExecutor(final ExecutorService callbackExecutor); + ExecutorService getCallbackExecutor(); + boolean isChannelWritable(final String addr); }