Move offset validation logic to consumer classes #13015
There is no guarantee that we can always detect a gap by comparing the start offset with the batch's first offset, especially after we introduce the exclusive start offset concept.
It seems we still want to report the gap within the general handling flow. In that case, it should be easier to add a hasGap() API to MessageBatch, which the stream consumer can populate.
That makes sense, since the consumer already knows which offset we wanted to consume from versus which offset we actually consumed from, and it can also apply any other criteria it wants to implement.
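A minimal sketch of the idea discussed above, in which the consumer computes the flag and the batch only carries it. All type names here (`SketchMessageBatch`, `SketchOffsetBatch`, `SketchConsumer`) are hypothetical stand-ins, not Pinot's actual classes, and the `long` offsets with an inclusive start offset are assumptions made for illustration. A real consumer may use a different criterion, for example when start offsets are exclusive.

```java
import java.util.List;

// Hypothetical, simplified stand-in for a message batch interface.
interface SketchMessageBatch {
  List<String> getMessages();

  // Defaults to false so existing implementations need no changes.
  default boolean hasDataLoss() {
    return false;
  }
}

// Hypothetical batch that simply carries the flag computed by the consumer.
final class SketchOffsetBatch implements SketchMessageBatch {
  private final List<String> _messages;
  private final boolean _hasDataLoss;

  SketchOffsetBatch(List<String> messages, boolean hasDataLoss) {
    _messages = messages;
    _hasDataLoss = hasDataLoss;
  }

  @Override
  public List<String> getMessages() {
    return _messages;
  }

  @Override
  public boolean hasDataLoss() {
    return _hasDataLoss;
  }
}

// Hypothetical consumer: it knows both the offset it asked for and the
// offset the stream actually returned, so it decides whether data was lost.
final class SketchConsumer {
  private SketchConsumer() {
  }

  static SketchOffsetBatch fetch(long requestedOffset, long firstAvailableOffset, List<String> messages) {
    // Assumption: offsets are inclusive, so any first offset beyond the
    // requested one means messages were skipped.
    boolean hasDataLoss = firstAvailableOffset > requestedOffset;
    return new SketchOffsetBatch(messages, hasDataLoss);
  }
}
```

With this shape, the general handling flow only needs to check `hasDataLoss()` on the batch and never compares offsets itself.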
pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
.../pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMessageBatch.java
.../pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
...re/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
...ka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
LGTM otherwise
*/
private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
if (batchFirstOffset.compareTo(startOffset) > 0) {
private void reportDataLoss(MessageBatch messageBatch) {
Please update the Javadoc for this method.
_serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
String message =
"startOffset(" + startOffset + ") is older than topic's beginning offset(" + batchFirstOffset + ")";
"Message loss detected in stream partition: " + _partitionGroupId + " for table: " + _tableNameWithType
Let's use String.format() for readability.
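As a rough illustration of this suggestion, the concatenated message could be built with `String.format()` along these lines. The helper class `DataLossMessages` and its parameters are hypothetical, and only the part of the message visible in the diff is reproduced, since the rest of the original line is not shown.

```java
// Hypothetical helper, not part of the PR; parameter names mirror the
// fields that appear in the diff fragment.
final class DataLossMessages {
  private DataLossMessages() {
  }

  static String describe(int partitionGroupId, String tableNameWithType) {
    // One template string keeps the wording readable in a single place.
    return String.format("Message loss detected in stream partition: %s for table: %s",
        partitionGroupId, tableNameWithType);
  }
}
```

The template keeps the full sentence in one literal, which makes it easier to spot missing spaces or separators than a chain of `+` operators.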
@@ -66,7 +66,7 @@ public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset s
KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset;
String shardId = startOffset.getShardId();
String startSequenceNumber = startOffset.getSequenceNumber();

boolean isMissingMessages = false;
Revert this
@@ -96,6 +96,10 @@ default boolean isEndOfPartitionGroup() {
return false;
}

default boolean hasDataLoss() {
Add some Javadoc for this method.
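One possible shape for the requested Javadoc is sketched below. The interface name `DocumentedBatch` is a hypothetical stand-in, and the wording is an assumption based on the review discussion rather than the text that was eventually committed.

```java
// Hypothetical stand-in interface used only to show the Javadoc.
interface DocumentedBatch {
  /**
   * Returns {@code true} if the stream consumer detected that messages were
   * lost while fetching this batch, for example because the requested start
   * offset was no longer available in the stream.
   *
   * <p>The default implementation returns {@code false}, so consumers that
   * cannot detect data loss do not need to override it.
   */
  default boolean hasDataLoss() {
    return false;
  }
}
```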
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #13015 +/- ##
============================================
+ Coverage 61.75% 62.20% +0.45%
+ Complexity 207 198 -9
============================================
Files 2436 2531 +95
Lines 133233 138570 +5337
Branches 20636 21448 +812
============================================
+ Hits 82274 86197 +3923
- Misses 44911 45926 +1015
- Partials 6048 6447 +399
* Enhance Kinesis consumer
* Simplify the handling
* Address comments
* Move offset validation logic to consumer classes
* Add missing message interface to message batch
* fix linting
* remove unused interface
* Cleanup and refactoring
* lint fixes

---------

Co-authored-by: Xiaotian (Jackie) Jiang <jackie.jxt@gmail.com>
Co-authored-by: Kartik Khare <kharekartik@kartiks-macbook-pro.tail8a064.ts.net>
Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
Depends on #12806