Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TUBEMQ-433] add tubemq perf-consumer/producer scripts #330

Merged
merged 3 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions bin/tubemq-console-consumer.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/bin/bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

if [ -z "$BASE_DIR" ] ; then
PRG="$0"

# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
BASE_DIR=`dirname "$PRG"`/..

# make it fully qualified
BASE_DIR=`cd "$BASE_DIR" && pwd`
#echo "TubeMQ master is at $BASE_DIR"
fi
source $BASE_DIR/bin/env.sh
$JAVA $TOOLS_ARGS org.apache.tubemq.example.MessageConsumerExample $@
40 changes: 40 additions & 0 deletions bin/tubemq-console-producer.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/bin/bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

if [ -z "$BASE_DIR" ] ; then
PRG="$0"

# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
BASE_DIR=`dirname "$PRG"`/..

# make it fully qualified
BASE_DIR=`cd "$BASE_DIR" && pwd`
#echo "TubeMQ master is at $BASE_DIR"
fi
source $BASE_DIR/bin/env.sh
$JAVA $TOOLS_ARGS org.apache.tubemq.example.MAMessageProducerExample $@
10 changes: 10 additions & 0 deletions tubemq-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@
<groupId>org.apache.tubemq</groupId>
<artifactId>tubemq-client</artifactId>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
3 changes: 3 additions & 0 deletions tubemq-example/src/main/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
<directory>../</directory>
<includes>
<include>./conf/tools.log4j.properties</include>
<include>./bin/tubemq-console-consumer.sh</include>
<include>./bin/tubemq-console-producer.sh</include>
<include>./bin/env.sh</include>
<include>LICENSE</include>
<include>NOTICE</include>
<include>DISCLAIMER-WIP</include>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tubemq.example;

import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;

public class ArgsParserHelper {

/**
* Print help information and exit.
*
* @param opts - options
*/
public static void help(String commandName, Options opts) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(commandName, opts);
System.exit(0);
}

/**
* Init common options when parsing args.
* @return - options
*/
public static Options initCommonOptions() {
Options options = new Options();
options.addOption("help", false, "show help");
options.addOption("master", true, "master address like: 127.0.0.1:8000");
options.addOption("topics", true, "topic list, topic1,topic2 or "
+ "topic1:tid11;tid12,topic2:tid21;tid22(consumer only)");
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.exception.TubeClientException;
Expand Down Expand Up @@ -89,45 +94,70 @@ public MAMessageProducerExample(String masterHostAndPort) throws Exception {
}
}

public static void main(String[] args) {
final String masterHostAndPort = args[0];

final String topics = args[1];
final List<String> topicList = Arrays.asList(topics.split(","));

topicSet = new TreeSet<>(topicList);

msgCount = Integer.parseInt(args[2]);
producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);

logger.info("MAMessageProducerExample.main started...");
/**
* Init options
*
* @return options
*/
public static Options initOptions() {
Options options = ArgsParserHelper.initCommonOptions();
options.addOption("count", false, "producer count");
options.addOption("thread", false, "thread number of producers");
return options;
}

final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
public static void main(String[] args) {
Options options = null;
try {
CommandLineParser parser = new PosixParser();
options = initOptions();
CommandLine cl = parser.parse(options, args);
if (cl != null) {
final String masterHostAndPort = cl.getOptionValue("master");
final String topics = cl.getOptionValue("topics");
final List<String> topicList = Arrays.asList(topics.split(","));
topicSet = new TreeSet<>(topicList);

msgCount = Integer.parseInt(cl.getOptionValue("count"));
producerCount = Math.min(Integer.parseInt(cl.getOptionValue(
"thread", "1")), MAX_PRODUCER_NUM);
logger.info("MAMessageProducerExample.main started...");
final byte[] transmitData = StringUtils
.getBytesUtf8("This is a test message from multi-session factory.");
final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);

while (dataBuffer.hasRemaining()) {
int offset = dataBuffer.arrayOffset();
dataBuffer.put(transmitData, offset,
Math.min(dataBuffer.remaining(), transmitData.length));
}

while (dataBuffer.hasRemaining()) {
int offset = dataBuffer.arrayOffset();
dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length));
}
dataBuffer.flip();
sendData = dataBuffer.array();

dataBuffer.flip();
sendData = dataBuffer.array();
try {
MAMessageProducerExample messageProducer = new MAMessageProducerExample(
masterHostAndPort);

try {
MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort);
messageProducer.startService();

messageProducer.startService();
while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) {
TimeUnit.MILLISECONDS.sleep(1000);
}
messageProducer.producerMap.clear();
messageProducer.shutdown();

while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) {
Thread.sleep(1000);
} catch (TubeClientException e) {
logger.error("TubeClientException: ", e);
} catch (Throwable e) {
logger.error("Throwable: ", e);
}
}
} catch (Exception ex) {
logger.error(ex.getMessage());
if (options != null) {
ArgsParserHelper.help("./tubemq-console-producer.sh", options);
}
messageProducer.producerMap.clear();
messageProducer.shutdown();

} catch (TubeClientException e) {
logger.error("TubeClientException: ", e);
} catch (Throwable e) {
logger.error("Throwable: ", e);
}
}

Expand Down
Loading