
[ROCKETMQ-80] Add batch feature #53

Closed
wants to merge 1 commit into develop from dongeforever:ROCKETMQ-80

Conversation

7 participants
@dongeforever
Member

dongeforever commented Feb 7, 2017

Tests show that Kafka's million-level TPS is mainly owed to batching: when the batch size is set to 1, TPS drops by an order of magnitude. So I am trying to add this feature to RocketMQ.

To keep the effort minimal, it works as follows (a usage sketch appears after the list):

  • Only add synchronous send methods to the MQProducer interface, e.g. send(final Collection<Message> msgs)
  • Use a MessageBatch class, which extends Message and implements Iterable<Message>
  • Use a byte buffer instead of a list of objects to avoid excessive GC in the broker
  • Split the decode and encode logic out of lockForPutMessage to avoid race conditions
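
Below is a minimal usage sketch of the batch API described above (topic, tags, keys, and producer group are placeholder values):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class BatchSendExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("batch_example_group");
        producer.start();
        try {
            List<Message> batch = new ArrayList<>();
            batch.add(new Message("TopicTest", "TagA", "key-1", "hello 1".getBytes()));
            batch.add(new Message("TopicTest", "TagA", "key-2", "hello 2".getBytes()));
            batch.add(new Message("TopicTest", "TagA", "key-3", "hello 3".getBytes()));
            // The collection is wrapped into a single MessageBatch and sent
            // in one request; the batch succeeds or fails as a unit.
            SendResult result = producer.send(batch);
            System.out.printf("sendStatus=%s%n", result.getSendStatus());
        } finally {
            producer.shutdown();
        }
    }
}
```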

Tests:
On Linux with 24 cores, 48 GB RAM, and an SSD, with a single broker, using 50 threads to send messages with 50-byte bodies in batches of 50, we get about 1.5 million TPS until the disk is full.

Potential problems:
Although messages can be accumulated in the broker very quickly, they still need time to be dispatched to the consume queue, which is much slower than accepting them. So the messages may not be consumable immediately.

We may need to refactor the ReputMessageService to solve this problem.

If anyone has ideas, please let me know or share them in this issue.

@dongeforever

Member

dongeforever commented Feb 7, 2017

@vongosling @zhouxinyu @WillemJiang please review.

@vongosling

Member

vongosling commented Feb 7, 2017

Cool, thanks @dongeforever for providing this feature. We will have a look at this implementation. Please hold your horses :-)

@coveralls

coveralls commented Feb 7, 2017

Coverage Status

Coverage increased (+0.4%) to 31.909% when pulling e03b6e6 on dongeforever:ROCKETMQ-80 into 9a2de7b on apache:master.

1 similar comment

@zhouxinyu

Member

zhouxinyu commented Feb 8, 2017

I wonder, are there any compatibility issues between a new client version and an old server version?

@dongeforever

Member

dongeforever commented Feb 8, 2017

@zhouxinyu Yeah, this is a problem. When a new client sends batched messages to an old server, it gets no error, because the batched messages are treated as a normal message internally.

Maybe I need a new request code, so that the old server cannot recognize it and will throw an error.
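
A rough sketch of that idea (all names and values here are illustrative, not the actual RocketMQ remoting code): the broker dispatches on the request code, so an old broker with no processor registered for the new code returns an explicit error instead of silently treating the batch as one normal message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RequestDispatchSketch {
    static final int SEND_MESSAGE = 10;        // long-existing request code
    static final int SEND_BATCH_MESSAGE = 320; // new code; value illustrative

    static final Map<Integer, Function<byte[], String>> PROCESSORS = new HashMap<>();

    static {
        PROCESSORS.put(SEND_MESSAGE, body -> "stored 1 message");
        // An old broker simply lacks this registration:
        // PROCESSORS.put(SEND_BATCH_MESSAGE, body -> "stored batch");
    }

    static String dispatch(int code, byte[] body) {
        Function<byte[], String> processor = PROCESSORS.get(code);
        if (processor == null) {
            // Unknown code: reply with an explicit error.
            return "ERROR: request code " + code + " not supported";
        }
        return processor.apply(body);
    }

    public static void main(String[] args) {
        System.out.println(dispatch(SEND_MESSAGE, new byte[0]));       // accepted
        System.out.println(dispatch(SEND_BATCH_MESSAGE, new byte[0])); // rejected
    }
}
```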

@shroman

Contributor

shroman commented Feb 8, 2017

It sounds good to check the protocol version in the request header and reject with an error when it does not match.
RemotingCommand.REMOTING_VERSION_KEY would probably work, but I would create a version policy based not on the release (as is used now) but on protocol changes, so that we don't have to introduce a new checking condition in the code with each release. Just an idea.
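
A rough sketch of that policy (the protocol-version constants and the check helper are hypothetical, not existing RocketMQ API): a broker could run one check when a connection is established, instead of sprinkling per-release conditions through every API handler.

```java
public final class ProtocolVersionPolicy {
    // Hypothetical protocol versions, decoupled from release versions:
    // RocketMQ 4.0.0 and 4.1.0 might both speak protocol 1, while a
    // wire-format change in 5.0.0 would bump it to 2.
    public static final int PROTOCOL_V1 = 1;
    public static final int PROTOCOL_V2 = 2;

    private final int maxSupported;

    public ProtocolVersionPolicy(int maxSupported) {
        this.maxSupported = maxSupported;
    }

    /** Returns null if the client is accepted, or an error message to reject with. */
    public String check(Integer clientProtocolVersion) {
        if (clientProtocolVersion == null) {
            return "missing protocol version header";
        }
        if (clientProtocolVersion > maxSupported) {
            return "protocol v" + clientProtocolVersion
                + " not supported; this broker speaks up to v" + maxSupported;
        }
        return null; // compatible
    }
}
```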

@dongeforever

Member

dongeforever commented Feb 10, 2017

@shroman thank you.
It is a great idea to create a version policy based on protocol changes.
We would also need to control every API separately, so maybe we can put all these checks in one place for all APIs.

BTW, version control is a global problem, not only for batch; maybe we need more discussion on the mailing list.

@shroman

Contributor

shroman commented Feb 10, 2017

@dongeforever yes, I think we have to discuss it on the dev mailing list 👍
I think having a class/methods for wire-protocol version checking, and applying them (and, as a result, accepting or rejecting) when a client wants to establish a connection to a broker, etc., should work. I guess that's what you meant by having the "checking codes in one place" :)
The version information can be placed into headers.

Such protocol changes shouldn't happen that often (RocketMQ v4.0.0 and v4.1.0 may use protocol v1, while RocketMQ v5.0.0 may use protocol v2), but if they do happen, this way we can guarantee that the communication between system components is done correctly.

Let's discuss it on the ML. My understanding of RocketMQ is still not complete ;) Maybe there are better ideas that don't over-complicate it with versioning.

@dongeforever

Member

dongeforever commented Feb 15, 2017

@zhouxinyu Note that batch is a new API; I added a new request code named SEND_BATCH_MESSAGE. As an old broker cannot recognize it, the compatibility problem is gone.

Also, I now use a ThreadLocal to avoid race conditions on MessageExtBatchEncoder, which was previously cached in a blocking queue.
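
A minimal sketch of the ThreadLocal pattern described here (the encoder body is illustrative; only the per-thread ownership mirrors the PR's MessageExtBatchEncoder change):

```java
import java.nio.ByteBuffer;

public class BatchEncoderHolder {
    // Each storing thread gets its own encoder and buffer, so the mutable
    // state is never shared and no blocking-queue handoff is needed.
    private static final ThreadLocal<BatchEncoder> ENCODER =
        ThreadLocal.withInitial(() -> new BatchEncoder(4 * 1024 * 1024));

    static class BatchEncoder {
        private final ByteBuffer buffer; // reused across calls by one thread

        BatchEncoder(int size) {
            this.buffer = ByteBuffer.allocateDirect(size);
        }

        ByteBuffer encode(byte[] batchBody) {
            buffer.clear();
            buffer.put(batchBody);
            buffer.flip();
            return buffer;
        }
    }

    public static ByteBuffer encode(byte[] batchBody) {
        // Each thread transparently gets its own encoder instance.
        return ENCODER.get().encode(batchBody);
    }
}
```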

@coveralls

coveralls commented Feb 15, 2017

Coverage Status

Coverage increased (+0.5%) to 32.046% when pulling 6579e7a on dongeforever:ROCKETMQ-80 into 573b22c on apache:master.

2 similar comments

@Jaskey

Contributor

Jaskey commented Feb 16, 2017

@dongeforever

I have the same wish for batch send, but what drives me is that users probably need a batch id for one batch of messages, and these messages should all succeed into one single queue, which is necessary when sending ordered messages. Say msgA, msgB, and msgC should be consumed in order: they should be sent to the same queue, but if we use a for loop to send them, A may succeed while B fails to reach that queue, since the queue numbers may change at that exact sending time.

Batch send could solve this problem. But we may also need to generate a unique batch id in the client, which would help us optimize the performance of ConsumeOrderlyService in the future. Currently, a message in a single queue can be consumed only if the previous one was consumed successfully, which is actually too strict. We really only need the messages within one batch to be consumed in order, and a batch id would let us do that.

So in general, I suggest adding a batch id to the message properties when sending batch messages.
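
A hypothetical sketch of this suggestion (the property key "BATCH_ID" is made up; putUserProperty is the standard way to attach a user property to a Message): stamp every message of a batch with one shared id before sending, so consumers can recognize batch boundaries later.

```java
import java.util.List;
import java.util.UUID;

import org.apache.rocketmq.common.message.Message;

public class BatchIdStamper {
    public static String stampBatchId(List<Message> batch) {
        String batchId = UUID.randomUUID().toString();
        for (Message msg : batch) {
            // The property travels with the message to the broker and,
            // eventually, to the consumer.
            msg.putUserProperty("BATCH_ID", batchId);
        }
        return batchId;
    }
}
```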

PS: There looks to be too much repeated code; any plans to clean it up?

@dongeforever

Member

dongeforever commented Feb 17, 2017

@Jaskey There is no batch id for now, but the messages in one batch are sent to the same queue, and they either all succeed or all fail.
You could check the code or test it.

@Jaskey

Contributor

Jaskey commented Feb 17, 2017

@dongeforever I know your implementation. What I suggest is that you add a batch id so that we can identify the batch, which is actually a very small effort for you.

And as I said, in the future we can optimize ConsumeOrderlyService with the batch id.

@dongeforever

Member

dongeforever commented Feb 17, 2017

@Jaskey Sorry, you are right. It is worth a new PR; maybe we can add the batch id along with a newly optimized ConsumeOrderlyService.

@lizhanhui

Contributor

lizhanhui commented Feb 17, 2017

This feature is definitely nice to have, and it expands the scenarios that RocketMQ fits best. For example, RocketMQ may have close, if not better, performance compared with Kafka for log collection.

That said, this is a pretty important feature, and it matters so much that we need to get it right from the beginning. We'd better have a design document first, then discuss the various impacts it brings in the mailing list.

As for this PR, it's generally good, yet it still needs more work: code duplication, message-validation logic discrepancies, excessive constraints on usage (no delayed messages, messages of a batch must have the same topic, etc.), and the previously mentioned compatibility issue with older brokers.

@vongosling

Member

vongosling commented Feb 20, 2017

Yep, agreed @lizhanhui. I will make comments in as much detail as possible.

@vongosling

Member

vongosling commented Feb 20, 2017

@dongeforever we can continue to polish this PR, IMO. If you have any problems, please let me know. BTW, can you post your performance test results for us?

@@ -72,7 +73,13 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
- final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
+ RemotingCommand response = null;

@vongosling

vongosling Feb 21, 2017

Member

Please remove redundant initializer

response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
if (log.isDebugEnabled()) {

@vongosling

vongosling Feb 21, 2017

Member

IMO, we can remove the redundant isDebugEnabled check here. Also, please capitalize the first letter of the log output.

}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);

@vongosling

vongosling Feb 21, 2017

Member

Why not put this precondition check, msgCheck, at the first line of the outer method?

messageExtBatch.setBornHost(ctx.channel().remoteAddress());
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

@vongosling

vongosling Feb 21, 2017

Member

A lot of duplicated code.

MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic(newTopic);

@vongosling

vongosling Feb 21, 2017

Member

Drawback 1: only messages with the same topic are supported.

@@ -737,6 +742,10 @@ public MQClientInstance getmQClientFactory() {
}
private boolean tryToCompressMessage(final Message msg) {
if (msg instanceof MessageBatch) {

@vongosling

vongosling Feb 21, 2017

Member

Why not support compression for batches now?

import java.util.Iterator;
import java.util.List;
public class MessageBatch extends Message implements Iterable<Message> {

@vongosling

vongosling Feb 21, 2017

Member

Maybe we can consider implementing equals for the collection scenario.

@@ -48,6 +48,8 @@
private Integer reconsumeTimes;
@CFNullable
private boolean unitMode = false;
@CFNullable

@vongosling

vongosling Feb 21, 2017

Member

Check that this annotation works well.

@@ -119,6 +129,7 @@ public String toString() {
", storeTimestamp=" + storeTimestamp +
", logicsOffset=" + logicsOffset +
", pagecacheRT=" + pagecacheRT +

@vongosling

vongosling Feb 21, 2017

Member

Consider Apache Commons Lang's ToStringBuilder to reflect the fields.

@@ -331,7 +331,7 @@ public long getMinOffsetInQueue() {
public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
long logicOffset) {
final int maxRetries = 30;
- boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
+ boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();

@vongosling

vongosling Feb 21, 2017

Member

What is isCQWriteable? IMO, it is not a good naming convention.

@Jaskey

Jaskey Mar 2, 2017

Contributor

isConsumeQWritable would be better, I think.

@dongeforever

dongeforever Mar 6, 2017

Member

@Jaskey Yeah, it is better to use the name isConsumeQWritable.

@zhouxinyu

Member

zhouxinyu commented Feb 24, 2017

Hi @shroman, @Jaskey, @lizhanhui, what's your opinion of this updated PR?

@coveralls

coveralls commented Mar 2, 2017

Coverage Status

Coverage increased (+0.3%) to 31.791% when pulling eb1a5e7 on dongeforever:ROCKETMQ-80 into 573b22c on apache:master.

2 similar comments

@dongeforever dongeforever changed the base branch from master to develop Mar 10, 2017

@coveralls

coveralls commented Mar 11, 2017

Coverage Status

Coverage increased (+0.9%) to 31.863% when pulling 854d469 on dongeforever:ROCKETMQ-80 into d7decc8 on apache:develop.

2 similar comments

asfgit pushed a commit that referenced this pull request Mar 17, 2017

@dongeforever

Member

dongeforever commented Mar 17, 2017

This PR has been merged into the develop branch.

@vongosling @zhouxinyu

@vongosling

Member

vongosling commented Mar 17, 2017

LGTM~

asfgit pushed a commit that referenced this pull request Jun 6, 2017
