Skip to content

[Issue#94] Support send batch message to broker#95

Merged
ShannonDing merged 5 commits intoapache:masterfrom
jonnxu:master
Mar 14, 2019
Merged

[Issue#94] Support send batch message to broker#95
ShannonDing merged 5 commits intoapache:masterfrom
jonnxu:master

Conversation

@jonnxu
Copy link
Contributor

@jonnxu jonnxu commented Feb 9, 2019

What is the purpose of the change

#94 Support send batch message to broker

Brief changelog

Add two new APIs:
SendResult send(std::vector& msgs);
SendResult send(std::vector& msgs, const MQMessageQueue& mq);

Verifying this change

Producer send one batch message:

./bin/BatchProducer -t batch -m 1 -c test2222 -T 1 -n 127.0.0.1:9876 -g g1 nameserver: 101.132.96.164:9876
topic: batch
groupname: g1
produce content: test2222msgbody for test
msg count: 1
thread count: 1
tps_thread_ is null
send RT more than: 2807 ms with msgid:
per msg time: 2807ms
========================finished==============================

consumer receive three message:

./bin/PushConsumer -t batch -n 127.0.0.1:9876 -g g0 -m 10000 -T 1
nameserver: 127.0.0.1:9876
topic: batch
groupname: g0
produce content: msgbody for test
msg count: 10000
thread count: 1
tps_thread_ is null
msg: MessageExt [queueId=0, storeSize=196, queueOffset=48, sysFlag=0, bornTimestamp=1549729233501, bornHost=127.0.0.1, storeTimestamp=1549729234903, storeHost=127.0.0.1, msgId=730010AC000083060000E06492330100, commitLogOffset=365601006, bodyCRC=1949078420, reconsumeTimes=0, preparedTransactionOffset=0, Message [topic=batch, flag=0, tag=]]
body: test2222msgbody for test
TAGS : *
UNIQ_KEY : 730010AC000083060000E06492330100
WAIT : true
property1 : value1
msg: MessageExt [queueId=0, storeSize=213, queueOffset=49, sysFlag=0, bornTimestamp=1549729233501, bornHost=127.0.0.1, storeTimestamp=1549729234903, storeHost=127.0.0.1, msgId=730010AC000083060000E06492330200, commitLogOffset=365601202, bodyCRC=1949078420, reconsumeTimes=0, preparedTransactionOffset=0, Message [topic=batch, flag=0, tag=
]]
body: test2222msgbody for test
TAGS : *
UNIQ_KEY : 730010AC000083060000E06492330200
WAIT : true
property1 : value1
property2 : value2
msg: MessageExt [queueId=0, storeSize=230, queueOffset=50, sysFlag=0, bornTimestamp=1549729233501, bornHost=127.0.0.1, storeTimestamp=1549729234903, storeHost=127.0.0.1, msgId=730010AC000083060000E06492330300, commitLogOffset=365601415, bodyCRC=1949078420, reconsumeTimes=0, preparedTransactionOffset=0, Message [topic=batch, flag=0, tag=*]]
body: test2222msgbody for test
TAGS : *
UNIQ_KEY : 730010AC000083060000E06492330300
WAIT : true
property1 : value1
property2 : value2
property3 : value3

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.

  • Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when a cross-module dependency exists.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@ShannonDing ShannonDing added this to the 1.2.2 milestone Feb 12, 2019
Copy link
Contributor

@jovany-wang jovany-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some minor comments.
I'll take a look at this PR again in this weekend.

Another suggestion, could we use the OO in this example? So that we shouldn't define some global variable like g_mutex, etc. How do you think of this?

* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new line after copyright?

#include <mutex>
#include <thread>
#include <vector>
#include "common.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new line before line28?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think need a new line between header files

#include "common.h"

using namespace rocketmq;
using namespace std;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to remove this using namespace std; statement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't matter for cpp file


using namespace rocketmq;
using namespace std;
boost::atomic<bool> g_quit;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use std::atomic instead of boost::atomic.

Or do you have any other consideration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy from another sample

msgs.push_back(msg2);
msgs.push_back(msg3);
try {
auto start = std::chrono::system_clock::now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
auto start = std::chrono::system_clock::now();
auto start = UtilAll::currentTimeMillis();

auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
if (duration.count() >= 500) {
std::cout << "send RT more than: " << duration.count()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use log instead of std::cout

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i suggest use std::cout or printf for samples because it can see the output in screen at once

static std::string encode(MQMessage &message);
};
}
#endif No newline at end of file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a new line at the end of this line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@ShannonDing ShannonDing merged commit dcba721 into apache:master Mar 14, 2019
@ShannonDing ShannonDing added the enhancement New feature or request label Mar 14, 2019
@messense
Copy link
Member

Any plan on adding send batch message APIs to C APIs?

ifplusor pushed a commit to ifplusor/rocketmq-client-cpp that referenced this pull request Mar 15, 2019
ifplusor pushed a commit to ifplusor/rocketmq-client-cpp that referenced this pull request Mar 15, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants