Skip to content

Commit

Permalink
Merge 504ed4e into 6f7b0d6
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Jul 29, 2021
2 parents 6f7b0d6 + 504ed4e commit 68ed059
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 78 deletions.
8 changes: 4 additions & 4 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -685,8 +685,8 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
Expand Down Expand Up @@ -802,8 +802,8 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatc
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());

CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});

Expand Down Expand Up @@ -468,7 +468,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch mess
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});

Expand Down Expand Up @@ -634,7 +634,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
continue;
}

this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
Expand Down Expand Up @@ -668,9 +668,9 @@ public GetMessageResult getMessage(final String group, final String topic, final
}

if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
this.storeStatsService.getGetMessageTimesTotalFound().add(1);
} else {
this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
this.storeStatsService.getGetMessageTimesTotalMiss().add(1);
}
long elapsedTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
Expand Down Expand Up @@ -1922,10 +1922,10 @@ private void doReput() {
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
.add(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
Expand Down
106 changes: 53 additions & 53 deletions store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -41,23 +41,23 @@ public class StoreStatsService extends ServiceThread {

private static int printTPSInterval = 60 * 1;

private final AtomicLong putMessageFailedTimes = new AtomicLong(0);
private final LongAdder putMessageFailedTimes = new LongAdder();

private final ConcurrentMap<String, AtomicLong> putMessageTopicTimesTotal =
new ConcurrentHashMap<String, AtomicLong>(128);
private final ConcurrentMap<String, AtomicLong> putMessageTopicSizeTotal =
new ConcurrentHashMap<String, AtomicLong>(128);
private final ConcurrentMap<String, LongAdder> putMessageTopicTimesTotal =
new ConcurrentHashMap<>(128);
private final ConcurrentMap<String, LongAdder> putMessageTopicSizeTotal =
new ConcurrentHashMap<>(128);

private final AtomicLong getMessageTimesTotalFound = new AtomicLong(0);
private final AtomicLong getMessageTransferedMsgCount = new AtomicLong(0);
private final AtomicLong getMessageTimesTotalMiss = new AtomicLong(0);
private final LongAdder getMessageTimesTotalFound = new LongAdder();
private final LongAdder getMessageTransferedMsgCount = new LongAdder();
private final LongAdder getMessageTimesTotalMiss = new LongAdder();
private final LinkedList<CallSnapshot> putTimesList = new LinkedList<CallSnapshot>();

private final LinkedList<CallSnapshot> getTimesFoundList = new LinkedList<CallSnapshot>();
private final LinkedList<CallSnapshot> getTimesMissList = new LinkedList<CallSnapshot>();
private final LinkedList<CallSnapshot> transferedMsgCountList = new LinkedList<CallSnapshot>();
private volatile AtomicLong[] putMessageDistributeTime;
private volatile AtomicLong[] lastPutMessageDistributeTime;
private volatile LongAdder[] putMessageDistributeTime;
private volatile LongAdder[] lastPutMessageDistributeTime;
private long messageStoreBootTimestamp = System.currentTimeMillis();
private volatile long putMessageEntireTimeMax = 0;
private volatile long getMessageEntireTimeMax = 0;
Expand All @@ -75,10 +75,10 @@ public StoreStatsService() {
this.initPutMessageDistributeTime();
}

private AtomicLong[] initPutMessageDistributeTime() {
AtomicLong[] next = new AtomicLong[13];
private LongAdder[] initPutMessageDistributeTime() {
LongAdder[] next = new LongAdder[13];
for (int i = 0; i < next.length; i++) {
next[i] = new AtomicLong(0);
next[i] = new LongAdder();
}

this.lastPutMessageDistributeTime = this.putMessageDistributeTime;
Expand All @@ -93,48 +93,48 @@ public long getPutMessageEntireTimeMax() {
}

public void setPutMessageEntireTimeMax(long value) {
final AtomicLong[] times = this.putMessageDistributeTime;
final LongAdder[] times = this.putMessageDistributeTime;

if (null == times)
return;

// us
if (value <= 0) {
times[0].incrementAndGet();
times[0].add(1);
} else if (value < 10) {
times[1].incrementAndGet();
times[1].add(1);
} else if (value < 50) {
times[2].incrementAndGet();
times[2].add(1);
} else if (value < 100) {
times[3].incrementAndGet();
times[3].add(1);
} else if (value < 200) {
times[4].incrementAndGet();
times[4].add(1);
} else if (value < 500) {
times[5].incrementAndGet();
times[5].add(1);
} else if (value < 1000) {
times[6].incrementAndGet();
times[6].add(1);
}
// 2s
else if (value < 2000) {
times[7].incrementAndGet();
times[7].add(1);
}
// 3s
else if (value < 3000) {
times[8].incrementAndGet();
times[8].add(1);
}
// 4s
else if (value < 4000) {
times[9].incrementAndGet();
times[9].add(1);
}
// 5s
else if (value < 5000) {
times[10].incrementAndGet();
times[10].add(1);
}
// 10s
else if (value < 10000) {
times[11].incrementAndGet();
times[11].add(1);
} else {
times[12].incrementAndGet();
times[12].add(1);
}

if (value > this.putMessageEntireTimeMax) {
Expand Down Expand Up @@ -194,8 +194,8 @@ public String toString() {

public long getPutMessageTimesTotal() {
long rs = 0;
for (AtomicLong data : putMessageTopicTimesTotal.values()) {
rs += data.get();
for (LongAdder data : putMessageTopicTimesTotal.values()) {
rs += data.longValue();
}
return rs;
}
Expand All @@ -218,8 +218,8 @@ private String getFormatRuntime() {

public long getPutMessageSizeTotal() {
long rs = 0;
for (AtomicLong data : putMessageTopicSizeTotal.values()) {
rs += data.get();
for (LongAdder data : putMessageTopicSizeTotal.values()) {
rs += data.longValue();
}
return rs;
}
Expand Down Expand Up @@ -299,13 +299,13 @@ private String getGetTransferedTps() {
}

private String putMessageDistributeTimeToString() {
final AtomicLong[] times = this.lastPutMessageDistributeTime;
final LongAdder[] times = this.lastPutMessageDistributeTime;
if (null == times)
return null;

final StringBuilder sb = new StringBuilder();
for (int i = 0; i < times.length; i++) {
long value = times[i].get();
long value = times[i].longValue();
sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value));
sb.append(" ");
}
Expand Down Expand Up @@ -477,19 +477,19 @@ private void sampling() {
}

this.getTimesFoundList.add(new CallSnapshot(System.currentTimeMillis(),
this.getMessageTimesTotalFound.get()));
this.getMessageTimesTotalFound.longValue()));
if (this.getTimesFoundList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
this.getTimesFoundList.removeFirst();
}

this.getTimesMissList.add(new CallSnapshot(System.currentTimeMillis(),
this.getMessageTimesTotalMiss.get()));
this.getMessageTimesTotalMiss.longValue()));
if (this.getTimesMissList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
this.getTimesMissList.removeFirst();
}

this.transferedMsgCountList.add(new CallSnapshot(System.currentTimeMillis(),
this.getMessageTransferedMsgCount.get()));
this.getMessageTransferedMsgCount.longValue()));
if (this.transferedMsgCountList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
this.transferedMsgCountList.removeFirst();
}
Expand All @@ -510,14 +510,14 @@ private void printTps() {
this.getGetTransferedTps(printTPSInterval)
);

final AtomicLong[] times = this.initPutMessageDistributeTime();
final LongAdder[] times = this.initPutMessageDistributeTime();
if (null == times)
return;

final StringBuilder sb = new StringBuilder();
long totalPut = 0;
for (int i = 0; i < times.length; i++) {
long value = times[i].get();
long value = times[i].longValue();
totalPut += value;
sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value));
sb.append(" ");
Expand All @@ -527,51 +527,51 @@ private void printTps() {
}
}

public AtomicLong getGetMessageTimesTotalFound() {
public LongAdder getGetMessageTimesTotalFound() {
return getMessageTimesTotalFound;
}

public AtomicLong getGetMessageTimesTotalMiss() {
public LongAdder getGetMessageTimesTotalMiss() {
return getMessageTimesTotalMiss;
}

public AtomicLong getGetMessageTransferedMsgCount() {
public LongAdder getGetMessageTransferedMsgCount() {
return getMessageTransferedMsgCount;
}

public AtomicLong getPutMessageFailedTimes() {
public LongAdder getPutMessageFailedTimes() {
return putMessageFailedTimes;
}

public AtomicLong getSinglePutMessageTopicSizeTotal(String topic) {
AtomicLong rs = putMessageTopicSizeTotal.get(topic);
public LongAdder getSinglePutMessageTopicSizeTotal(String topic) {
LongAdder rs = putMessageTopicSizeTotal.get(topic);
if (null == rs) {
rs = new AtomicLong(0);
AtomicLong previous = putMessageTopicSizeTotal.putIfAbsent(topic, rs);
rs = new LongAdder();
LongAdder previous = putMessageTopicSizeTotal.putIfAbsent(topic, rs);
if (previous != null) {
rs = previous;
}
}
return rs;
}

public AtomicLong getSinglePutMessageTopicTimesTotal(String topic) {
AtomicLong rs = putMessageTopicTimesTotal.get(topic);
public LongAdder getSinglePutMessageTopicTimesTotal(String topic) {
LongAdder rs = putMessageTopicTimesTotal.get(topic);
if (null == rs) {
rs = new AtomicLong(0);
AtomicLong previous = putMessageTopicTimesTotal.putIfAbsent(topic, rs);
rs = new LongAdder();
LongAdder previous = putMessageTopicTimesTotal.putIfAbsent(topic, rs);
if (previous != null) {
rs = previous;
}
}
return rs;
}

public Map<String, AtomicLong> getPutMessageTopicTimesTotal() {
public Map<String, LongAdder> getPutMessageTopicTimesTotal() {
return putMessageTopicTimesTotal;
}

public Map<String, AtomicLong> getPutMessageTopicSizeTotal() {
public Map<String, LongAdder> getPutMessageTopicSizeTotal() {
return putMessageTopicSizeTotal;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).addAndGet(appendResult.getWroteBytes());
storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).add(appendResult.getWroteBytes());
}
return putMessageResult;
});
Expand Down Expand Up @@ -629,8 +629,8 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch mess
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(appendResult.getWroteBytes());
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(appendResult.getWroteBytes());
}
return putMessageResult;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void record() {
this.msgPutTotalTodayMorning =
this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
this.msgGetTotalTodayMorning =
this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().longValue();

log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning);
log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning);
Expand Down Expand Up @@ -88,6 +88,6 @@ public long getMsgPutTotalTodayNow() {
}

public long getMsgGetTotalTodayNow() {
return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().longValue();
}
}

0 comments on commit 68ed059

Please sign in to comment.