Skip to content

Commit

Permalink
[ISSUE apache#3284]Optimize the buildMessage method (apache#3324)
Browse files Browse the repository at this point in the history
* [ISSUE apache#3284]Optimize the buildMessage method

Co-authored-by: zhangjidi2016 <zhangjidi@cmss.chinamobile.com>
  • Loading branch information
zhangjidi2016 and zhangjidi2016 committed Sep 8, 2021
1 parent 29cfb04 commit 14b0248
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.example.benchmark;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -33,6 +33,7 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
Expand All @@ -44,13 +45,14 @@
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;

public class BatchProducer {

public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
private static byte[] msgBody;

public static void main(String[] args) throws MQClientException {

Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new PosixParser());
Expand All @@ -74,6 +76,12 @@ public static void main(String[] args) throws MQClientException, UnsupportedEnco
System.out.printf("topic: %s threadCount: %d messageSize: %d batchSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n",
topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);

StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
sb.append(RandomStringUtils.randomAlphanumeric(1));
}
msgBody = sb.toString().getBytes(StandardCharsets.UTF_8);

final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer();
statsBenchmark.start();

Expand All @@ -87,14 +95,7 @@ public static void main(String[] args) throws MQClientException, UnsupportedEnco
@Override
public void run() {
while (true) {
List<Message> msgs;

try {
msgs = buildBathMessage(batchSize, messageSize, topic);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return;
}
List<Message> msgs = buildBathMessage(batchSize, topic);

if (CollectionUtils.isEmpty(msgs)) {
return;
Expand Down Expand Up @@ -236,23 +237,12 @@ private static boolean getOptionValue(CommandLine commandLine, char key, boolean
return defaultValue;
}

private static List<Message> buildBathMessage(int batchSize, int messageSize,
String topic) throws UnsupportedEncodingException {
private static List<Message> buildBathMessage(final int batchSize, final String topic) {
List<Message> batchMessage = new ArrayList<>(batchSize);

for (int i = 0; i < batchSize; i++) {
Message msg = new Message();
msg.setTopic(topic);

StringBuilder sb = new StringBuilder();
for (int j = 0; j < messageSize; j += 10) {
sb.append("hello baby");
}

msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msg = new Message(topic, msgBody);
batchMessage.add(msg);
}

return batchMessage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
*/
package org.apache.rocketmq.example.benchmark;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
Expand All @@ -29,11 +31,9 @@
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Random;
Expand All @@ -47,7 +47,9 @@

public class Producer {

public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
private static byte[] msgBody;

public static void main(String[] args) throws MQClientException {

Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser());
Expand All @@ -70,6 +72,12 @@ public static void main(String[] args) throws MQClientException, UnsupportedEnco
System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n delayEnable: %s%n delayLevel: %s%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum, delayEnable, delayLevel);

StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
sb.append(RandomStringUtils.randomAlphanumeric(1));
}
msgBody = sb.toString().getBytes(StandardCharsets.UTF_8);

final InternalLogger log = ClientLogger.getLog();

final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
Expand Down Expand Up @@ -142,13 +150,7 @@ public void run() {
int num = 0;
while (true) {
try {
final Message msg;
try {
msg = buildMessage(messageSize, topic);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return;
}
final Message msg = buildMessage(topic);
final long beginTimestamp = System.currentTimeMillis();
if (keyEnable) {
msg.setKeys(String.valueOf(beginTimestamp / 1000));
Expand Down Expand Up @@ -290,18 +292,8 @@ public static Options buildCommandlineOptions(final Options options) {
return options;
}

private static Message buildMessage(final int messageSize, final String topic) throws UnsupportedEncodingException {
Message msg = new Message();
msg.setTopic(topic);

StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i += 10) {
sb.append("hello baby");
}

msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));

return msg;
private static Message buildMessage(final String topic) {
return new Message(topic, msgBody);
}

private static void doPrintStats(final LinkedList<Long[]> snapshotList, final StatsBenchmarkProducer statsBenchmark, boolean done) {
Expand Down

0 comments on commit 14b0248

Please sign in to comment.