Skip to content

Commit

Permalink
Merge branch 'fix-consume-metric' into 'inlong-2.8.1.1' (merge reques…
Browse files Browse the repository at this point in the history
…t !26)


fix-consume-metric
Fixes #22 
### Motivation
fix consume metric not print problem;
add metrics for pengdingAcks and pendingAckTimeAndSize;
add ut for new metrics about pengding ack
  • Loading branch information
baomingyu committed Oct 29, 2021
2 parents dad8eb4 + 2b0a4be commit fcb57c3
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ public class Consumer {
private final LongAdder msgOutCounter;
private final LongAdder bytesOutCounter;

private final LongAdder messageAckedIndvidualCounter;
private final LongAdder messageAckedCumulativeCounter;
private final LongAdder pendingAckRemovedCounter;
private final LongAdder pendingAckTimesAndSizesRemovedCounter;


private long lastConsumedTimestamp;
private long lastAckedTimestamp;
private Rate chunkedMessageRate;
Expand Down Expand Up @@ -152,6 +158,12 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.msgRedeliver = new Rate();
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();

this.messageAckedIndvidualCounter = new LongAdder();
this.messageAckedCumulativeCounter = new LongAdder();
this.pendingAckRemovedCounter = new LongAdder();
this.pendingAckTimesAndSizesRemovedCounter = new LongAdder();

this.appId = appId;

// Ensure we start from compacted view
Expand Down Expand Up @@ -268,6 +280,8 @@ public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batc
topicName, subscription, entry.getLedgerId(), entry.getEntryId(),
metricTime, msgSize, consumerId);
}
} else {
log.warn("BatchMetricTimes is null for entry {}:{}", entry.getLedgerId(), entry.getEntryId());
}
}
}
Expand Down Expand Up @@ -299,8 +313,7 @@ public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batc
}

private void incrementUnackedMessages(int ackedMessages) {
if (Subscription.isIndividualAckMode(subType)
&& addAndGetUnAckedMsgs(this, ackedMessages) >= maxUnackedMessages
if (addAndGetUnAckedMsgs(this, ackedMessages) >= maxUnackedMessages
&& maxUnackedMessages > 0) {
blockedConsumerOnUnackedMsgs = true;
}
Expand Down Expand Up @@ -388,6 +401,7 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
List<Position> positionsAcked = Collections.singletonList(position);
subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
removePendingAcks(position, AckType.Cumulative);
messageAckedCumulativeCounter.increment();
return CompletableFuture.completedFuture(null);
}
} else {
Expand All @@ -404,6 +418,7 @@ private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,
List<Position> positionsAcked = new ArrayList<>();

for (int i = 0; i < ack.getMessageIdsCount(); i++) {
messageAckedIndvidualCounter.increment();
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
if (msgId.getAckSetsCount() > 0) {
Expand Down Expand Up @@ -505,8 +520,8 @@ private void checkAckValidationError(CommandAck ack, PositionImpl position) {
}

private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) {
if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) {
removePendingAcks(position);
if (msgId.getAckSetsCount() == 0) {
removePendingAcks(position, AckType.Individual);
}
}

Expand Down Expand Up @@ -656,6 +671,12 @@ public ConsumerStatsImpl getStats() {
if (readPositionWhenJoining != null) {
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
}
stats.pendingAcks = pendingAcks.size();
stats.pendingAckTimesAndMsgSizes = pendingAckTimesAndMsgSizes.size();
stats.messageAckedIndividual = messageAckedIndvidualCounter.longValue();
stats.messageAckedCumulative = messageAckedCumulativeCounter.longValue();
stats.pendingAcksRemoved = pendingAckRemovedCounter.longValue();
stats.pendingTimeAndSizeRemoved = pendingAckTimesAndSizesRemovedCounter.longValue();
return stats;
}

Expand Down Expand Up @@ -735,6 +756,7 @@ private void removePendingAcks(PositionImpl position, AckType ackType) {
}

ConcurrentLongLongPairHashMap pendingAcks = ackOwnedConsumer.getPendingAcks();
ConcurrentLongLongPairHashMap pendingAckTimesAndMsgSizes = ackOwnedConsumer.getPendingAckTimeAndMsgSizes();
// remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
LongPair ackedPosition = ackOwnedConsumer != null
? pendingAcks.get(position.getLedgerId(), position.getEntryId())
Expand Down Expand Up @@ -771,13 +793,14 @@ private void removePendingAcks(PositionImpl position, AckType ackType) {
}
}
});

pendingAckRemovedCounter.add(ackedPositions.size());
pendingAckTimesAndSizesRemovedCounter.add(ackedPositions.size());
// remove ack-ed positions
ackedPositions.forEach(pos -> pendingAcks.remove(pos.getLedgerId(), pos.getEntryId()));
ackedPositions.forEach(pos -> pendingAckTimesAndMsgSizes.remove(pos.getLedgerId(), pos.getEntryId()));

// process current ack-ed position
recordMetricAndRemovePendingAck(position);
recordMetricAndRemovePendingAck(position, pendingAcks, pendingAckTimesAndMsgSizes);
} else { // for individual ack
int totalAckedMsgs = (int) ackedPosition.first;
if (!pendingAcks.containsKey(position.getLedgerId(), position.getEntryId())) {
Expand All @@ -790,6 +813,10 @@ private void removePendingAcks(PositionImpl position, AckType ackType) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}

// process current ack-ed position
recordMetricAndRemovePendingAck(position, pendingAcks, pendingAckTimesAndMsgSizes);

// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = addAndGetUnAckedMsgs(ackOwnedConsumer, -totalAckedMsgs);
Expand All @@ -799,23 +826,25 @@ private void removePendingAcks(PositionImpl position, AckType ackType) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
// process current ack-ed position
recordMetricAndRemovePendingAck(position);
}
}

private void recordMetricAndRemovePendingAck(PositionImpl position) {
private void recordMetricAndRemovePendingAck(PositionImpl position, ConcurrentLongLongPairHashMap pendingAcks,
ConcurrentLongLongPairHashMap pendingAckTimesAndMsgSizes) {
LongPair batchSizeAndStickyKey = pendingAcks.get(position.getLedgerId(), position.getEntryId());
if (batchSizeAndStickyKey == null) {
log.warn("[{}-{}] consumer {} no entry batchSize and stickKey found for current position {}:{}",
topicName, subscription, consumerName, position.getLedgerId(), position.getEntryId());
pendingAckTimesAndMsgSizes.remove(position.getLedgerId(), position.getEntryId());
pendingAckTimesAndSizesRemovedCounter.increment();
return;
}
LongPair timeAndMsgSize =
pendingAckTimesAndMsgSizes.get(position.getLedgerId(), position.getEntryId());
if (timeAndMsgSize == null) {
log.warn("[{}-{}] consumer {} no entry timestamp and msgSize found for current position {}:{}",
topicName, subscription, consumerName, position.getLedgerId(), position.getEntryId());
pendingAcks.remove(position.getLedgerId(), position.getEntryId());
return;
}
// process ack-ed position, first for the batch messages
Expand All @@ -831,6 +860,8 @@ private void recordMetricAndRemovePendingAck(PositionImpl position) {
long entryMsgSize = timeAndMsgSize.second;
pendingAcks.remove(position.getLedgerId(), position.getEntryId());
pendingAckTimesAndMsgSizes.remove(position.getLedgerId(), position.getEntryId());
pendingAckRemovedCounter.increment();
pendingAckTimesAndSizesRemovedCounter.increment();
TDBankMetricsManager.record("CONSUMER", stats.getAddress(), getSubscription().getName(),
topicName, timestamp, batchSize, 1, entryMsgSize);
}
Expand All @@ -841,6 +872,9 @@ private void recordMetricAndRemovePendingAck(PositionImpl position) {
long timestamp = timeAndMsgSize.first;
long entryMsgSize = timeAndMsgSize.second;
pendingAcks.remove(position.getLedgerId(), position.getEntryId());
pendingAckTimesAndMsgSizes.remove(position.getLedgerId(), position.getEntryId());
pendingAckRemovedCounter.increment();
pendingAckTimesAndSizesRemovedCounter.increment();
TDBankMetricsManager.record("CONSUMER", stats.getAddress(), getSubscription().getName(),
topicName, timestamp, ackedMsgs, 1, entryMsgSize);
}
Expand Down
Loading

0 comments on commit fcb57c3

Please sign in to comment.