Skip to content

Commit

Permalink
Merge pull request #1683 from areyouok/pr_benchmark_consumer_2
Browse files Browse the repository at this point in the history
Optimise benchmark consumer to support the  consume fail rate option
  • Loading branch information
vongosling committed Jan 20, 2020
2 parents b057f93 + 6be0922 commit ad76fd3
Showing 1 changed file with 32 additions and 9 deletions.
Expand Up @@ -22,7 +22,9 @@
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
Expand All @@ -49,15 +51,16 @@ public static void main(String[] args) throws MQClientException, IOException {

final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null;
final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null;
final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0;
String group = groupPrefix;
if (Boolean.parseBoolean(isPrefixEnable)) {
group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100);
if (Boolean.parseBoolean(isSuffixEnable)) {
group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
}

System.out.printf("topic: %s, group: %s, prefix: %s, filterType: %s, expression: %s%n", topic, group, isPrefixEnable, filterType, expression);
System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s%n", topic, group, isSuffixEnable, filterType, expression);

final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();

Expand Down Expand Up @@ -85,9 +88,15 @@ private void printStats() {
(long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]);
final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]);
final long failCount = end[4] - begin[4];
final long b2cMax = statsBenchmarkConsumer.getBorn2ConsumerMaxRT().get();
final long s2cMax = statsBenchmarkConsumer.getStore2ConsumerMaxRT().get();

statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);

System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
consumeTps, averageB2CRT, averageS2CRT, end[4], end[5]
System.out.printf("TPS: %d FAIL: %d AVG(B2C) RT: %7.3f AVG(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
);
}
}
Expand Down Expand Up @@ -144,7 +153,12 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
if (ThreadLocalRandom.current().nextDouble() < failRate) {
statsBenchmarkConsumer.getFailCount().incrementAndGet();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} else {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
});

Expand Down Expand Up @@ -174,6 +188,10 @@ public static Options buildCommandlineOptions(final Options options) {
opt.setRequired(false);
options.addOption(opt);

opt = new Option("r", "fail rate", true, "consumer fail rate, default 0");
opt.setRequired(false);
options.addOption(opt);

return options;
}

Expand All @@ -200,14 +218,15 @@ class StatsBenchmarkConsumer {

private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L);

private final AtomicLong failCount = new AtomicLong(0L);

public Long[] createSnapshot() {
Long[] snap = new Long[] {
System.currentTimeMillis(),
this.receiveMessageTotalCount.get(),
this.born2ConsumerTotalRT.get(),
this.store2ConsumerTotalRT.get(),
this.born2ConsumerMaxRT.get(),
this.store2ConsumerMaxRT.get(),
this.failCount.get()
};

return snap;
Expand All @@ -232,4 +251,8 @@ public AtomicLong getBorn2ConsumerMaxRT() {
public AtomicLong getStore2ConsumerMaxRT() {
return store2ConsumerMaxRT;
}

public AtomicLong getFailCount() {
return failCount;
}
}

0 comments on commit ad76fd3

Please sign in to comment.