Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-2478][TubeMQ] Optimize GroupCountService logic implementation #2479

Merged
merged 1 commit into from
Feb 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.slf4j.LoggerFactory;

public abstract class AbstractDaemonService implements Service, Runnable {
private static final Logger logger = LoggerFactory.getLogger(AbstractDaemonService.class);
private static final Logger logger =
LoggerFactory.getLogger(AbstractDaemonService.class);
private final String name;
private final long intervalMs;
private final Thread daemon;
private AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean shutdown =
new AtomicBoolean(false);

public AbstractDaemonService(final String serviceName, final long intervalMs) {
this.name = serviceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo;
import org.apache.inlong.tubemq.server.broker.offset.OffsetService;
import org.apache.inlong.tubemq.server.broker.stats.CountService;
import org.apache.inlong.tubemq.server.broker.stats.GroupCountService;
import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder;
import org.apache.inlong.tubemq.server.broker.stats.TrafficStatsService;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.TStatusConstants;
import org.apache.inlong.tubemq.server.common.aaaserver.CertificateBrokerHandler;
Expand Down Expand Up @@ -107,9 +106,9 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
// row lock.
private final RowLock brokerRowLock;
// statistics of produce.
private final CountService putCounterGroup;
private final TrafficStatsService putCounterGroup;
// statistics of consume.
private final CountService getCounterGroup;
private final TrafficStatsService getCounterGroup;
// certificate handler.
private final CertificateBrokerHandler serverAuthHandler;
// consumer timeout listener.
Expand All @@ -128,8 +127,8 @@ public BrokerServiceServer(final TubeBroker tubeBroker,
this.serverAuthHandler = tubeBroker.getServerAuthHandler();
ServiceStatusHolder.setStatisParameters(tubeConfig.getAllowedReadIOExcptCnt(),
tubeConfig.getAllowedWriteIOExcptCnt(), tubeConfig.getIoExcptStatsDurationMs());
this.putCounterGroup = new GroupCountService("PutCounterGroup", "Producer", 60 * 1000);
this.getCounterGroup = new GroupCountService("GetCounterGroup", "Consumer", 60 * 1000);
this.putCounterGroup = new TrafficStatsService("PutCounterGroup", "Producer", 60 * 1000);
this.getCounterGroup = new TrafficStatsService("GetCounterGroup", "Consumer", 60 * 1000);
this.heartbeatManager = new HeartbeatManager();
this.brokerRowLock =
new RowLock("Broker-RowLock", this.tubeConfig.getRowLockWaitDurMs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStatisInfo;
import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStore;
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.inlong.tubemq.server.broker.stats.CountItem;
import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.inlong.tubemq.server.common.utils.AppendResult;
import org.apache.inlong.tubemq.server.common.utils.IdWorker;
Expand Down Expand Up @@ -219,7 +219,7 @@ public GetMessageResult getMessages(int reqSwitch, long requestOffset,
if (inMemCache) {
// return not found when data is under memory sink operation.
if (memMsgRlt.isSuccess) {
HashMap<String, CountItem> countMap =
HashMap<String, TrafficInfo> countMap =
new HashMap<>();
List<ClientBroker.TransferedMessage> transferedMessageList =
new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.List;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker.TransferedMessage;
import org.apache.inlong.tubemq.server.broker.stats.CountItem;
import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;

/**
* Broker's reply to Consumer's GetMessage request.
Expand All @@ -38,14 +38,14 @@ public class GetMessageResult {
public long waitTime = -1;
public boolean isSlowFreq = false;
public boolean isFromSsdFile = false;
public HashMap<String, CountItem> tmpCounters = new HashMap<>();
public HashMap<String, TrafficInfo> tmpCounters = new HashMap<>();
public List<TransferedMessage> transferedMessageList = new ArrayList<>();
public long maxOffset = TBaseConstants.META_VALUE_UNDEFINED;

public GetMessageResult(boolean isSuccess, int retCode, final String errInfo,
final long reqOffset, final int lastReadOffset,
final long lastRdDataOffset, final int totalSize,
HashMap<String, CountItem> tmpCounters,
HashMap<String, TrafficInfo> tmpCounters,
List<TransferedMessage> transferedMessageList) {
this(isSuccess, retCode, errInfo, reqOffset, lastReadOffset,
lastRdDataOffset, totalSize, tmpCounters, transferedMessageList, false);
Expand All @@ -54,7 +54,7 @@ public GetMessageResult(boolean isSuccess, int retCode, final String errInfo,
public GetMessageResult(boolean isSuccess, int retCode, final String errInfo,
final long reqOffset, final int lastReadOffset,
final long lastRdDataOffset, final int totalSize,
HashMap<String, CountItem> tmpCounters,
HashMap<String, TrafficInfo> tmpCounters,
List<TransferedMessage> transferedMessageList,
boolean isFromSsdFile) {
this.isSuccess = isSuccess;
Expand Down Expand Up @@ -108,11 +108,11 @@ public void setWaitTime(long waitTime) {
this.waitTime = waitTime;
}

public HashMap<String, CountItem> getTmpCounters() {
public HashMap<String, TrafficInfo> getTmpCounters() {
return tmpCounters;
}

public void setTmpCounters(HashMap<String, CountItem> tmpCounters) {
public void setTmpCounters(HashMap<String, TrafficInfo> tmpCounters) {
this.tmpCounters = tmpCounters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.inlong.tubemq.corebase.utils.ServiceStatusHolder;
import org.apache.inlong.tubemq.server.broker.BrokerConfig;
import org.apache.inlong.tubemq.server.broker.msgstore.MessageStore;
import org.apache.inlong.tubemq.server.broker.stats.CountItem;
import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder;
import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.inlong.tubemq.server.broker.utils.DiskSamplePrint;
import org.apache.inlong.tubemq.server.common.TServerConstants;
Expand Down Expand Up @@ -263,7 +263,7 @@ public GetMessageResult getMessages(int partitionId, long lastRdOffset,
final StringBuilder sBuilder = new StringBuilder(512);
final long curDataMaxOffset = getDataMaxOffset();
final long curDataMinOffset = getDataMinOffset();
HashMap<String, CountItem> countMap = new HashMap<>();
HashMap<String, TrafficInfo> countMap = new HashMap<>();
ByteBuffer dataBuffer =
ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
List<ClientBroker.TransferedMessage> transferedMessageList =
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,37 @@

package org.apache.inlong.tubemq.server.broker.stats;

import java.util.concurrent.atomic.AtomicLong;

/**
* Statistic of message, contains message's count and message's size.
*/
public class CountItem {
AtomicLong msgCount = new AtomicLong(0);
AtomicLong msgSize = new AtomicLong(0);

public CountItem(long msgCount, long msgSize) {
this.msgCount.set(msgCount);
this.msgSize.set(msgSize);
}
public class TrafficInfo {
private long msgCnt = 0L;
private long msgSize = 0L;

public long getMsgSize() {
return msgSize.get();
public TrafficInfo() {
clear();
}

public void setMsgSize(long msgSize) {
this.msgSize.set(msgSize);
public TrafficInfo(long msgCount, long msgSize) {
this.msgCnt = msgCount;
this.msgSize = msgSize;
}

public long getMsgCount() {
return msgCount.get();
return msgCnt;
}

public void setMsgCount(long msgCount) {
this.msgCount.set(msgCount);
public long getMsgSize() {
return msgSize;
}

public void appendMsg(final long msgCount, final long msgSize) {
this.msgCount.addAndGet(msgCount);
this.msgSize.addAndGet(msgSize);
public void addMsgCntAndSize(long msgCount, long msgSize) {
this.msgCnt += msgCount;
this.msgSize += msgSize;
}

public void clear() {
this.msgCnt = 0L;
this.msgSize = 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,34 @@

import java.util.Map;

public interface CountService {
/**
* TrafficService, incoming and outgoing traffic statistics service
*
* Supports adding new metric data one by one or in batches,
* and outputting metric data to a file at specified intervals.
*/
public interface TrafficService {

/**
* Close service.
*
* @param waitTimeMs the wait time
*/
void close(long waitTimeMs);

void add(Map<String, CountItem> counterGroup);
/**
* Add traffic information in batches
*
* @param trafficInfos the traffic information
*/
void add(Map<String, TrafficInfo> trafficInfos);

void add(String name, Long delta, int msgSize);
/**
* Add a traffic information record
*
* @param statsKey the statistical key
* @param msgCnt the total message count
* @param msgSize the total message size
*/
void add(String statsKey, long msgCnt, long msgSize);
}