From e834938c50b4f9ee9497b51c21c86fac5e82bc45 Mon Sep 17 00:00:00 2001
From: richardstartin
Date: Wed, 22 Dec 2021 11:51:54 +0000
Subject: [PATCH] realtime debug logging

---
 .../LLRealtimeSegmentDataManager.java         | 25 ++++++++++++++++---
 .../kafka20/KafkaPartitionLevelConsumer.java  | 12 +++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index f463e0b0287..b56a7945fe3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -397,6 +397,11 @@ protected boolean consumeLoop()
     try {
       messageBatch = _partitionGroupConsumer
           .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+      if (_segmentLogger.isDebugEnabled()) {
+        _segmentLogger.debug("message batch received. filtered={} unfiltered={} endOfPartitionGroup={}",
+            messageBatch.getMessageCount(), messageBatch.getUnfilteredMessageCount(),
+            messageBatch.isEndOfPartitionGroup());
+      }
       _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
       _consecutiveErrorCount = 0;
     } catch (PermanentConsumerException e) {
@@ -426,8 +431,12 @@ protected boolean consumeLoop()
       } else if (messageBatch.getUnfilteredMessageCount() > 0) {
         // we consumed something from the stream but filtered all the content out,
         // so we need to advance the offsets to avoid getting stuck
-        _currentOffset = messageBatch.getOffsetOfNextBatch();
-        lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
+        StreamPartitionMsgOffset nextOffset = messageBatch.getOffsetOfNextBatch();
+        if (_segmentLogger.isDebugEnabled()) {
+          _segmentLogger.debug("Skipped empty batch. Advancing from {} to {}", _currentOffset, nextOffset);
+        }
+        _currentOffset = nextOffset;
+        lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(nextOffset);
       } else {
         // We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long
         // time.
@@ -459,6 +468,9 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
     GenericRow reuse = new GenericRow();
     for (int index = 0; index < messagesAndOffsets.getMessageCount(); index++) {
       if (_shouldStop || endCriteriaReached()) {
+        if (_segmentLogger.isDebugEnabled()) {
+          _segmentLogger.debug("Stopping message batch processing early. shouldStop={}", _shouldStop);
+        }
         break;
       }
       if (!canTakeMore) {
@@ -556,9 +568,14 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
     }
     updateCurrentDocumentCountMetrics();
     if (streamMessageCount != 0) {
-      _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount,
-          streamMessageCount, _currentOffset);
+      if (_segmentLogger.isDebugEnabled()) {
+        _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
+            indexedMessageCount, streamMessageCount, _currentOffset);
+      }
     } else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
+      if (_segmentLogger.isDebugEnabled()) {
+        _segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
+      }
       // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
       Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
     }
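A note on the pattern used throughout this diff: SLF4J's parameterized `debug(String, Object...)` already defers string formatting until DEBUG is enabled, but the varargs `Object[]` is still allocated and primitive arguments are still boxed on every call. Wrapping each call in an `isDebugEnabled()` check removes even that cost from the consume loop's hot path. A minimal sketch of the guarded pattern, assuming only slf4j-api on the classpath (class and method names here are illustrative, not Pinot's):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedDebugLogging {
  private static final Logger LOGGER = LoggerFactory.getLogger(GuardedDebugLogging.class);

  void onBatch(int filteredCount, int unfilteredCount, long currentOffset) {
    // Unguarded, this call would allocate an Object[3] and box two ints and a
    // long even when DEBUG is disabled. The guard reduces the disabled-logging
    // cost to a single cheap level check.
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("message batch received. filtered={} unfiltered={} offset={}",
          filteredCount, unfilteredCount, currentOffset);
    }
  }
}
```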
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 68bbc9e49f3..a1e0cc8df74 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -30,11 +30,15 @@
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
     implements PartitionLevelConsumer {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
+
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
@@ -48,6 +52,10 @@ public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffse
   }
 
   public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset: {}, timeout: {}ms", _topicPartition, startOffset,
+          endOffset, timeoutMillis);
+    }
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
@@ -59,8 +67,12 @@ public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int
       if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
         if (message != null) {
           filtered.add(new MessageAndOffset(message.get(), offset));
+        } else if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("tombstone message at offset {}", offset);
         }
         lastOffset = offset;
+      } else if (LOGGER.isDebugEnabled()) {
LOGGER.debug("filter message at offset {} (outside of offset range {} {})", offset, startOffset, endOffset); } } return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);