Skip to content

Commit

Permalink
[ISSUE #2088] Optimize rocketmq client's stats of RT to make sense
Browse files Browse the repository at this point in the history
Fix issue #2088 , make the log output of RT stat makes sense.
  • Loading branch information
Jaskey committed Jun 23, 2020
1 parent 2fa51fc commit 9e1b00c
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 31 deletions.
Expand Up @@ -62,15 +62,15 @@ public void shutdown() {
}

public void incPullRT(final String group, final String topic, final long rt) {
this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1);
this.topicAndGroupPullRT.addRTValue(topic + "@" + group, (int) rt, 1);
}

public void incPullTPS(final String group, final String topic, final long msgs) {
this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int) msgs, 1);
}

public void incConsumeRT(final String group, final String topic, final long rt) {
this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1);
this.topicAndGroupConsumeRT.addRTValue(topic + "@" + group, (int) rt, 1);
}

public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
Expand Down
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.stats;

import org.apache.rocketmq.logging.InternalLogger;

import java.util.concurrent.ScheduledExecutorService;

/**
* A StatItem for response time, the only difference between from StatsItem is it has a different log output.
*/
public class RTStatsItem extends StatsItem {

public RTStatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService, InternalLogger log) {
super(statsName, statsKey, scheduledExecutorService, log);
}

/**
* For Response Time stat Item, the print detail should be a little different, TPS and SUM makes no sense.
* And we give a name "AVGRT" rather than AVGPT for value getAvgpt()
*/
@Override
protected String statPrintDetail(StatsSnapshot ss) {
return String.format("TIMES: %d AVGRT: %.2f", ss.getTimes(), ss.getAvgpt());
}
}
Expand Up @@ -55,13 +55,14 @@ private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csL
double tps = 0;
double avgpt = 0;
long sum = 0;
long timesDiff = 0;
if (!csList.isEmpty()) {
CallSnapshot first = csList.getFirst();
CallSnapshot last = csList.getLast();
sum = last.getValue() - first.getValue();
tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());

long timesDiff = last.getTimes() - first.getTimes();
timesDiff = last.getTimes() - first.getTimes();
if (timesDiff > 0) {
avgpt = (sum * 1.0d) / timesDiff;
}
Expand All @@ -70,6 +71,7 @@ private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csL
statsSnapshot.setSum(sum);
statsSnapshot.setTps(tps);
statsSnapshot.setAvgpt(avgpt);
statsSnapshot.setTimes(timesDiff);
}

return statsSnapshot;
Expand Down Expand Up @@ -191,32 +193,25 @@ public void samplingInHour() {

public void printAtMinutes() {
StatsSnapshot ss = computeStatsData(this.csListMinute);
log.info(String.format("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f",
this.statsName,
this.statsKey,
ss.getSum(),
ss.getTps(),
ss.getAvgpt()));
log.info(String.format("[%s] [%s] Stats In One Minute, ", this.statsName, this.statsKey) + statPrintDetail(ss));
}

public void printAtHour() {
StatsSnapshot ss = computeStatsData(this.csListHour);
log.info(String.format("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f",
this.statsName,
this.statsKey,
ss.getSum(),
ss.getTps(),
ss.getAvgpt()));
log.info(String.format("[%s] [%s] Stats In One Hour, ", this.statsName, this.statsKey) + statPrintDetail(ss));

}

public void printAtDay() {
StatsSnapshot ss = computeStatsData(this.csListDay);
log.info(String.format("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f",
this.statsName,
this.statsKey,
ss.getSum(),
ss.getTps(),
ss.getAvgpt()));
log.info(String.format("[%s] [%s] Stats In One Day, ", this.statsName, this.statsKey) + statPrintDetail(ss));
}

protected String statPrintDetail(StatsSnapshot ss) {
return String.format("SUM: %d TPS: %.2f AVGPT: %.2f",
ss.getSum(),
ss.getTps(),
ss.getAvgpt());
}

public AtomicLong getValue() {
Expand Down
Expand Up @@ -158,6 +158,12 @@ public void addValue(final String statsKey, final int incValue, final int incTim
statsItem.getTimes().addAndGet(incTimes);
}

public void addRTValue(final String statsKey, final int incValue, final int incTimes) {
StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey);
statsItem.getValue().addAndGet(incValue);
statsItem.getTimes().addAndGet(incTimes);
}

public void delValue(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null != statsItem) {
Expand Down Expand Up @@ -196,9 +202,21 @@ public void delValueBySuffixKey(final String statsKey, String separator) {
}

public StatsItem getAndCreateStatsItem(final String statsKey) {
return getAndCreateItem(statsKey, false);
}

public StatsItem getAndCreateRTStatsItem(final String statsKey) {
return getAndCreateItem(statsKey, true);
}

public StatsItem getAndCreateItem(final String statsKey, boolean rtItem) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) {
statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
if (rtItem) {
statsItem = new RTStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
} else {
statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
}
StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);

if (null != prev) {
Expand Down
Expand Up @@ -20,6 +20,8 @@
public class StatsSnapshot {
private long sum;
private double tps;

private long times;
private double avgpt;

public long getSum() {
Expand All @@ -45,4 +47,12 @@ public double getAvgpt() {
public void setAvgpt(double avgpt) {
this.avgpt = avgpt;
}

public long getTimes() {
return times;
}

public void setTimes(long times) {
this.times = times;
}
}
Expand Up @@ -46,14 +46,17 @@ public void test_getAndCreateMomentStatsItem_multiThread() throws InterruptedExc

@Test
public void test_statsOfFirstStatisticsCycle() throws InterruptedException {
final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null);
final String tpsStatKey = "tpsTest";
final String rtStatKey = "rtTest";
final StatsItemSet statsItemSet = new StatsItemSet(tpsStatKey, scheduler, null);
executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
statsItemSet.addValue("topicTest", 2, 1);
statsItemSet.addValue(tpsStatKey, 2, 1);
statsItemSet.addRTValue(rtStatKey, 2, 1);
}
});
}
Expand All @@ -63,14 +66,33 @@ public void run() {
}
Thread.sleep(1000);
}
// simulate schedule task execution
statsItemSet.getStatsItem("topicTest").samplingInSeconds();
statsItemSet.getStatsItem("topicTest").samplingInMinutes();
statsItemSet.getStatsItem("topicTest").samplingInHour();
// simulate schedule task execution , tps stat
{
statsItemSet.getStatsItem(tpsStatKey).samplingInSeconds();
statsItemSet.getStatsItem(tpsStatKey).samplingInMinutes();
statsItemSet.getStatsItem(tpsStatKey).samplingInHour();

assertEquals(20L, statsItemSet.getStatsDataInMinute(tpsStatKey).getSum());
assertEquals(20L, statsItemSet.getStatsDataInHour(tpsStatKey).getSum());
assertEquals(20L, statsItemSet.getStatsDataInDay(tpsStatKey).getSum());
assertEquals(10L, statsItemSet.getStatsDataInDay(tpsStatKey).getTimes());
assertEquals(10L, statsItemSet.getStatsDataInHour(tpsStatKey).getTimes());
assertEquals(10L, statsItemSet.getStatsDataInDay(tpsStatKey).getTimes());
}

// simulate schedule task execution , rt stat
{
statsItemSet.getStatsItem(rtStatKey).samplingInSeconds();
statsItemSet.getStatsItem(rtStatKey).samplingInMinutes();
statsItemSet.getStatsItem(rtStatKey).samplingInHour();

assertEquals(20L, statsItemSet.getStatsDataInMinute("topicTest").getSum());
assertEquals(20L, statsItemSet.getStatsDataInHour("topicTest").getSum());
assertEquals(20L, statsItemSet.getStatsDataInDay("topicTest").getSum());
assertEquals(20L, statsItemSet.getStatsDataInMinute(rtStatKey).getSum());
assertEquals(20L, statsItemSet.getStatsDataInHour(rtStatKey).getSum());
assertEquals(20L, statsItemSet.getStatsDataInDay(rtStatKey).getSum());
assertEquals(10L, statsItemSet.getStatsDataInDay(rtStatKey).getTimes());
assertEquals(10L, statsItemSet.getStatsDataInHour(rtStatKey).getTimes());
assertEquals(10L, statsItemSet.getStatsDataInDay(rtStatKey).getTimes());
}
}

private AtomicLong test_unit() throws InterruptedException {
Expand Down

0 comments on commit 9e1b00c

Please sign in to comment.