Skip to content

Commit

Permalink
add msgAck and redelivery debug-logs And add topic-name in debug-logs (
Browse files Browse the repository at this point in the history
…#714)

* Add msgAck and redelivery debug-logs And add topic-name in debug-logs
  • Loading branch information
rdhabalia committed Aug 30, 2017
1 parent 3751baf commit df93a42
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 43 deletions.
Expand Up @@ -65,6 +65,7 @@ public class Consumer {
private final SubType subType;
private final ServerCnx cnx;
private final String appId;
private final String topicName;

private final long consumerId;
private final int priorityLevel;
Expand Down Expand Up @@ -95,11 +96,12 @@ public class Consumer {
private volatile int unackedMessages = 0;
private volatile boolean blockedConsumerOnUnackedMsgs = false;

public Consumer(Subscription subscription, SubType subType, long consumerId, int priorityLevel, String consumerName,
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId) throws BrokerServiceException {

this.subscription = subscription;
this.subType = subType;
this.topicName = topicName;
this.consumerId = consumerId;
this.priorityLevel = priorityLevel;
this.consumerName = consumerName;
Expand Down Expand Up @@ -151,8 +153,8 @@ public SendMessageInfo sendMessages(final List<Entry> entries) {
sentMessages.channelPromse = writePromise;
if (entries.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("[{}] List of messages is empty, triggering write future immediately for consumerId {}",
subscription, consumerId);
log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}",
topicName, subscription, consumerId);
}
writePromise.setSuccess();
sentMessages.totalSentMessages = 0;
Expand Down Expand Up @@ -192,8 +194,8 @@ public SendMessageInfo sendMessages(final List<Entry> entries) {
}

if (log.isDebugEnabled()) {
log.debug("[{}] Sending message to consumerId {}, entry id {}", subscription, consumerId,
pos.getEntryId());
log.debug("[{}-{}] Sending message to consumerId {}, entry id {}", topicName, subscription,
consumerId, pos.getEntryId());
}

// We only want to pass the "real" promise on the last entry written
Expand Down Expand Up @@ -273,7 +275,8 @@ void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sent
}
if (permits < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits);
log.debug("[{}-{}] [{}] message permits dropped below 0 - {}", topicName, subscription, consumerId,
permits);
}
}

Expand Down Expand Up @@ -363,8 +366,8 @@ void flowPermits(int additionalNumberOfMessages) {
}

if (log.isDebugEnabled()) {
log.debug("[{}] Added more flow control message permits {} (old was: {})", this, additionalNumberOfMessages,
oldPermits);
log.debug("[{}-{}] Added more flow control message permits {} (old was: {}), blocked = ", topicName,
subscription, additionalNumberOfMessages, oldPermits, blockedConsumerOnUnackedMsgs);
}

}
Expand Down Expand Up @@ -497,6 +500,9 @@ private void removePendingAcks(PositionImpl position) {
if (ackOwnedConsumer != null) {
int totalAckedMsgs = (int) ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()).first;
ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId());
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
// unblock consumer-throttling when receives half of maxUnackedMessages => consumer can start again
// consuming messages
if (((addAndGetUnAckedMsgs(ackOwnedConsumer, -totalAckedMsgs) <= (maxUnackedMessages / 2))
Expand All @@ -520,6 +526,9 @@ public void redeliverUnacknowledgedMessages() {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
clearUnAckedMsgs(this);
blockedConsumerOnUnackedMsgs = false;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId);
}
// redeliver unacked-msgs
subscription.redeliverUnacknowledgedMessages(this);
flowConsumerBlockedPermits(this);
Expand Down Expand Up @@ -550,6 +559,11 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
addAndGetUnAckedMsgs(this, -totalRedeliveryMessages);
blockedConsumerOnUnackedMsgs = false;

if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", topicName, subscription, consumerId,
totalRedeliveryMessages, pendingPositions.size());
}

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);

Expand Down
Expand Up @@ -625,7 +625,12 @@ protected void handleFlow(CommandFlow flow) {
CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumerFuture.getNow(null).flowPermits(flow.getMessagePermits());
Consumer consumer = consumerFuture.getNow(null);
if (consumer != null) {
consumer.flowPermits(flow.getMessagePermits());
} else {
log.info("[{}] Couldn't find consumer {}", remoteAddress, flow.getConsumerId());
}
}
}

Expand Down
Expand Up @@ -321,7 +321,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
name -> new NonPersistentSubscription(this, subscriptionName));

try {
Consumer consumer = new Consumer(subscription, subType, consumerId, priorityLevel, consumerName, 0, cnx,
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
cnx.getRole());
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
Expand Down
Expand Up @@ -163,7 +163,8 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM

totalAvailablePermits += additionalNumberOfMessages;
if (log.isDebugEnabled()) {
log.debug("[{}] Trigger new read after receiving flow control message", consumer);
log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {}", name, consumer,
totalAvailablePermits);
}
readMoreEntries();
}
Expand Down Expand Up @@ -331,6 +332,7 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
Consumer c = getNextConsumer();
if (c == null) {
// Do nothing, cursor will be rewind at reconnection
log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size());
entries.subList(start, entries.size()).forEach(Entry::release);
cursor.rewind();
return;
Expand Down Expand Up @@ -460,7 +462,7 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
messagesToReplay.add(ledgerId, entryId);
});
if (log.isDebugEnabled()) {
log.debug("[{}] Redelivering unacknowledged messages for consumer {}", consumer, messagesToReplay);
log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, messagesToReplay);
}
readMoreEntries();
}
Expand All @@ -469,7 +471,7 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
if (log.isDebugEnabled()) {
log.debug("[{}] Redelivering unacknowledged messages for consumer {}", consumer, positions);
log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
}
readMoreEntries();
}
Expand Down
Expand Up @@ -37,13 +37,15 @@
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements Dispatcher, ReadEntriesCallback {

private final PersistentTopic topic;
private final ManagedCursor cursor;
private final String name;

private boolean havePendingRead = false;

Expand All @@ -55,6 +57,8 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
PersistentTopic topic) {
super(subscriptionType, partitionIndex, topic.getName());
this.topic = topic;
this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
this.cursor = cursor;
this.readBatchSize = MaxReadBatchSize;
}
Expand Down Expand Up @@ -82,15 +86,15 @@ protected void cancelPendingRead() {
public synchronized void readEntriesComplete(final List<Entry> entries, Object obj) {
Consumer readConsumer = (Consumer) obj;
if (log.isDebugEnabled()) {
log.debug("[{}] Got messages: {}", readConsumer, entries.size());
log.debug("[{}-{}] Got messages: {}", name, readConsumer, entries.size());
}

havePendingRead = false;

if (readBatchSize < MaxReadBatchSize) {
int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize);
if (log.isDebugEnabled()) {
log.debug("[{}] Increasing read batch size from {} to {}", readConsumer, readBatchSize,
log.debug("[{}-{}] Increasing read batch size from {} to {}", name, readConsumer, readBatchSize,
newReadBatchSize);
}

Expand All @@ -103,6 +107,9 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
if (currentConsumer == null || readConsumer != currentConsumer) {
// Active consumer has changed since the read request has been issued. We need to rewind the cursor and
// re-issue the read request for the new consumer
if (log.isDebugEnabled()) {
log.debug("[{}] rewind because no available consumer found", name);
}
entries.forEach(Entry::release);
cursor.rewind();
if (currentConsumer != null) {
Expand All @@ -124,8 +131,8 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
} else {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Ignoring write future complete. consumerAvailable={} havePendingRead={}",
newConsumer, newConsumer != null, havePendingRead);
"[{}-{}] Ignoring write future complete. consumerAvailable={} havePendingRead={}",
name, newConsumer, newConsumer != null, havePendingRead);
}
}
}
Expand All @@ -139,18 +146,18 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM
if (!havePendingRead) {
if (ACTIVE_CONSUMER_UPDATER.get(this) == consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Trigger new read after receiving flow control message", consumer);
log.debug("[{}-{}] Trigger new read after receiving flow control message", name, consumer);
}
readMoreEntries(consumer);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Ignoring flow control message since consumer is not active partition consumer",
consumer);
log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer",
name, consumer);
}
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Ignoring flow control message since we already have a pending read req", consumer);
log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", name, consumer);
}
}
}
Expand All @@ -168,11 +175,12 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
if (!havePendingRead) {
cursor.rewind();
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor rewinded, redelivering unacknowledged messages. ", consumer);
log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
}
readMoreEntries(consumer);
} else {
log.info("[{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", consumer);
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", name,
consumer);
}

}
Expand Down Expand Up @@ -207,7 +215,7 @@ protected void readMoreEntries(Consumer consumer) {
if (!rateLimiter.hasMessageDispatchPermit()) {
if (log.isDebugEnabled()) {
log.debug("[{}] message-read exceeded message-rate {}/{}, schedule after a {}",
topic.getName(), rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(),
name, rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
topic.getBrokerService().executor().schedule(() -> {
Expand Down Expand Up @@ -235,13 +243,13 @@ protected void readMoreEntries(Consumer consumer) {

// Schedule read
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages", consumer, messagesToRead);
log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
}
havePendingRead = true;
cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer buffer is full, pause reading", consumer);
log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer);
}
}
}
Expand All @@ -261,12 +269,12 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
consumers.forEach(Consumer::reachedEndOfTopic);
}
} else if (!(exception instanceof TooManyRequestsException)) {
log.error("[{}] Error reading entries at {} : {} - Retrying to read in {} seconds", c,
log.error("[{}-{}] Error reading entries at {} : {} - Retrying to read in {} seconds", name, c,
cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds", c,
cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0);
log.debug("[{}-{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds",
name, c, cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0);
}
}

Expand All @@ -281,12 +289,12 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
// we should retry the read if we have an active consumer and there is no pending read
if (currentConsumer != null && !havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Retrying read operation", c);
log.debug("[{}-{}] Retrying read operation", name, c);
}
readMoreEntries(currentConsumer);
} else {
log.info("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}", c, currentConsumer,
havePendingRead);
log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", name, c,
currentConsumer, havePendingRead);
}
}
}, waitTimeMillis, TimeUnit.MILLISECONDS);
Expand Down
Expand Up @@ -423,7 +423,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})",
topic, localCluster, remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(),
topicName, localCluster, remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(),
exception);
}
}
Expand Down
Expand Up @@ -404,7 +404,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri

subscriptionFuture.thenAccept(subscription -> {
try {
Consumer consumer = new Consumer(subscription, subType, consumerId, priorityLevel, consumerName,
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
maxUnackedMessages, cnx, cnx.getRole());
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
Expand Down

0 comments on commit df93a42

Please sign in to comment.