Move offset validation logic to consumer classes (#13015)
* Enhance Kinesis consumer

* Simplify the handling

* Address comments

* Move offset validation logic to consumer classes

* Add missing message interface to message batch

* Fix linting

* Remove unused interface

* Cleanup and refactoring

* Lint fixes

---------

Co-authored-by: Xiaotian (Jackie) Jiang <jackie.jxt@gmail.com>
Co-authored-by: Kartik Khare <kharekartik@kartiks-macbook-pro.tail8a064.ts.net>
Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
4 people committed May 23, 2024
1 parent d4e2ee1 commit 29c560f
Showing 5 changed files with 28 additions and 16 deletions.
RealtimeSegmentDataManager.java

@@ -468,10 +468,7 @@ protected boolean consumeLoop()
         throw t;
       }
 
-      StreamPartitionMsgOffset batchFirstOffset = messageBatch.getFirstMessageOffset();
-      if (batchFirstOffset != null) {
-        validateStartOffset(_currentOffset, batchFirstOffset);
-      }
+      reportDataLoss(messageBatch);
 
       boolean endCriteriaReached = processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
 
@@ -922,18 +919,16 @@ public Map<String, PartitionLagState> getPartitionToLagState(
   }
 
   /**
-   * Checks if the begin offset of the stream partition has been fast-forwarded.
-   * batchFirstOffset should be less than or equal to startOffset.
-   * If batchFirstOffset is greater, then some messages were not received.
+   * Checks and reports if the consumer is going through data loss.
    *
-   * @param startOffset The offset of the first message desired, inclusive.
-   * @param batchFirstOffset The offset of the first message in the batch.
+   * @param messageBatch Message batch to validate
    */
-  private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
-    if (batchFirstOffset.compareTo(startOffset) > 0) {
+  private void reportDataLoss(MessageBatch messageBatch) {
+    if (messageBatch.hasDataLoss()) {
       _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
-      String message =
-          "startOffset(" + startOffset + ") is older than topic's beginning offset(" + batchFirstOffset + ")";
+      String message = String.format("Message loss detected in stream partition: %s for table: %s startOffset: %s "
+          + "batchFirstOffset: %s", _partitionGroupId, _tableNameWithType, _startOffset,
+          messageBatch.getFirstMessageOffset());
       _segmentLogger.error(message);
       _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), message, null));
     }
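The net effect of this change: the ingestion loop no longer compares offsets itself but asks the batch whether messages were skipped, so each stream plugin can define what data loss means for its own offset model. A minimal, self-contained sketch of the resulting control flow, using simplified stand-in types rather than Pinot's actual classes:

// Stand-ins for illustration only; not Pinot's real interfaces.
interface Batch {
  boolean hasDataLoss(); // defaults to false in the real MessageBatch
  String getFirstMessageOffset(); // may be null for an empty batch
}

class ConsumeLoopSketch {
  // Mirrors the refactored consumeLoop(): report the loss, then keep consuming.
  void handle(Batch batch) {
    if (batch.hasDataLoss()) {
      // The real code also bumps ServerMeter.STREAM_DATA_LOSS and records a SegmentErrorInfo.
      System.err.println("Message loss detected, batchFirstOffset=" + batch.getFirstMessageOffset());
    }
    // ... processStreamEvents(batch, ...) proceeds either way ...
  }
}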
KafkaMessageBatch.java

@@ -33,6 +33,7 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> {
   private final long _offsetOfNextBatch;
   private final long _firstOffset;
   private final StreamMessageMetadata _lastMessageMetadata;
+  private final boolean _hasDataLoss;
 
   /**
    * @param messages the messages, which may be smaller than {@see unfilteredMessageCount}
@@ -43,12 +44,13 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> {
    * delay when a batch has all messages filtered.
    */
   public KafkaMessageBatch(List<BytesStreamMessage> messages, int unfilteredMessageCount, long offsetOfNextBatch,
-      long firstOffset, @Nullable StreamMessageMetadata lastMessageMetadata) {
+      long firstOffset, @Nullable StreamMessageMetadata lastMessageMetadata, boolean hasDataLoss) {
     _messages = messages;
     _unfilteredMessageCount = unfilteredMessageCount;
     _offsetOfNextBatch = offsetOfNextBatch;
     _firstOffset = firstOffset;
     _lastMessageMetadata = lastMessageMetadata;
+    _hasDataLoss = hasDataLoss;
   }
 
   @Override
@@ -82,4 +84,9 @@ public StreamPartitionMsgOffset getFirstMessageOffset() {
   public StreamMessageMetadata getLastMessageMetadata() {
     return _lastMessageMetadata;
   }
+
+  @Override
+  public boolean hasDataLoss() {
+    return _hasDataLoss;
+  }
 }
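For illustration, a hedged sketch of constructing and inspecting a batch with the new flag; the argument values below are hypothetical (the real call site is in KafkaPartitionLevelConsumer, next file):

// Hypothetical scenario: the fetch asked for offset 100 but the broker's
// first available record was 150, so the gap is reported as data loss.
KafkaMessageBatch batch = new KafkaMessageBatch(
    java.util.List.of(), // messages (after filtering)
    0,                   // unfilteredMessageCount
    151L,                // offsetOfNextBatch
    150L,                // firstOffset actually returned
    null,                // lastMessageMetadata (@Nullable)
    150L > 100L);        // hasDataLoss
assert batch.hasDataLoss();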
KafkaPartitionLevelConsumer.java

@@ -61,6 +61,7 @@ public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset sta
       }
       _consumer.seek(_topicPartition, startOffset);
     }
+
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
     List<ConsumerRecord<String, Bytes>> records = consumerRecords.records(_topicPartition);
     List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size());
@@ -84,7 +85,9 @@ public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset sta
         lastMessageMetadata = messageMetadata;
       }
     }
-    return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata);
+
+    return new KafkaMessageBatch(filteredRecords, records.size(), offsetOfNextBatch, firstOffset, lastMessageMetadata,
+        firstOffset > startOffset);
   }
 
   private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, Bytes> record) {
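The data-loss rule is the new last constructor argument: because Kafka offsets are contiguous longs within a partition, a first returned record sitting ahead of the requested start offset can only mean the skipped range no longer exists on the broker (typically trimmed by retention). A worked example under that assumption:

// Same predicate the return statement above passes to KafkaMessageBatch.
static boolean inferDataLoss(long requestedStartOffset, long firstReturnedOffset) {
  return firstReturnedOffset > requestedStartOffset;
}

// Resuming from offset 100 after retention deleted through 149:
// the first record polled is 150, so inferDataLoss(100L, 150L) == true
// and offsets 100..149 are reported as lost.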
KinesisConsumer.java

@@ -66,7 +66,6 @@ public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset s
     KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset;
     String shardId = startOffset.getShardId();
     String startSequenceNumber = startOffset.getSequenceNumber();
-
     // Get the shard iterator
     String shardIterator;
     if (startSequenceNumber.equals(_nextStartSequenceNumber)) {
MessageBatch.java

@@ -96,6 +96,14 @@ default boolean isEndOfPartitionGroup() {
     return false;
   }
 
+  /**
+   * Returns {@code true} if the current batch has data loss.
+   * This is useful to determine if there were gaps in the stream.
+   */
+  default boolean hasDataLoss() {
+    return false;
+  }
+
   @Deprecated
   default T getMessageAtIndex(int index) {
     throw new UnsupportedOperationException();
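Declaring hasDataLoss() as a default method keeps every existing MessageBatch implementation compiling unchanged; only consumers that can actually detect gaps (like the Kafka one above) override it. A toy illustration of that pattern, with stand-in types:

// Stand-in types, not Pinot's: shows why the default body is safe.
interface ToyBatch {
  default boolean hasDataLoss() {
    return false; // opt-in override point for gap-aware streams
  }
}

class ToyKinesisBatch implements ToyBatch {
  // No override: existing implementations report no data loss by default.
}

class ToyKafkaBatch implements ToyBatch {
  private final boolean _hasDataLoss;
  ToyKafkaBatch(boolean hasDataLoss) { _hasDataLoss = hasDataLoss; }
  @Override public boolean hasDataLoss() { return _hasDataLoss; }
}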
