./conf/tools.log4j.properties
+ ./bin/tubemq-consumer-perf-test.sh
+ ./bin/tubemq-producer-perf-test.sh
+ ./bin/env.sh
LICENSE
NOTICE
DISCLAIMER-WIP
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java
new file mode 100644
index 00000000000..c507ae41b09
--- /dev/null
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.example;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+
+public class ArgsParserHelper {
+
+ /**
+ * Print help information and exit.
+ *
+ * @param opts - options
+ */
+ public static void help(String commandName, Options opts) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(commandName, opts);
+ System.exit(0);
+ }
+
+ /**
+ * Init common options when parsing args.
+ * @return - options
+ */
+ public static Options initCommonOptions() {
+ Options options = new Options();
+ options.addOption(null, "help", false, "show help");
+ options.addOption(null, "master-list", true, "master address like: host1:8000,host2:8000");
+ options.addOption(null, "topic", true, "topic list, topic1,topic2 or "
+ + "topic1:tid11;tid12,topic2:tid21;tid22(consumer only)");
+ return options;
+ }
+}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
index f76b077486b..9c76ce1f1ab 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
@@ -30,8 +30,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.exception.TubeClientException;
@@ -59,7 +64,7 @@ public class MAMessageProducerExample {
private static final int SESSION_FACTORY_NUM = 10;
private static Set topicSet;
- private static int msgCount;
+ private static int batchCount;
private static int producerCount;
private static byte[] sendData;
@@ -89,45 +94,72 @@ public MAMessageProducerExample(String masterHostAndPort) throws Exception {
}
}
- public static void main(String[] args) {
- final String masterHostAndPort = args[0];
-
- final String topics = args[1];
- final List topicList = Arrays.asList(topics.split(","));
-
- topicSet = new TreeSet<>(topicList);
-
- msgCount = Integer.parseInt(args[2]);
- producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
-
- logger.info("MAMessageProducerExample.main started...");
+ /**
+ * Init options
+ *
+ * @return options
+ */
+ public static Options initOptions() {
+ Options options = ArgsParserHelper.initCommonOptions();
+ options.addOption(null, "batch-size", true, "number of messages in single batch, default is 100000");
+ options.addOption(null, "max-batch", true, "max batch number, default is 1024");
+ options.addOption(null, "thread-num", true, "thread number of producers, default is 1, max is 100");
+ return options;
+ }
- final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
- final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
+ public static void main(String[] args) {
+ Options options = null;
+ try {
+ CommandLineParser parser = new DefaultParser();
+ options = initOptions();
+ CommandLine cl = parser.parse(options, args);
+ if (cl != null) {
+ final String masterHostAndPort = cl.getOptionValue("master-list");
+ final String topics = cl.getOptionValue("topic");
+ final List topicList = Arrays.asList(topics.split(","));
+ topicSet = new TreeSet<>(topicList);
+
+ batchCount = Integer.parseInt(cl.getOptionValue("max-batch", "100000"));
+ int batchSize = Integer.parseInt(cl.getOptionValue("batch-size", "1024"));
+ producerCount = Math.min(Integer.parseInt(cl.getOptionValue(
+ "thread-num", "1")), MAX_PRODUCER_NUM);
+ logger.info("MAMessageProducerExample.main started...");
+ final byte[] transmitData = StringUtils
+ .getBytesUtf8("This is a test message from multi-session factory.");
+ final ByteBuffer dataBuffer = ByteBuffer.allocate(batchSize);
+
+ while (dataBuffer.hasRemaining()) {
+ int offset = dataBuffer.arrayOffset();
+ dataBuffer.put(transmitData, offset,
+ Math.min(dataBuffer.remaining(), transmitData.length));
+ }
- while (dataBuffer.hasRemaining()) {
- int offset = dataBuffer.arrayOffset();
- dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length));
- }
+ dataBuffer.flip();
+ sendData = dataBuffer.array();
- dataBuffer.flip();
- sendData = dataBuffer.array();
+ try {
+ MAMessageProducerExample messageProducer = new MAMessageProducerExample(
+ masterHostAndPort);
- try {
- MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort);
+ messageProducer.startService();
- messageProducer.startService();
+ while (SENT_SUCC_COUNTER.get() < (long) batchCount * producerCount * topicSet.size()) {
+ TimeUnit.MILLISECONDS.sleep(1000);
+ }
+ messageProducer.producerMap.clear();
+ messageProducer.shutdown();
- while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) {
- Thread.sleep(1000);
+ } catch (TubeClientException e) {
+ logger.error("TubeClientException: ", e);
+ } catch (Throwable e) {
+ logger.error("Throwable: ", e);
+ }
+ }
+ } catch (Exception ex) {
+ logger.error(ex.getMessage());
+ if (options != null) {
+ ArgsParserHelper.help("./tubemq-producer-perf-test.sh", options);
}
- messageProducer.producerMap.clear();
- messageProducer.shutdown();
-
- } catch (TubeClientException e) {
- logger.error("TubeClientException: ", e);
- } catch (Throwable e) {
- logger.error("Throwable: ", e);
}
}
@@ -173,7 +205,7 @@ public void run() {
} catch (Throwable t) {
logger.error("publish exception: ", t);
}
- for (int i = 0; i < msgCount; i++) {
+ for (int i = 0; i < batchCount; i++) {
long millis = System.currentTimeMillis();
for (String topic : topicSet) {
try {
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
index d9aeb8a0e0b..3b999430b79 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
@@ -26,6 +26,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
import org.apache.tubemq.client.common.PeerInfo;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
@@ -59,29 +63,45 @@ public final class MessageConsumerExample {
private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
private final PushMessageConsumer messageConsumer;
- private final MessageSessionFactory messageSessionFactory;
- public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception {
+ public MessageConsumerExample(String masterHostAndPort, String group,
+ int fetchCount, boolean isFromBegin) throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
- consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ if (isFromBegin) {
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
+ } else {
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ }
if (fetchCount > 0) {
consumerConfig.setPushFetchThreadCnt(fetchCount);
}
- this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+ MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
}
- public static void main(String[] args) {
- final String masterHostAndPort = args[0];
- final String topics = args[1];
- final String group = args[2];
- final int consumerCount = Integer.parseInt(args[3]);
- int fetchCount = -1;
- if (args.length > 5) {
- fetchCount = Integer.parseInt(args[4]);
- }
- final Map> topicTidsMap = new HashMap<>();
+ /**
+ * Init options
+ * @return options
+ */
+ public static Options initOptions() {
+
+ Options options = ArgsParserHelper.initCommonOptions();
+ options.addOption(null, "batch-size", true, "max number of fetching message in one batch");
+ options.addOption(null, "thread-num", true, "thread number of consumers");
+ options.addOption(null, "group", true, "consumer group");
+ options.addOption(null, "from-begin", false, "default is consuming from latest, "
+ + "if option is clarified, then consume from begin");
+ return options;
+ }
+
+ /**
+ * init topic->set(tid) map
+ * @param topics - topics string
+ * @return - map of topic->set(tid)
+ */
+ private static Map> initTopicList(String topics) {
+ Map> topicTidsMap = new HashMap<>();
String[] topicTidsList = topics.split(",");
for (String topicTids : topicTidsList) {
String[] topicTidStr = topicTids.split(":");
@@ -95,35 +115,60 @@ public static void main(String[] args) {
}
topicTidsMap.put(topicTidStr[0], tids);
}
- final int startFetchCount = fetchCount;
- final ExecutorService executorService = Executors.newFixedThreadPool(fetchCount);
- for (int i = 0; i < consumerCount; i++) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- MessageConsumerExample messageConsumer = new MessageConsumerExample(
- masterHostAndPort,
- group,
- startFetchCount
- );
- messageConsumer.subscribe(topicTidsMap);
- } catch (Exception e) {
- logger.error("Create consumer failed!", e);
- }
- }
- });
- }
- final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread");
- statisticThread.start();
+ return topicTidsMap;
+ }
- executorService.shutdown();
+ public static void main(String[] args) {
+ Options options = null;
try {
- executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.error("Thread Pool shutdown has been interrupted!");
+ CommandLineParser parser = new DefaultParser();
+ options = initOptions();
+ CommandLine cl = parser.parse(options, args);
+ if (cl != null) {
+ final String masterHostAndPort = cl.getOptionValue("master-list");
+ final Map> topicTidsMap = initTopicList(
+ cl.getOptionValue("topic"));
+ final String group = cl.getOptionValue("group");
+ int threadNum = Integer.parseInt(cl.getOptionValue("thread-num", "1"));
+ final int fetchCount = Integer.parseInt(cl.getOptionValue("batch-size", "-1"));
+ final boolean isFromBegin = cl.hasOption("from-begin");
+ ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
+ for (int i = 0; i < threadNum; i++) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ MessageConsumerExample messageConsumer = new MessageConsumerExample(
+ masterHostAndPort,
+ group,
+ fetchCount,
+ isFromBegin
+ );
+ messageConsumer.subscribe(topicTidsMap);
+ } catch (Exception e) {
+ logger.error("Create consumer failed!", e);
+ }
+ }
+ });
+ }
+ final Thread statisticThread = new Thread(msgRecvStats,
+ "Received Statistic Thread");
+ statisticThread.start();
+
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger.error("Thread Pool shutdown has been interrupted!");
+ }
+ msgRecvStats.stopStats();
+ }
+ } catch (Exception ex) {
+ logger.error(ex.getMessage(), ex.getMessage());
+ if (options != null) {
+ ArgsParserHelper.help("./tubemq-consumer-perf-test.sh", options);
+ }
}
- msgRecvStats.stopStats();
}
public void subscribe(Map> topicTidsMap) throws TubeClientException {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java
index d2c503a5576..9c343a8fdf9 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java
@@ -19,10 +19,10 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.exception.StartupException;
@@ -56,7 +56,7 @@ public static String getConfigFilePath(final String[] args) throws StartupExcept
final Options options = new Options();
final Option file = new Option("f", true, "configuration file path");
options.addOption(file);
- final CommandLineParser parser = new PosixParser();
+ final CommandLineParser parser = new DefaultParser();
CommandLine line = null;
try {
line = parser.parse(options, args);