Skip to content

Commit

Permalink
Fix msg-acknowledge counter at client-stats (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Sep 27, 2016
1 parent d7d5163 commit bea1087
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 53 deletions.
Expand Up @@ -71,6 +71,8 @@ public void testSyncProducerAndConsumer(int batchMessageDelayMs, int ackTimeoutS
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
// Cumulative Ack-counter works if ackTimeOutTimer-task is enabled
boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0;
if (ackTimeoutSec > 0) {
conf.setAckTimeout(ackTimeoutSec, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -107,14 +109,16 @@ public void testSyncProducerAndConsumer(int batchMessageDelayMs, int ackTimeoutS
Thread.sleep(2000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer);
validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck);
log.info("-- Exiting {} test --", methodName);
}

@Test(dataProvider = "batch_with_timeout")
public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeoutSec) throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
// Cumulative Ack-counter works if ackTimeOutTimer-task is enabled
boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0;
conf.setSubscriptionType(SubscriptionType.Exclusive);
if (ackTimeoutSec > 0) {
conf.setAckTimeout(ackTimeoutSec, TimeUnit.SECONDS);
Expand Down Expand Up @@ -165,7 +169,7 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeout
Thread.sleep(2000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer);
validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck);
log.info("-- Exiting {} test --", methodName);
}

Expand All @@ -174,6 +178,8 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs,
throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
// Cumulative Ack-counter works if ackTimeOutTimer-task is enabled
boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0;
conf.setSubscriptionType(SubscriptionType.Exclusive);
if (ackTimeoutSec > 0) {
conf.setAckTimeout(ackTimeoutSec, TimeUnit.SECONDS);
Expand Down Expand Up @@ -224,14 +230,15 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs,
Thread.sleep(5000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer);
validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck);
log.info("-- Exiting {} test --", methodName);
}

@Test(dataProvider = "batch", timeOut = 100000)
public void testMessageListener(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setAckTimeout(100, TimeUnit.SECONDS);
conf.setSubscriptionType(SubscriptionType.Exclusive);

int numMessages = 100;
Expand Down Expand Up @@ -272,7 +279,7 @@ public void testMessageListener(int batchMessageDelayMs) throws Exception {
assertEquals(latch.await(numMessages, TimeUnit.SECONDS), true, "Timed out waiting for message listener acks");
consumer.close();
producer.close();
validatingLogInfo(consumer, producer);
validatingLogInfo(consumer, producer, true);
log.info("-- Exiting {} test --", methodName);
}

Expand Down Expand Up @@ -324,15 +331,17 @@ public void testSendTimeout(int batchMessageDelayMs) throws Exception {
log.info("-- Exiting {} test --", methodName);
}

public void validatingLogInfo(Consumer consumer, Producer producer) throws InterruptedException {
public void validatingLogInfo(Consumer consumer, Producer producer, boolean verifyAckCount) throws InterruptedException {
// Waiting for recording last stat info
Thread.sleep(1000);
ConsumerStats cStat = consumer.getStats();
ProducerStats pStat = producer.getStats();
assertEquals(pStat.getTotalMsgsSent(), cStat.getTotalMsgsReceived());
assertEquals(pStat.getTotalBytesSent(), cStat.getTotalBytesReceived());
assertEquals(pStat.getTotalMsgsSent(), pStat.getTotalAcksReceived());
assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent());
if (verifyAckCount) {
assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent());
}
}

}
Expand Up @@ -23,6 +23,8 @@
import java.util.BitSet;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -287,6 +289,7 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp
return true;
}
int batchIndex = batchMessageId.getBatchIndex();
int batchSize = bitSet.length();
if (ackType == AckType.Individual) {
bitSet.clear(batchIndex);
} else {
Expand All @@ -303,6 +306,11 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp
batchMessageAckTracker.keySet().removeIf(m -> (m.compareTo(message) <= 0));
}
batchMessageAckTracker.remove(message);
// increment Acknowledge-msg counter with number of messages in batch only if AckType is Individual.
// CumulativeAckType is handled while sending ack to broker
if (ackType == AckType.Individual) {
stats.incrementNumAcksSent(batchSize);
}
return true;
} else {
// we cannot ack this message to broker. but prior message may be ackable
Expand Down Expand Up @@ -396,16 +404,17 @@ public void operationComplete(Future<Void> future) throws Exception {
if (unAckedMessageTracker != null) {
unAckedMessageTracker.remove(msgId);
}
stats.incrementNumAcksSent(stats.getNumAcksTrackerSumThenReset());
// increment counter by 1 for non-batch msg
if (!(messageId instanceof BatchMessageIdImpl)) {
stats.incrementNumAcksSent(1);
}
} else if (ackType == AckType.Cumulative) {
if (unAckedMessageTracker != null) {
stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId));
} else {
stats.incrementNumAcksSent(stats.getNumAcksTrackerSumThenReset());
int ackedMessages = unAckedMessageTracker.removeMessagesTill(msgId);
stats.incrementNumAcksSent(ackedMessages);
}
}
ackFuture.complete(null);
stats.resetNumAckTracker();
} else {
stats.incrementNumAcksFailed();
ackFuture.completeExceptionally(new PulsarClientException(future.cause()));
Expand Down Expand Up @@ -638,8 +647,6 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC
msgMetadata.recycle();
}

stats.incrementNumAcksTracker(numMessages);

if (listener != null) {
// Trigger the notification on the message listener in a separate thread to avoid blocking the networking
// thread while the message processing happens
Expand Down
Expand Up @@ -47,7 +47,6 @@ public class ConsumerStats implements Serializable {
private final LongAdder numReceiveFailed;
private final LongAdder numAcksSent;
private final LongAdder numAcksFailed;
private final LongAdder numAcksTracker;
private final LongAdder totalMsgsReceived;
private final LongAdder totalBytesReceived;
private final LongAdder totalReceiveFailed;
Expand All @@ -64,7 +63,6 @@ public ConsumerStats() {
numReceiveFailed = null;
numAcksSent = null;
numAcksFailed = null;
numAcksTracker = null;
totalMsgsReceived = null;
totalBytesReceived = null;
totalReceiveFailed = null;
Expand All @@ -82,7 +80,6 @@ public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfiguration conf,
numReceiveFailed = new LongAdder();
numAcksSent = new LongAdder();
numAcksFailed = new LongAdder();
numAcksTracker = new LongAdder();
totalMsgsReceived = new LongAdder();
totalBytesReceived = new LongAdder();
totalReceiveFailed = new LongAdder();
Expand Down Expand Up @@ -156,10 +153,6 @@ void updateNumMsgsReceived(Message message) {
}
}

void incrementNumAcksSent() {
numAcksSent.increment();
}

void incrementNumAcksSent(long numAcks) {
numAcksSent.add(numAcks);
}
Expand All @@ -176,18 +169,6 @@ Timeout getStatTimeout() {
return statTimeout;
}

void resetNumAckTracker() {
numAcksTracker.reset();
}

void incrementNumAcksTracker(final int numMessages) {
numAcksTracker.add(numMessages);
}

long getNumAcksTrackerSumThenReset() {
return numAcksTracker.sumThenReset();
}

void reset() {
numMsgsReceived.reset();
numBytesReceived.reset();
Expand Down
Expand Up @@ -30,11 +30,6 @@ void incrementNumReceiveFailed() {
// Do nothing
}

@Override
void incrementNumAcksSent() {
// Do nothing
}

@Override
void incrementNumAcksSent(long numAcks) {
// Do nothing
Expand All @@ -45,20 +40,4 @@ void incrementNumAcksFailed() {
// Do nothing
}

@Override
void resetNumAckTracker() {
// Do nothing
}

@Override
void incrementNumAcksTracker(final int numMessages) {
// Do nothing
}

@Override
long getNumAcksTrackerSumThenReset() {
// Do nothing
return -1;
}

}

0 comments on commit bea1087

Please sign in to comment.