realtime debug logging
richardstartin committed Dec 22, 2021
1 parent afd5115 commit e834938
Showing 2 changed files with 33 additions and 4 deletions.
@@ -397,6 +397,11 @@ protected boolean consumeLoop()
try {
messageBatch = _partitionGroupConsumer
.fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("message batch received. filter={} unfiltered={} endOfPartitionGroup={}",
messageBatch.getUnfilteredMessageCount(), messageBatch.getMessageCount(),
messageBatch.isEndOfPartitionGroup());
}
_endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
_consecutiveErrorCount = 0;
} catch (PermanentConsumerException e) {
@@ -426,8 +431,12 @@ protected boolean consumeLoop()
} else if (messageBatch.getUnfilteredMessageCount() > 0) {
// we consumed something from the stream but filtered all the content out,
// so we need to advance the offsets to avoid getting stuck
_currentOffset = messageBatch.getOffsetOfNextBatch();
lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
StreamPartitionMsgOffset nextOffset = messageBatch.getOffsetOfNextBatch();
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("Skipped empty batch. Advancing from {} to {}", _currentOffset, nextOffset);
}
_currentOffset = nextOffset;
lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(nextOffset);
} else {
// We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long
// time.
@@ -459,6 +468,9 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
GenericRow reuse = new GenericRow();
for (int index = 0; index < messagesAndOffsets.getMessageCount(); index++) {
if (_shouldStop || endCriteriaReached()) {
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("stop processing message batch early shouldStop: {}", _shouldStop);
}
break;
}
if (!canTakeMore) {
@@ -556,9 +568,14 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
}
updateCurrentDocumentCountMetrics();
if (streamMessageCount != 0) {
_segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount,
streamMessageCount, _currentOffset);
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
indexedMessageCount, streamMessageCount, _currentOffset);
}
} else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
}
// If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
}
@@ -30,11 +30,15 @@
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
implements PartitionLevelConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);

public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
super(clientId, streamConfig, partition);
}
@@ -48,6 +52,10 @@ public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffse
}

public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
endOffset, timeoutMillis);
}
_consumer.seek(_topicPartition, startOffset);
ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
Expand All @@ -59,8 +67,12 @@ public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int
if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
if (message != null) {
filtered.add(new MessageAndOffset(message.get(), offset));
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug("tombstone message at offset {}", offset);
}
lastOffset = offset;
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug("filter message at offset {} (outside of offset range {} {})", offset, startOffset, endOffset);
}
}
return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);
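All of the new log statements follow the SLF4J guarded-logging idiom: wrapping the call in an isDebugEnabled() check means the argument expressions (batch counters, offset lookups) are never evaluated on the hot consume path unless debug logging is actually switched on. A minimal standalone sketch of the idiom, where the class and the expensive helper are illustrative and not part of Pinot:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedDebugLogging {
  private static final Logger LOGGER = LoggerFactory.getLogger(GuardedDebugLogging.class);

  // Illustrative stand-in for something costly, e.g. summarising a message batch.
  private static String expensiveSummary(int filtered, int unfiltered) {
    return "filtered=" + filtered + " unfiltered=" + unfiltered;
  }

  public static void main(String[] args) {
    int filtered = 10;
    int unfiltered = 12;
    // Without the guard, expensiveSummary() would run on every call even when
    // debug logging is disabled; the parameterized {} message only avoids the
    // final string formatting, not the evaluation of its arguments.
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("message batch received: {}", expensiveSummary(filtered, unfiltered));
    }
  }
}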
