Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1ae0693
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 11, 2017
f0e243c
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 11, 2017
1810be4
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 14, 2017
53dcd8d
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 22, 2017
4abfa4f
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 28, 2017
d576e38
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 30, 2017
bb446c4
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Sep 21, 2017
11d40c2
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Sep 25, 2017
ae5c41e
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Oct 11, 2017
89dbf04
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Oct 12, 2017
bc1c880
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Oct 25, 2017
48d93e8
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 7, 2017
962dfbf
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 13, 2017
844f0a9
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 14, 2017
5567377
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 18, 2017
c1f503c
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 20, 2017
fcdf17d
make client asyncSend fully async send
lindzh Jan 16, 2018
ccbc4b4
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Jan 16, 2018
969f20b
Merge branch 'develop' into fix_sendAsync
lindzh Jan 16, 2018
171799e
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Jan 23, 2018
564aa26
Merge branch 'develop' into fix_sendAsync
lindzh Jan 25, 2018
d57f4b1
fix send async semphore
lindzh Jan 25, 2018
f8113eb
fix client semaphore
lindzh Jan 25, 2018
0e09e26
fix asyncSend semaphore
lindzh Jan 26, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Client Common configuration
*/
public class ClientConfig {
public static final String SEND_ASYNC_SEMAPHORE = "com.rocketmq.sendAsyncSemaphore";
public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
private String clientIP = RemotingUtil.getLocalAddress();
Expand All @@ -46,6 +47,8 @@ public class ClientConfig {
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));

private int asyncSendSemaphore = Integer.parseInt(System.getProperty(SEND_ASYNC_SEMAPHORE, "65536"));

private boolean useTLS = TlsSystemConfig.tlsEnable;

public String buildMQClientId() {
Expand Down Expand Up @@ -186,6 +189,14 @@ public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}

public int getAsyncSendSemaphore() {
return asyncSendSemaphore;
}

public void setAsyncSendSemaphore(int asyncSendSemaphore) {
this.asyncSendSemaphore = asyncSendSemaphore;
}

@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
Expand Down Expand Up @@ -98,6 +101,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {

private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

private Semaphore asyncSendSemphore = null;

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
this(defaultMQProducer, null);
}
Expand Down Expand Up @@ -145,6 +150,10 @@ public void start(final boolean startFactory) throws MQClientException {

this.checkConfig();

if (asyncSendSemphore == null) {
asyncSendSemphore = new Semaphore(defaultMQProducer.getAsyncSendSemaphore());
}

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
Expand Down Expand Up @@ -406,20 +415,99 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
}

private static class AsyncSendCallback implements SendCallback {
private SendCallback realSendCallback;
private Semaphore semaphore;

public AsyncSendCallback(SendCallback sendCallback, Semaphore semaphore) {
this.realSendCallback = sendCallback;
this.semaphore = semaphore;
}

@Override
public void onSuccess(SendResult sendResult) {
try {
realSendCallback.onSuccess(sendResult);
} finally {
semaphore.release();
}
}

@Override
public void onException(Throwable e) {
try {
realSendCallback.onException(e);
} finally {
semaphore.release();
}
}
}

// FIXME: 2018/1/25
private ExecutorService getAsyncSendExecutor() {
return this.getCallbackExecutor();
}

private void doAsyncSend(Runnable runnable, final SendCallback sendCallback) {
try {
getAsyncSendExecutor().submit(runnable);
} catch (RejectedExecutionException e) {
sendCallback.onException(e);
}
}

private void asyncHandleException(final SendCallback sendCallback, final Throwable e) {
try {
getAsyncSendExecutor().submit(new Runnable() {
@Override
public void run() {
sendCallback.onException(e);
}
});
} catch (RejectedExecutionException e1) {
sendCallback.onException(e);
}
}

/**
* DEFAULT ASYNC -------------------------------------------------------
*/
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
public void send(Message msg, SendCallback sendCallback) {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}

public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
public void send(final Message msg, final SendCallback sendCallback, final long timeout) {
try {
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
} catch (MQBrokerException e) {
throw new MQClientException("unknownn exception", e);
boolean acquire = asyncSendSemphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
if (acquire) {
final AsyncSendCallback asyncSendCallback = new AsyncSendCallback(sendCallback, asyncSendSemphore);
doAsyncSend(new Runnable() {
@Override
public void run() {
try {
sendDefaultImpl(msg, CommunicationMode.ASYNC, asyncSendCallback, timeout);
} catch (Exception e) {
handleCallbackException(e, asyncSendCallback);
}
}
}, asyncSendCallback);
} else {
asyncHandleException(sendCallback, new RejectedExecutionException());
}
} catch (InterruptedException e) {
asyncHandleException(sendCallback, e);
}
}

private void handleCallbackException(Exception e, SendCallback sendCallback) {
if (sendCallback != null) {
if (e instanceof MQBrokerException) {
sendCallback.onException(new MQClientException("unknown exception", e));
} else {
sendCallback.onException(e);
}
} else {
log.warn("asyncSend message callback null real exception is " + e.getMessage(), e);
}
}

Expand Down Expand Up @@ -583,11 +671,11 @@ private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
}

private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
Expand Down Expand Up @@ -842,32 +930,46 @@ public SendResult send(Message msg, MessageQueue mq, long timeout)
/**
* KERNEL ASYNC -------------------------------------------------------
*/
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
public void send(Message msg, MessageQueue mq, SendCallback sendCallback) {
send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}

public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);

if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("message's topic not equal mq's topic", null);
}

public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) {
try {
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e);
boolean acquire = asyncSendSemphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
if (acquire) {
final AsyncSendCallback asyncSendCallback = new AsyncSendCallback(sendCallback, asyncSendSemphore);
doAsyncSend(new Runnable() {
@Override
public void run() {
try {

makeSureStateOK();
Validators.checkMessage(msg, defaultMQProducer);

if (!msg.getTopic().equals(mq.getTopic())) {
throw new MQClientException("message's topic not equal mq's topic", null);
}
sendKernelImpl(msg, mq, CommunicationMode.ASYNC, asyncSendCallback, null, timeout);
} catch (Exception e) {
handleCallbackException(e, asyncSendCallback);
}
}
}, asyncSendCallback);

} else {
asyncHandleException(sendCallback, new RejectedExecutionException());
}
} catch (InterruptedException e) {
asyncHandleException(sendCallback, e);
}
}

/**
* KERNEL ONEWAY -------------------------------------------------------
*/
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);

Expand Down Expand Up @@ -923,17 +1025,31 @@ private SendResult sendSelectImpl(
/**
* SELECT ASYNC -------------------------------------------------------
*/
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) {
send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}

public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) {
try {
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
} catch (MQBrokerException e) {
throw new MQClientException("unknownn exception", e);
boolean acquire = asyncSendSemphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
if (acquire) {
final AsyncSendCallback asyncSendCallback = new AsyncSendCallback(sendCallback, asyncSendSemphore);
doAsyncSend(new Runnable() {
@Override
public void run() {
try {
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, asyncSendCallback, timeout);
} catch (Exception e) {
handleCallbackException(e, asyncSendCallback);
}
}
}, asyncSendCallback);

} else {
asyncHandleException(sendCallback, new RejectedExecutionException());
}
} catch (InterruptedException e) {
asyncHandleException(sendCallback, e);
}
}

Expand All @@ -950,7 +1066,7 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
}

public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg)
final LocalTransactionExecuter tranExecuter, final Object arg)
throws MQClientException {
if (null == tranExecuter) {
throw new MQClientException("tranExecutor is null", null);
Expand Down Expand Up @@ -1064,8 +1180,12 @@ public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
}

private ExecutorService getCallbackExecutor() {
return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
}

public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ public SendResult send(Message msg,
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
public void send(Message msg, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ SendResult send(final Message msg, final MessageQueueSelector selector, final Ob
InterruptedException;

void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback) throws MQClientException, RemotingException,
final SendCallback sendCallback) throws MQClientException, RemotingException,
InterruptedException;

void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
InterruptedException;

void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
Expand Down
Loading