From ea0ed9f5334fd95f64d7313c977633bb744f4959 Mon Sep 17 00:00:00 2001 From: yuanbo Date: Mon, 30 Nov 2020 22:15:19 +0800 Subject: [PATCH 1/3] [TUBEMQ-433] add tubemq perf-consumer/producer scripts --- bin/tubemq-console-consumer.sh | 40 +++++++ bin/tubemq-console-producer.sh | 40 +++++++ tubemq-example/pom.xml | 10 ++ tubemq-example/src/main/assembly/assembly.xml | 3 + .../tubemq/example/ArgsParserHelper.java | 48 ++++++++ .../example/MAMessageProducerExample.java | 94 ++++++++++----- .../example/MessageConsumerExample.java | 113 ++++++++++++------ 7 files changed, 278 insertions(+), 70 deletions(-) create mode 100644 bin/tubemq-console-consumer.sh create mode 100644 bin/tubemq-console-producer.sh create mode 100644 tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java diff --git a/bin/tubemq-console-consumer.sh b/bin/tubemq-console-consumer.sh new file mode 100644 index 00000000000..54189a250d6 --- /dev/null +++ b/bin/tubemq-console-consumer.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +if [ -z "$BASE_DIR" ] ; then + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + BASE_DIR=`dirname "$PRG"`/.. + + # make it fully qualified + BASE_DIR=`cd "$BASE_DIR" && pwd` + #echo "TubeMQ master is at $BASE_DIR" +fi +source $BASE_DIR/bin/env.sh +$JAVA $TOOLS_ARGS org.apache.tubemq.example.MessageConsumerExample $@ diff --git a/bin/tubemq-console-producer.sh b/bin/tubemq-console-producer.sh new file mode 100644 index 00000000000..ed76c3a3019 --- /dev/null +++ b/bin/tubemq-console-producer.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +if [ -z "$BASE_DIR" ] ; then + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + BASE_DIR=`dirname "$PRG"`/.. + + # make it fully qualified + BASE_DIR=`cd "$BASE_DIR" && pwd` + #echo "TubeMQ master is at $BASE_DIR" +fi +source $BASE_DIR/bin/env.sh +$JAVA $TOOLS_ARGS org.apache.tubemq.example.MAMessageProducerExample $@ diff --git a/tubemq-example/pom.xml b/tubemq-example/pom.xml index 1095f7be58c..47aa92168fb 100644 --- a/tubemq-example/pom.xml +++ b/tubemq-example/pom.xml @@ -65,6 +65,16 @@ org.apache.tubemq tubemq-client + + commons-cli + commons-cli + + + junit + junit + ${junit.version} + test + diff --git a/tubemq-example/src/main/assembly/assembly.xml b/tubemq-example/src/main/assembly/assembly.xml index 11edd3274b7..659125d7ede 100644 --- a/tubemq-example/src/main/assembly/assembly.xml +++ b/tubemq-example/src/main/assembly/assembly.xml @@ -32,6 +32,9 @@ ../ ./conf/tools.log4j.properties + ./bin/tubemq-console-consumer.sh + ./bin/tubemq-console-producer.sh + ./bin/env.sh LICENSE NOTICE DISCLAIMER-WIP diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java new file mode 100644 index 00000000000..6e45a0c3c8a --- /dev/null +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.example; + +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; + +public class ArgsParserHelper { + + /** + * Print help information and exit. + * + * @param opts - options + */ + public static void help(String commandName, Options opts) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(commandName, opts); + System.exit(0); + } + + /** + * Init common options when parsing args. + * @return - options + */ + public static Options initCommonOptions() { + Options options = new Options(); + options.addOption("help", false, "show help"); + options.addOption("master", true, "master address like: 127.0.0.1:8000"); + options.addOption("topics", true, "topic list, topic1,topic2 or " + + "topic1:tid11;tid12,topic2:tid21;tid22(consumer only)"); + return options; + } +} diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java index f76b077486b..9f9079e8cd5 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java @@ -30,8 +30,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.commons.codec.binary.StringUtils; import org.apache.tubemq.client.config.TubeClientConfig; import org.apache.tubemq.client.exception.TubeClientException; @@ -89,45 +94,70 @@ public MAMessageProducerExample(String masterHostAndPort) throws Exception { } } - public static void main(String[] args) { - final String masterHostAndPort = args[0]; - - final String topics = args[1]; - final List topicList = Arrays.asList(topics.split(",")); - - topicSet = new TreeSet<>(topicList); - - msgCount = Integer.parseInt(args[2]); - producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM); - - logger.info("MAMessageProducerExample.main started..."); + /** + * Init options + * + * @return options + */ + public static Options initOptions() { + Options options = ArgsParserHelper.initCommonOptions(); + options.addOption("count", false, "producer count"); + options.addOption("thread", false, "thread number of producers"); + return options; + } - final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory."); - final ByteBuffer dataBuffer = ByteBuffer.allocate(1024); + public static void main(String[] args) { + Options options = null; + try { + CommandLineParser parser = new PosixParser(); + options = initOptions(); + CommandLine cl = parser.parse(options, args); + if (cl != null) { + final String masterHostAndPort = cl.getOptionValue("master"); + final String topics = cl.getOptionValue("topics"); + final List topicList = Arrays.asList(topics.split(",")); + topicSet = new TreeSet<>(topicList); + + msgCount = Integer.parseInt(cl.getOptionValue("count")); + producerCount = Math.min(Integer.parseInt(cl.getOptionValue( + "thread", "1")), MAX_PRODUCER_NUM); + logger.info("MAMessageProducerExample.main started..."); + final byte[] transmitData = StringUtils + .getBytesUtf8("This is a test message from multi-session factory."); + final ByteBuffer dataBuffer = ByteBuffer.allocate(1024); + + while (dataBuffer.hasRemaining()) { + int offset = dataBuffer.arrayOffset(); + dataBuffer.put(transmitData, offset, + Math.min(dataBuffer.remaining(), transmitData.length)); + } - while (dataBuffer.hasRemaining()) { - int offset = dataBuffer.arrayOffset(); - dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length)); - } + dataBuffer.flip(); + sendData = dataBuffer.array(); - dataBuffer.flip(); - sendData = dataBuffer.array(); + try { + MAMessageProducerExample messageProducer = new MAMessageProducerExample( + masterHostAndPort); - try { - MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort); + messageProducer.startService(); - messageProducer.startService(); + while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) { + TimeUnit.MILLISECONDS.sleep(1000); + } + messageProducer.producerMap.clear(); + messageProducer.shutdown(); - while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) { - Thread.sleep(1000); + } catch (TubeClientException e) { + logger.error("TubeClientException: ", e); + } catch (Throwable e) { + logger.error("Throwable: ", e); + } + } + } catch (Exception ex) { + logger.error(ex.getMessage()); + if (options != null) { + ArgsParserHelper.help("./tubemq-console-consumer.sh", options); } - messageProducer.producerMap.clear(); - messageProducer.shutdown(); - - } catch (TubeClientException e) { - logger.error("TubeClientException: ", e); - } catch (Throwable e) { - logger.error("Throwable: ", e); } } diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java index d9aeb8a0e0b..22b1d91bf97 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java @@ -26,6 +26,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.tubemq.client.common.PeerInfo; import org.apache.tubemq.client.config.ConsumerConfig; import org.apache.tubemq.client.consumer.ConsumePosition; @@ -59,7 +63,6 @@ public final class MessageConsumerExample { private static final MsgRecvStats msgRecvStats = new MsgRecvStats(); private final PushMessageConsumer messageConsumer; - private final MessageSessionFactory messageSessionFactory; public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception { ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group); @@ -67,21 +70,31 @@ public MessageConsumerExample(String masterHostAndPort, String group, int fetchC if (fetchCount > 0) { consumerConfig.setPushFetchThreadCnt(fetchCount); } - this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); + MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig); } - public static void main(String[] args) { - final String masterHostAndPort = args[0]; - final String topics = args[1]; - final String group = args[2]; - final int consumerCount = Integer.parseInt(args[3]); - int fetchCount = -1; - if (args.length > 5) { - fetchCount = Integer.parseInt(args[4]); - } - final Map> topicTidsMap = new HashMap<>(); + /** + * Init options + * @return options + */ + public static Options initOptions() { + Options options = ArgsParserHelper.initCommonOptions(); + options.addOption("count", false, "fetch count"); + options.addOption("thread", false, "thread number of consumers"); + options.addOption("group", true, "consumer group"); + return options; + + } + + /** + * init topic->set(tid) map + * @param topics - topics string + * @return - map of topic->set(tid) + */ + private static Map> initTopicList(String topics) { + Map> topicTidsMap = new HashMap<>(); String[] topicTidsList = topics.split(","); for (String topicTids : topicTidsList) { String[] topicTidStr = topicTids.split(":"); @@ -95,35 +108,59 @@ public static void main(String[] args) { } topicTidsMap.put(topicTidStr[0], tids); } - final int startFetchCount = fetchCount; - final ExecutorService executorService = Executors.newFixedThreadPool(fetchCount); - for (int i = 0; i < consumerCount; i++) { - executorService.submit(new Runnable() { - @Override - public void run() { - try { - MessageConsumerExample messageConsumer = new MessageConsumerExample( - masterHostAndPort, - group, - startFetchCount - ); - messageConsumer.subscribe(topicTidsMap); - } catch (Exception e) { - logger.error("Create consumer failed!", e); - } - } - }); - } - final Thread statisticThread = new Thread(msgRecvStats, "Received Statistic Thread"); - statisticThread.start(); + return topicTidsMap; + } - executorService.shutdown(); + public static void main(String[] args) { + Options options = null; try { - executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - logger.error("Thread Pool shutdown has been interrupted!"); + CommandLineParser parser = new PosixParser(); + options = initOptions(); + CommandLine cl = parser.parse(options, args); + if (cl != null) { + final String masterHostAndPort = cl.getOptionValue("master"); + final Map> topicTidsMap = initTopicList( + cl.getOptionValue("topics")); + final String group = cl.getOptionValue("group"); + int threadNum = Integer.parseInt(cl.getOptionValue("thread", "1")); + final int fetchCount = Integer.parseInt(cl.getOptionValue("count", "-1")); + + ExecutorService executorService = Executors.newFixedThreadPool(threadNum); + for (int i = 0; i < threadNum; i++) { + executorService.submit(new Runnable() { + @Override + public void run() { + try { + MessageConsumerExample messageConsumer = new MessageConsumerExample( + masterHostAndPort, + group, + fetchCount + ); + messageConsumer.subscribe(topicTidsMap); + } catch (Exception e) { + logger.error("Create consumer failed!", e); + } + } + }); + } + final Thread statisticThread = new Thread(msgRecvStats, + "Received Statistic Thread"); + statisticThread.start(); + + executorService.shutdown(); + try { + executorService.awaitTermination(60 * 1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.error("Thread Pool shutdown has been interrupted!"); + } + msgRecvStats.stopStats(); + } + } catch (Exception ex) { + logger.error(ex.getMessage(), ex); + if (options != null) { + ArgsParserHelper.help("./tubemq-console-consumer.sh", options); + } } - msgRecvStats.stopStats(); } public void subscribe(Map> topicTidsMap) throws TubeClientException { From 05d2e8b6157f690b7c69a445b734b4b8f0665471 Mon Sep 17 00:00:00 2001 From: yuanbo Date: Tue, 1 Dec 2020 10:09:50 +0800 Subject: [PATCH 2/3] change producer script name --- .../org/apache/tubemq/example/MAMessageProducerExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java index 9f9079e8cd5..57fba30b9d3 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java @@ -156,7 +156,7 @@ public static void main(String[] args) { } catch (Exception ex) { logger.error(ex.getMessage()); if (options != null) { - ArgsParserHelper.help("./tubemq-console-consumer.sh", options); + ArgsParserHelper.help("./tubemq-console-producer.sh", options); } } } From f56576108021639c4de06112cc8a2d825ef29fb6 Mon Sep 17 00:00:00 2001 From: yuanbo Date: Wed, 2 Dec 2020 15:45:24 +0800 Subject: [PATCH 3/3] update output of bash command and rename base script names. --- ...nsumer.sh => tubemq-consumer-perf-test.sh} | 0 ...oducer.sh => tubemq-producer-perf-test.sh} | 0 pom.xml | 2 +- tubemq-example/src/main/assembly/assembly.xml | 4 +- .../tubemq/example/ArgsParserHelper.java | 6 +-- .../example/MAMessageProducerExample.java | 28 +++++++------- .../example/MessageConsumerExample.java | 38 +++++++++++-------- .../apache/tubemq/server/tools/ToolUtils.java | 4 +- 8 files changed, 46 insertions(+), 36 deletions(-) rename bin/{tubemq-console-consumer.sh => tubemq-consumer-perf-test.sh} (100%) rename bin/{tubemq-console-producer.sh => tubemq-producer-perf-test.sh} (100%) diff --git a/bin/tubemq-console-consumer.sh b/bin/tubemq-consumer-perf-test.sh similarity index 100% rename from bin/tubemq-console-consumer.sh rename to bin/tubemq-consumer-perf-test.sh diff --git a/bin/tubemq-console-producer.sh b/bin/tubemq-producer-perf-test.sh similarity index 100% rename from bin/tubemq-console-producer.sh rename to bin/tubemq-producer-perf-test.sh diff --git a/pom.xml b/pom.xml index 5a6fe4fba03..490fe152d91 100644 --- a/pom.xml +++ b/pom.xml @@ -312,7 +312,7 @@ commons-cli commons-cli - 1.2 + 1.4 commons-codec diff --git a/tubemq-example/src/main/assembly/assembly.xml b/tubemq-example/src/main/assembly/assembly.xml index 659125d7ede..596af246593 100644 --- a/tubemq-example/src/main/assembly/assembly.xml +++ b/tubemq-example/src/main/assembly/assembly.xml @@ -32,8 +32,8 @@ ../ ./conf/tools.log4j.properties - ./bin/tubemq-console-consumer.sh - ./bin/tubemq-console-producer.sh + ./bin/tubemq-consumer-perf-test.sh + ./bin/tubemq-producer-perf-test.sh ./bin/env.sh LICENSE NOTICE diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java index 6e45a0c3c8a..c507ae41b09 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/ArgsParserHelper.java @@ -39,9 +39,9 @@ public static void help(String commandName, Options opts) { */ public static Options initCommonOptions() { Options options = new Options(); - options.addOption("help", false, "show help"); - options.addOption("master", true, "master address like: 127.0.0.1:8000"); - options.addOption("topics", true, "topic list, topic1,topic2 or " + options.addOption(null, "help", false, "show help"); + options.addOption(null, "master-list", true, "master address like: host1:8000,host2:8000"); + options.addOption(null, "topic", true, "topic list, topic1,topic2 or " + "topic1:tid11;tid12,topic2:tid21;tid22(consumer only)"); return options; } diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java index 57fba30b9d3..9c76ce1f1ab 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java @@ -35,8 +35,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; import org.apache.commons.codec.binary.StringUtils; import org.apache.tubemq.client.config.TubeClientConfig; import org.apache.tubemq.client.exception.TubeClientException; @@ -64,7 +64,7 @@ public class MAMessageProducerExample { private static final int SESSION_FACTORY_NUM = 10; private static Set topicSet; - private static int msgCount; + private static int batchCount; private static int producerCount; private static byte[] sendData; @@ -101,30 +101,32 @@ public MAMessageProducerExample(String masterHostAndPort) throws Exception { */ public static Options initOptions() { Options options = ArgsParserHelper.initCommonOptions(); - options.addOption("count", false, "producer count"); - options.addOption("thread", false, "thread number of producers"); + options.addOption(null, "batch-size", true, "number of messages in single batch, default is 100000"); + options.addOption(null, "max-batch", true, "max batch number, default is 1024"); + options.addOption(null, "thread-num", true, "thread number of producers, default is 1, max is 100"); return options; } public static void main(String[] args) { Options options = null; try { - CommandLineParser parser = new PosixParser(); + CommandLineParser parser = new DefaultParser(); options = initOptions(); CommandLine cl = parser.parse(options, args); if (cl != null) { - final String masterHostAndPort = cl.getOptionValue("master"); - final String topics = cl.getOptionValue("topics"); + final String masterHostAndPort = cl.getOptionValue("master-list"); + final String topics = cl.getOptionValue("topic"); final List topicList = Arrays.asList(topics.split(",")); topicSet = new TreeSet<>(topicList); - msgCount = Integer.parseInt(cl.getOptionValue("count")); + batchCount = Integer.parseInt(cl.getOptionValue("max-batch", "100000")); + int batchSize = Integer.parseInt(cl.getOptionValue("batch-size", "1024")); producerCount = Math.min(Integer.parseInt(cl.getOptionValue( - "thread", "1")), MAX_PRODUCER_NUM); + "thread-num", "1")), MAX_PRODUCER_NUM); logger.info("MAMessageProducerExample.main started..."); final byte[] transmitData = StringUtils .getBytesUtf8("This is a test message from multi-session factory."); - final ByteBuffer dataBuffer = ByteBuffer.allocate(1024); + final ByteBuffer dataBuffer = ByteBuffer.allocate(batchSize); while (dataBuffer.hasRemaining()) { int offset = dataBuffer.arrayOffset(); @@ -141,7 +143,7 @@ public static void main(String[] args) { messageProducer.startService(); - while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) { + while (SENT_SUCC_COUNTER.get() < (long) batchCount * producerCount * topicSet.size()) { TimeUnit.MILLISECONDS.sleep(1000); } messageProducer.producerMap.clear(); @@ -156,7 +158,7 @@ public static void main(String[] args) { } catch (Exception ex) { logger.error(ex.getMessage()); if (options != null) { - ArgsParserHelper.help("./tubemq-console-producer.sh", options); + ArgsParserHelper.help("./tubemq-producer-perf-test.sh", options); } } } @@ -203,7 +205,7 @@ public void run() { } catch (Throwable t) { logger.error("publish exception: ", t); } - for (int i = 0; i < msgCount; i++) { + for (int i = 0; i < batchCount; i++) { long millis = System.currentTimeMillis(); for (String topic : topicSet) { try { diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java index 22b1d91bf97..3b999430b79 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java @@ -28,8 +28,8 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; import org.apache.tubemq.client.common.PeerInfo; import org.apache.tubemq.client.config.ConsumerConfig; import org.apache.tubemq.client.consumer.ConsumePosition; @@ -64,9 +64,14 @@ public final class MessageConsumerExample { private final PushMessageConsumer messageConsumer; - public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception { + public MessageConsumerExample(String masterHostAndPort, String group, + int fetchCount, boolean isFromBegin) throws Exception { ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group); - consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); + if (isFromBegin) { + consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET); + } else { + consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); + } if (fetchCount > 0) { consumerConfig.setPushFetchThreadCnt(fetchCount); } @@ -81,9 +86,11 @@ public MessageConsumerExample(String masterHostAndPort, String group, int fetchC public static Options initOptions() { Options options = ArgsParserHelper.initCommonOptions(); - options.addOption("count", false, "fetch count"); - options.addOption("thread", false, "thread number of consumers"); - options.addOption("group", true, "consumer group"); + options.addOption(null, "batch-size", true, "max number of fetching message in one batch"); + options.addOption(null, "thread-num", true, "thread number of consumers"); + options.addOption(null, "group", true, "consumer group"); + options.addOption(null, "from-begin", false, "default is consuming from latest, " + + "if option is clarified, then consume from begin"); return options; } @@ -114,17 +121,17 @@ private static Map> initTopicList(String topics) { public static void main(String[] args) { Options options = null; try { - CommandLineParser parser = new PosixParser(); + CommandLineParser parser = new DefaultParser(); options = initOptions(); CommandLine cl = parser.parse(options, args); if (cl != null) { - final String masterHostAndPort = cl.getOptionValue("master"); + final String masterHostAndPort = cl.getOptionValue("master-list"); final Map> topicTidsMap = initTopicList( - cl.getOptionValue("topics")); + cl.getOptionValue("topic")); final String group = cl.getOptionValue("group"); - int threadNum = Integer.parseInt(cl.getOptionValue("thread", "1")); - final int fetchCount = Integer.parseInt(cl.getOptionValue("count", "-1")); - + int threadNum = Integer.parseInt(cl.getOptionValue("thread-num", "1")); + final int fetchCount = Integer.parseInt(cl.getOptionValue("batch-size", "-1")); + final boolean isFromBegin = cl.hasOption("from-begin"); ExecutorService executorService = Executors.newFixedThreadPool(threadNum); for (int i = 0; i < threadNum; i++) { executorService.submit(new Runnable() { @@ -134,7 +141,8 @@ public void run() { MessageConsumerExample messageConsumer = new MessageConsumerExample( masterHostAndPort, group, - fetchCount + fetchCount, + isFromBegin ); messageConsumer.subscribe(topicTidsMap); } catch (Exception e) { @@ -156,9 +164,9 @@ public void run() { msgRecvStats.stopStats(); } } catch (Exception ex) { - logger.error(ex.getMessage(), ex); + logger.error(ex.getMessage(), ex.getMessage()); if (options != null) { - ArgsParserHelper.help("./tubemq-console-consumer.sh", options); + ArgsParserHelper.help("./tubemq-consumer-perf-test.sh", options); } } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java index d2c503a5576..9c343a8fdf9 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/ToolUtils.java @@ -19,10 +19,10 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.server.broker.BrokerConfig; import org.apache.tubemq.server.broker.exception.StartupException; @@ -56,7 +56,7 @@ public static String getConfigFilePath(final String[] args) throws StartupExcept final Options options = new Options(); final Option file = new Option("f", true, "configuration file path"); options.addOption(file); - final CommandLineParser parser = new PosixParser(); + final CommandLineParser parser = new DefaultParser(); CommandLine line = null; try { line = parser.parse(options, args);