From 028628982b054523e4b8a2309218d033a0fd75db Mon Sep 17 00:00:00 2001 From: gosonzhang <4675739@qq.com> Date: Sat, 12 Feb 2022 12:41:05 +0800 Subject: [PATCH] [INLONG-2478][TubeMQ] Optimize GroupCountService logic implementation --- .../daemon/AbstractDaemonService.java | 6 +- .../server/broker/BrokerServiceServer.java | 11 +- .../server/broker/msgstore/MessageStore.java | 4 +- .../msgstore/disk/GetMessageResult.java | 12 +- .../broker/msgstore/disk/MsgFileStore.java | 4 +- .../broker/stats/GroupCountService.java | 146 ----------- .../{CountItem.java => TrafficInfo.java} | 38 ++- ...{CountService.java => TrafficService.java} | 29 ++- .../broker/stats/TrafficStatsService.java | 245 ++++++++++++++++++ .../server/broker/utils/DataStoreUtils.java | 16 +- .../broker/stats/GroupCountServiceTest.java | 39 --- .../broker/stats/ServiceStatsHolderTest.java | 2 + .../broker/stats/TrafficStatsServiceTest.java | 61 +++++ 13 files changed, 379 insertions(+), 234 deletions(-) delete mode 100644 inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountService.java rename inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/{CountItem.java => TrafficInfo.java} (61%) rename inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/{CountService.java => TrafficService.java} (54%) create mode 100644 inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java delete mode 100644 inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountServiceTest.java create mode 100644 inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsServiceTest.java diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java index e76817b5f8b..d99006a112a 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/daemon/AbstractDaemonService.java @@ -22,11 +22,13 @@ import org.slf4j.LoggerFactory; public abstract class AbstractDaemonService implements Service, Runnable { - private static final Logger logger = LoggerFactory.getLogger(AbstractDaemonService.class); + private static final Logger logger = + LoggerFactory.getLogger(AbstractDaemonService.class); private final String name; private final long intervalMs; private final Thread daemon; - private AtomicBoolean shutdown = new AtomicBoolean(false); + private final AtomicBoolean shutdown = + new AtomicBoolean(false); public AbstractDaemonService(final String serviceName, final long intervalMs) { this.name = serviceName; diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java index 75b363da10f..74144f81f0c 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java @@ -66,9 +66,8 @@ import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo; import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo; import org.apache.inlong.tubemq.server.broker.offset.OffsetService; -import org.apache.inlong.tubemq.server.broker.stats.CountService; -import org.apache.inlong.tubemq.server.broker.stats.GroupCountService; import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder; +import org.apache.inlong.tubemq.server.broker.stats.TrafficStatsService; import org.apache.inlong.tubemq.server.common.TServerConstants; import org.apache.inlong.tubemq.server.common.TStatusConstants; import org.apache.inlong.tubemq.server.common.aaaserver.CertificateBrokerHandler; @@ -107,9 +106,9 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic // row lock. private final RowLock brokerRowLock; // statistics of produce. - private final CountService putCounterGroup; + private final TrafficStatsService putCounterGroup; // statistics of consume. - private final CountService getCounterGroup; + private final TrafficStatsService getCounterGroup; // certificate handler. private final CertificateBrokerHandler serverAuthHandler; // consumer timeout listener. @@ -128,8 +127,8 @@ public BrokerServiceServer(final TubeBroker tubeBroker, this.serverAuthHandler = tubeBroker.getServerAuthHandler(); ServiceStatusHolder.setStatisParameters(tubeConfig.getAllowedReadIOExcptCnt(), tubeConfig.getAllowedWriteIOExcptCnt(), tubeConfig.getIoExcptStatsDurationMs()); - this.putCounterGroup = new GroupCountService("PutCounterGroup", "Producer", 60 * 1000); - this.getCounterGroup = new GroupCountService("GetCounterGroup", "Consumer", 60 * 1000); + this.putCounterGroup = new TrafficStatsService("PutCounterGroup", "Producer", 60 * 1000); + this.getCounterGroup = new TrafficStatsService("GetCounterGroup", "Consumer", 60 * 1000); this.heartbeatManager = new HeartbeatManager(); this.brokerRowLock = new RowLock("Broker-RowLock", this.tubeConfig.getRowLockWaitDurMs()); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java index 37645595b76..61b94f4992f 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java @@ -47,7 +47,7 @@ import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStatisInfo; import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStore; import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo; -import org.apache.inlong.tubemq.server.broker.stats.CountItem; +import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo; import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils; import org.apache.inlong.tubemq.server.common.utils.AppendResult; import org.apache.inlong.tubemq.server.common.utils.IdWorker; @@ -219,7 +219,7 @@ public GetMessageResult getMessages(int reqSwitch, long requestOffset, if (inMemCache) { // return not found when data is under memory sink operation. if (memMsgRlt.isSuccess) { - HashMap countMap = + HashMap countMap = new HashMap<>(); List transferedMessageList = new ArrayList<>(); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java index 5cd6fef65ed..0c0d90308e5 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/GetMessageResult.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.inlong.tubemq.corebase.TBaseConstants; import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker.TransferedMessage; -import org.apache.inlong.tubemq.server.broker.stats.CountItem; +import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo; /** * Broker's reply to Consumer's GetMessage request. @@ -38,14 +38,14 @@ public class GetMessageResult { public long waitTime = -1; public boolean isSlowFreq = false; public boolean isFromSsdFile = false; - public HashMap tmpCounters = new HashMap<>(); + public HashMap tmpCounters = new HashMap<>(); public List transferedMessageList = new ArrayList<>(); public long maxOffset = TBaseConstants.META_VALUE_UNDEFINED; public GetMessageResult(boolean isSuccess, int retCode, final String errInfo, final long reqOffset, final int lastReadOffset, final long lastRdDataOffset, final int totalSize, - HashMap tmpCounters, + HashMap tmpCounters, List transferedMessageList) { this(isSuccess, retCode, errInfo, reqOffset, lastReadOffset, lastRdDataOffset, totalSize, tmpCounters, transferedMessageList, false); @@ -54,7 +54,7 @@ public GetMessageResult(boolean isSuccess, int retCode, final String errInfo, public GetMessageResult(boolean isSuccess, int retCode, final String errInfo, final long reqOffset, final int lastReadOffset, final long lastRdDataOffset, final int totalSize, - HashMap tmpCounters, + HashMap tmpCounters, List transferedMessageList, boolean isFromSsdFile) { this.isSuccess = isSuccess; @@ -108,11 +108,11 @@ public void setWaitTime(long waitTime) { this.waitTime = waitTime; } - public HashMap getTmpCounters() { + public HashMap getTmpCounters() { return tmpCounters; } - public void setTmpCounters(HashMap tmpCounters) { + public void setTmpCounters(HashMap tmpCounters) { this.tmpCounters = tmpCounters; } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java index f2c5c0df40e..64ad394991a 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java @@ -35,8 +35,8 @@ import org.apache.inlong.tubemq.corebase.utils.ServiceStatusHolder; import org.apache.inlong.tubemq.server.broker.BrokerConfig; import org.apache.inlong.tubemq.server.broker.msgstore.MessageStore; -import org.apache.inlong.tubemq.server.broker.stats.CountItem; import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder; +import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo; import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils; import org.apache.inlong.tubemq.server.broker.utils.DiskSamplePrint; import org.apache.inlong.tubemq.server.common.TServerConstants; @@ -263,7 +263,7 @@ public GetMessageResult getMessages(int partitionId, long lastRdOffset, final StringBuilder sBuilder = new StringBuilder(512); final long curDataMaxOffset = getDataMaxOffset(); final long curDataMinOffset = getDataMinOffset(); - HashMap countMap = new HashMap<>(); + HashMap countMap = new HashMap<>(); ByteBuffer dataBuffer = ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT); List transferedMessageList = diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountService.java deleted file mode 100644 index a162dd318f0..00000000000 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountService.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.tubemq.server.broker.stats; - -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Statistics of broker. It use two CountSet alternatively print statistics to log. - */ -public class GroupCountService extends AbstractDaemonService implements CountService { - private final Logger logger; - private final String cntHdr; - private final CountSet[] countSets = new CountSet[2]; - private AtomicInteger index = new AtomicInteger(0); - - public GroupCountService(String logFileName, String countType, long scanIntervalMs) { - super(logFileName, scanIntervalMs); - this.cntHdr = countType; - if (logFileName == null) { - this.logger = LoggerFactory.getLogger(GroupCountService.class); - } else { - this.logger = LoggerFactory.getLogger(logFileName); - } - countSets[0] = new CountSet(); - countSets[1] = new CountSet(); - super.start(); - } - - @Override - protected void loopProcess(long intervalMs) { - int tmpIndex = 0; - int befIndex = 0; - AtomicLong curRunCnt; - ConcurrentHashMap counters; - while (!super.isStopped()) { - try { - Thread.sleep(intervalMs); - befIndex = tmpIndex = index.get(); - if (index.compareAndSet(befIndex, (++tmpIndex) % 2)) { - curRunCnt = countSets[befIndex].refCnt; - do { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - return; - } - } while (curRunCnt.get() > 0); - counters = countSets[befIndex].counterItem; - if (counters != null) { - for (Map.Entry entry : counters.entrySet()) { - logger.info("{}#{}#{}#{}", new Object[]{cntHdr, entry.getKey(), - entry.getValue().getMsgCount(), entry.getValue().getMsgSize()}); - } - counters.clear(); - } - } - } catch (InterruptedException e) { - return; - } catch (Throwable t) { - // - } - } - } - - @Override - public void close(long waitTimeMs) { - if (super.stop()) { - return; - } - int befIndex = index.get(); - ConcurrentHashMap counters; - for (int i = 0; i < countSets.length; i++) { - counters = countSets[(++befIndex) % 2].counterItem; - if (counters != null) { - for (Map.Entry entry : counters.entrySet()) { - logger.info("{}#{}#{}#{}", new Object[]{cntHdr, entry.getKey(), - entry.getValue().getMsgCount(), entry.getValue().getMsgSize()}); - } - counters.clear(); - } - } - } - - @Override - public void add(Map counterGroup) { - CountSet countSet = countSets[index.get()]; - countSet.refCnt.incrementAndGet(); - ConcurrentHashMap counters = countSet.counterItem; - for (Entry entry : counterGroup.entrySet()) { - CountItem currData = counters.get(entry.getKey()); - if (currData == null) { - CountItem tmpData = new CountItem(0L, 0L); - currData = counters.putIfAbsent(entry.getKey(), tmpData); - if (currData == null) { - currData = tmpData; - } - } - currData.appendMsg(entry.getValue().getMsgCount(), entry.getValue().getMsgSize()); - } - countSet.refCnt.decrementAndGet(); - } - - @Override - public void add(String name, Long delta, int msgSize) { - CountSet countSet = countSets[index.get()]; - countSet.refCnt.incrementAndGet(); - CountItem currData = countSet.counterItem.get(name); - if (currData == null) { - CountItem tmpData = new CountItem(0L, 0L); - currData = countSet.counterItem.putIfAbsent(name, tmpData); - if (currData == null) { - currData = tmpData; - } - } - currData.appendMsg(delta, msgSize); - countSet.refCnt.decrementAndGet(); - } - - private static class CountSet { - public AtomicLong refCnt = new AtomicLong(0); - public ConcurrentHashMap counterItem = - new ConcurrentHashMap<>(); - } -} diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountItem.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficInfo.java similarity index 61% rename from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountItem.java rename to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficInfo.java index 58ed92f9039..09685520e7a 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountItem.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficInfo.java @@ -17,39 +17,37 @@ package org.apache.inlong.tubemq.server.broker.stats; -import java.util.concurrent.atomic.AtomicLong; - /** * Statistic of message, contains message's count and message's size. */ -public class CountItem { - AtomicLong msgCount = new AtomicLong(0); - AtomicLong msgSize = new AtomicLong(0); - - public CountItem(long msgCount, long msgSize) { - this.msgCount.set(msgCount); - this.msgSize.set(msgSize); - } +public class TrafficInfo { + private long msgCnt = 0L; + private long msgSize = 0L; - public long getMsgSize() { - return msgSize.get(); + public TrafficInfo() { + clear(); } - public void setMsgSize(long msgSize) { - this.msgSize.set(msgSize); + public TrafficInfo(long msgCount, long msgSize) { + this.msgCnt = msgCount; + this.msgSize = msgSize; } public long getMsgCount() { - return msgCount.get(); + return msgCnt; } - public void setMsgCount(long msgCount) { - this.msgCount.set(msgCount); + public long getMsgSize() { + return msgSize; } - public void appendMsg(final long msgCount, final long msgSize) { - this.msgCount.addAndGet(msgCount); - this.msgSize.addAndGet(msgSize); + public void addMsgCntAndSize(long msgCount, long msgSize) { + this.msgCnt += msgCount; + this.msgSize += msgSize; } + public void clear() { + this.msgCnt = 0L; + this.msgSize = 0L; + } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficService.java similarity index 54% rename from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountService.java rename to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficService.java index d502a118fd6..ea6a7b2f0bb 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/CountService.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficService.java @@ -19,11 +19,34 @@ import java.util.Map; -public interface CountService { +/** + * TrafficService, incoming and outgoing traffic statistics service + * + * Supports adding new metric data one by one or in batches, + * and outputting metric data to a file at specified intervals. + */ +public interface TrafficService { + /** + * Close service. + * + * @param waitTimeMs the wait time + */ void close(long waitTimeMs); - void add(Map counterGroup); + /** + * Add traffic information in batches + * + * @param trafficInfos the traffic information + */ + void add(Map trafficInfos); - void add(String name, Long delta, int msgSize); + /** + * Add a traffic information record + * + * @param statsKey the statistical key + * @param msgCnt the total message count + * @param msgSize the total message size + */ + void add(String statsKey, long msgCnt, long msgSize); } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java new file mode 100644 index 00000000000..34c678b7bd7 --- /dev/null +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.tubemq.server.broker.stats; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService; +import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter; +import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TrafficStatsService, Input and Output traffic statistics Service + * + * Due to the large amount of traffic-related metric data, this statistics service uses + * a daemon thread to periodically refresh the data to the special metric file + * for metric data collection. + */ +public class TrafficStatsService extends AbstractDaemonService implements TrafficService { + // Maximum write wait time + private static final long MAX_WRITING_WAIT_DLT = 5000L; + // Statistics output log file + private final Logger logger; + // Statistic category + private final String statsCat; + // Switchable traffic statistic units + private final WritableUnit[] switchableUnits = new WritableUnit[2]; + // Current writable index + private final AtomicInteger writableIndex = new AtomicInteger(0); + + /** + * Initial traffic statistics service + * + * @param logFileName the output file name + * @param countType the statistic type + * @param scanIntervalMs the snapshot interval + */ + public TrafficStatsService(String logFileName, String countType, long scanIntervalMs) { + super(logFileName, scanIntervalMs); + this.statsCat = countType; + if (logFileName == null) { + this.logger = LoggerFactory.getLogger(TrafficStatsService.class); + } else { + this.logger = LoggerFactory.getLogger(logFileName); + } + switchableUnits[0] = new WritableUnit(); + switchableUnits[1] = new WritableUnit(); + super.start(); + } + + @Override + protected void loopProcess(long intervalMs) { + int befIndex; + while (!super.isStopped()) { + try { + Thread.sleep(intervalMs); + // Snapshot metric data + befIndex = writableIndex.getAndIncrement(); + // Output 2 file + output2file(befIndex); + } catch (InterruptedException e) { + return; + } catch (Throwable t) { + // + } + } + } + + @Override + public void close(long waitTimeMs) { + if (super.stop()) { + return; + } + // Output remain information + int index = writableIndex.get(); + for (int i = 0; i < switchableUnits.length; i++) { + output2file(++index); + } + } + + @Override + public void add(Map trafficInfos) { + TrafficStatsSet tmpStatsSet; + TrafficStatsSet trafficStatsSet; + // Increment write reference count + switchableUnits[getIndex()].refCnt.incValue(); + try { + // Accumulate statistics information + ConcurrentHashMap tmpStatsSetMap = + switchableUnits[getIndex()].statsUnitMap; + for (Entry entry : trafficInfos.entrySet()) { + trafficStatsSet = tmpStatsSetMap.get(entry.getKey()); + if (trafficStatsSet == null) { + tmpStatsSet = new TrafficStatsSet(); + trafficStatsSet = tmpStatsSetMap.putIfAbsent(entry.getKey(), tmpStatsSet); + if (trafficStatsSet == null) { + trafficStatsSet = tmpStatsSet; + } + } + trafficStatsSet.addMsgCntAndSize( + entry.getValue().getMsgCount(), entry.getValue().getMsgSize()); + } + } finally { + // Decrement write reference count + switchableUnits[getIndex()].refCnt.decValue(); + } + } + + @Override + public void add(String statsKey, long msgCnt, long msgSize) { + // Increment write reference count + switchableUnits[getIndex()].refCnt.incValue(); + try { + // Accumulate statistics information + ConcurrentHashMap tmpStatsSetMap = + switchableUnits[getIndex()].statsUnitMap; + TrafficStatsSet trafficStatsSet = tmpStatsSetMap.get(statsKey); + if (trafficStatsSet == null) { + TrafficStatsSet tmpStatsSet = new TrafficStatsSet(); + trafficStatsSet = tmpStatsSetMap.putIfAbsent(statsKey, tmpStatsSet); + if (trafficStatsSet == null) { + trafficStatsSet = tmpStatsSet; + } + } + trafficStatsSet.addMsgCntAndSize(msgCnt, msgSize); + } finally { + // Decrement write reference count + switchableUnits[getIndex()].refCnt.decValue(); + } + } + + /** + * Print statistics data to file + * + * @param readIndex the readable index + */ + private void output2file(int readIndex) { + WritableUnit selectedUnit = + switchableUnits[getIndex(readIndex)]; + if (selectedUnit == null) { + return; + } + // Wait for the data update operation to complete + long startTime = System.currentTimeMillis(); + do { + if (System.currentTimeMillis() - startTime >= MAX_WRITING_WAIT_DLT) { + break; + } + try { + Thread.sleep(20); + } catch (InterruptedException e) { + break; + } + } while (selectedUnit.refCnt.getValue() > 0); + // Output data to file + Map statsMap = selectedUnit.statsUnitMap; + for (Entry entry : statsMap.entrySet()) { + logger.info("{}#{}#{}#{}", statsCat, entry.getKey(), + entry.getValue().msgCnt.getAndResetValue(), + entry.getValue().msgSize.getAndResetValue()); + } + statsMap.clear(); + } + + /** + * Get current writable block index. + * + * @return the writable block index + */ + private int getIndex() { + return getIndex(this.writableIndex.get()); + } + + /** + * Gets the metric block index based on the specified value. + * + * @param origIndex the specified value + * @return the metric block index + */ + private int getIndex(int origIndex) { + return Math.abs(origIndex % 2); + } + + /** + * StatsItemSet, Metric Statistics item set + * + * Currently includes the total number of messages and bytes + * according to the statistics dimension, which can be expanded later as needed + */ + private static class TrafficStatsSet { + protected LongStatsCounter msgCnt = + new LongStatsCounter("msgCount", null); + protected LongStatsCounter msgSize = + new LongStatsCounter("msgSize", null); + + public TrafficStatsSet() { + // + } + + /** + * Accumulate the count of messages and message bytes. + * + * @param msgCount the specified message count + * @param msgSize the specified message size + */ + public void addMsgCntAndSize(long msgCount, long msgSize) { + this.msgCnt.addValue(msgCount); + this.msgSize.addValue(msgSize); + } + } + + /** + * WritableUnit, + * + * This class is mainly defined to facilitate reading and writing of + * statistic set through array operations, which contains a Map of + * statistic dimensions and corresponding metric values + */ + private static class WritableUnit { + // Current writing thread count + public LongOnlineCounter refCnt = + new LongOnlineCounter("ref_count", null); + // statistic unit map + protected ConcurrentHashMap statsUnitMap = + new ConcurrentHashMap<>(512); + } +} diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java index 5c1eabdfab6..f74cd3ffb76 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/utils/DataStoreUtils.java @@ -27,7 +27,7 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker; import org.apache.inlong.tubemq.corebase.utils.MessageFlagUtils; import org.apache.inlong.tubemq.corebase.utils.TStringUtils; -import org.apache.inlong.tubemq.server.broker.stats.CountItem; +import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo; /** * Storage util. Used for data and index file storage format. @@ -113,10 +113,10 @@ public static String nameFromOffset(final long offset, final String fileSuffix) * @param sBuilder the string buffer * @return the converted messages */ - public static ClientBroker.TransferedMessage getTransferMsg(final ByteBuffer dataBuffer, int dataTotalSize, - final HashMap countMap, - final String statisKeyBase, - final StringBuilder sBuilder) { + public static ClientBroker.TransferedMessage getTransferMsg(ByteBuffer dataBuffer, int dataTotalSize, + HashMap countMap, + String statisKeyBase, + StringBuilder sBuilder) { if (dataBuffer.array().length < dataTotalSize) { return null; } @@ -187,11 +187,11 @@ public static ClientBroker.TransferedMessage getTransferMsg(final ByteBuffer dat String baseKey = sBuilder.append(statisKeyBase) .append("#").append(messageTime).toString(); sBuilder.delete(0, sBuilder.length()); - CountItem getCount = countMap.get(baseKey); + TrafficInfo getCount = countMap.get(baseKey); if (getCount == null) { - countMap.put(baseKey, new CountItem(1L, payLoadLen2)); + countMap.put(baseKey, new TrafficInfo(1L, payLoadLen2)); } else { - getCount.appendMsg(1L, payLoadLen2); + getCount.addMsgCntAndSize(1L, payLoadLen2); } ClientBroker.TransferedMessage transferedMessage = dataBuilder.build(); dataBuilder.clear(); diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountServiceTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountServiceTest.java deleted file mode 100644 index b9e80fe5f9f..00000000000 --- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/GroupCountServiceTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.tubemq.server.broker.stats; - -import java.util.HashMap; -import java.util.Map; -import org.junit.Test; - -/** - * GroupCountService test. - */ -public class GroupCountServiceTest { - - @Test - public void add() { - GroupCountService groupCountService = new GroupCountService("PutCounterGroup", "Producer", 60 * 1000); - groupCountService.add("key", 1L, 100); - Map items = new HashMap<>(); - items.put("key1", new CountItem(1L, 1024)); - items.put("key2", new CountItem(1L, 1024)); - // add counts - groupCountService.add(items); - } -} diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java index f8ae24b37d6..4da079d92eb 100644 --- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java +++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolderTest.java @@ -75,6 +75,7 @@ public void testServiceStatsHolder() { Assert.assertEquals(10, retMap.get("file_sync_dlt_min").longValue()); Assert.assertEquals(1, retMap.get("file_sync_dlt_cell_8t16").longValue()); Assert.assertEquals(1, retMap.get("file_sync_dlt_cell_64t128").longValue()); + final long sinceTime1 = retMap.get("reset_time"); // verify snapshot ServiceStatsHolder.snapShort(retMap); retMap.clear(); @@ -83,6 +84,7 @@ public void testServiceStatsHolder() { // add disk sync data, add 1 ServiceStatsHolder.updDiskSyncDataDlt(999); ServiceStatsHolder.snapShort(retMap); + Assert.assertNotEquals(sinceTime1, retMap.get("reset_time").longValue()); Assert.assertEquals(2, retMap.get("consumer_online_cnt").longValue()); Assert.assertEquals(0, retMap.get("consumer_timeout_cnt").longValue()); Assert.assertEquals(0, retMap.get("broker_hb_exc_cnt").longValue()); diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsServiceTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsServiceTest.java new file mode 100644 index 00000000000..e8e08eabfd8 --- /dev/null +++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsServiceTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.tubemq.server.broker.stats; + +import java.util.HashMap; +import java.util.Map; +import org.junit.Assert; +import org.junit.Test; + +/** + * TrafficStatsService test. + */ +public class TrafficStatsServiceTest { + + @Test + public void testTrafficInfo() { + // case 1 + TrafficInfo trafficInfo1 = new TrafficInfo(); + trafficInfo1.addMsgCntAndSize(1, 100); + trafficInfo1.addMsgCntAndSize(3, 500); + Assert.assertEquals(4, trafficInfo1.getMsgCount()); + Assert.assertEquals(600, trafficInfo1.getMsgSize()); + trafficInfo1.clear(); + trafficInfo1.addMsgCntAndSize(50, 5000); + Assert.assertEquals(50, trafficInfo1.getMsgCount()); + Assert.assertEquals(5000, trafficInfo1.getMsgSize()); + // case 2 + TrafficInfo trafficInfo2 = new TrafficInfo(99, 1000); + trafficInfo2.addMsgCntAndSize(1, 100); + Assert.assertEquals(100, trafficInfo2.getMsgCount()); + Assert.assertEquals(1100, trafficInfo2.getMsgSize()); + } + + @Test + public void testTrafficStatsService() { + TrafficStatsService trafficService = + new TrafficStatsService("PutCounterGroup", "Producer", 60 * 1000L); + trafficService.add("key", 1L, 100); + Map items = new HashMap<>(); + items.put("key1", new TrafficInfo(1L, 1024)); + items.put("key2", new TrafficInfo(1L, 1024)); + // add counts + trafficService.add(items); + trafficService.add("key3", 3L, 500L); + } +}