realtime debug logging #7946

Merged
@@ -397,6 +397,11 @@ protected boolean consumeLoop()
try {
messageBatch = _partitionGroupConsumer
.fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("message batch received. filter={} unfiltered={} endOfPartitionGroup={}",
messageBatch.getUnfilteredMessageCount(), messageBatch.getMessageCount(),
messageBatch.isEndOfPartitionGroup());
}
_endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
_consecutiveErrorCount = 0;
} catch (PermanentConsumerException e) {
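The new lines above follow the usual SLF4J guard pattern: the `debug(...)` call is wrapped in `isDebugEnabled()` so that its arguments (the batch counts here) are never computed when DEBUG is off. A minimal, self-contained sketch of that pattern outside of Pinot, with a hypothetical `onBatch` callback:

```java
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedDebugLoggingSketch {
  private static final Logger LOGGER = LoggerFactory.getLogger(GuardedDebugLoggingSketch.class);

  // Hypothetical callback, not part of Pinot.
  void onBatch(List<byte[]> batch) {
    // The guard skips evaluating batch.size() (or anything more expensive)
    // when the logger is not configured at DEBUG or finer.
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("message batch received. size={}", batch.size());
    }
  }
}
```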
@@ -426,8 +431,12 @@ protected boolean consumeLoop()
} else if (messageBatch.getUnfilteredMessageCount() > 0) {
// we consumed something from the stream but filtered all the content out,
// so we need to advance the offsets to avoid getting stuck
_currentOffset = messageBatch.getOffsetOfNextBatch();
lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
StreamPartitionMsgOffset nextOffset = messageBatch.getOffsetOfNextBatch();
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("Skipped empty batch. Advancing from {} to {}", _currentOffset, nextOffset);
}
_currentOffset = nextOffset;
lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(nextOffset);
} else {
// We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long
// time.
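The branch above handles the case where every record in a batch was read from the stream but then dropped by a filter: the consumer still has to move its offset to the next batch, otherwise each loop iteration would re-fetch the same already-filtered range. A simplified sketch of that decision, using plain `long` offsets instead of Pinot's `StreamPartitionMsgOffset` and a hypothetical batch type:

```java
public class OffsetAdvanceSketch {

  // Hypothetical stand-in for Pinot's MessageBatch, for illustration only.
  static class Batch {
    int messageCount;           // records that survived filtering
    int unfilteredMessageCount; // records actually read from the stream
    long offsetOfNextBatch;
  }

  long nextOffset(long currentOffset, Batch batch) {
    if (batch.messageCount > 0) {
      // Normal path: index the records, then advance.
      return batch.offsetOfNextBatch;
    }
    if (batch.unfilteredMessageCount > 0) {
      // Everything was filtered out; advance anyway to avoid getting stuck
      // re-reading the same offsets forever.
      return batch.offsetOfNextBatch;
    }
    // Nothing was read at all; keep the current offset and retry later.
    return currentOffset;
  }
}
```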
@@ -459,6 +468,9 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
GenericRow reuse = new GenericRow();
for (int index = 0; index < messagesAndOffsets.getMessageCount(); index++) {
if (_shouldStop || endCriteriaReached()) {
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("stop processing message batch early shouldStop: {}", _shouldStop);
}
break;
}
if (!canTakeMore) {
@@ -556,9 +568,14 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
}
updateCurrentDocumentCountMetrics();
if (streamMessageCount != 0) {
_segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount,
streamMessageCount, _currentOffset);
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
indexedMessageCount, streamMessageCount, _currentOffset);
}
} else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
}
// If there were no messages to be fetched from the stream, wait for a little bit so as to avoid hammering the stream
Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
}
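When a fetch returns nothing at all (no records, filtered or otherwise), the loop above backs off with Guava's `Uninterruptibles.sleepUninterruptibly` instead of polling the stream in a tight loop. A minimal sketch of that backoff, assuming only Guava on the classpath; the sleep constant is an illustrative value, not Pinot's default:

```java
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.Uninterruptibles;

public class IdleBackoffSketch {
  // Illustrative value; Pinot derives its idle sleep from the stream config.
  private static final long IDLE_SLEEP_MILLIS = 100L;

  void onEmptyFetch() {
    // Sleeps without throwing InterruptedException; if the thread is interrupted
    // during the sleep, Guava restores the interrupt flag before returning.
    Uninterruptibles.sleepUninterruptibly(IDLE_SLEEP_MILLIS, TimeUnit.MILLISECONDS);
  }
}
```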
@@ -30,11 +30,15 @@
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
implements PartitionLevelConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);

public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
super(clientId, streamConfig, partition);
}
@@ -48,6 +52,10 @@ public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffse
}

public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
endOffset, timeoutMillis);
}
_consumer.seek(_topicPartition, startOffset);
ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
@@ -59,8 +67,12 @@ public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int
if (offset >= startOffset && (endOffset > offset || endOffset == -1)) {
if (message != null) {
filtered.add(new MessageAndOffset(message.get(), offset));
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug("tombstone message at offset {}", offset);
}
lastOffset = offset;
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug("filter message at offset {} (outside of offset range {} {})", offset, startOffset, endOffset);
}
}
return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);
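All of the new output is at DEBUG level, so it stays silent under a default Pinot log configuration. If Log4j 2 is the SLF4J backend (as in the stock Pinot distribution), one way to flip a single class to DEBUG at runtime is Log4j's `Configurator`; the package name below is an assumption and should be checked against the actual fully-qualified class name in the build you are running:

```java
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

public class EnableConsumerDebugLogging {
  public static void main(String[] args) {
    // Assumed logger name: the fully-qualified KafkaPartitionLevelConsumer class.
    // Verify the package for your Pinot version before relying on it.
    Configurator.setLevel(
        "org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConsumer", Level.DEBUG);
  }
}
```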