feat: Implement SingleSubscriptionConsumerImpl#281
Conversation
Codecov Report
@@ Coverage Diff @@
## master #281 +/- ##
=========================================
Coverage ? 61.68%
Complexity ? 984
=========================================
Files ? 189
Lines ? 6063
Branches ? 525
=========================================
Hits ? 3740
Misses ? 2017
Partials ? 306
Continue to review full report at Codecov.
|
| public ConsumerRecords<byte[], byte[]> poll(Duration duration) { | ||
| Map<Partition, Queue<SequencedMessage>> partitionQueues = new HashMap<>(); | ||
| try { | ||
| if (wakeupTriggered) throw new WakeupException(); |
There was a problem hiding this comment.
Can you read this without the mutex?
There was a problem hiding this comment.
No. Fixed.
| try { | ||
| if (wakeupTriggered) throw new WakeupException(); | ||
| while (!duration.isZero()) { | ||
| Duration sleepFor = Collections.min(ImmutableList.of(duration, Duration.ofMillis(100))); |
There was a problem hiding this comment.
I'm worried that this will result in very high e2e latency for consumers that set high poll durations. IIUC, Kafka returns from Poll as soon as any data is available, and does not wait until "max.poll.records" records are available. Ideally, poll() would return as soon as any one partition has any data unless "fetch.min.bytes" is set to >1.
There was a problem hiding this comment.
Yes, I was misreading the behavior. I've restructured this to return as soon as any messages are available, PTAL.
There was a problem hiding this comment.
This works well for cases where messages are already in the buffer when the client calls poll(), but it will still have high latency when messages are delivered during the 100ms sleep. I can think of some cases where this will matter: imagine messages being published uniformly every 10ms. Also, we need to remember that clients are used to single-digit milliseconds latency for Kafka. Would it be possible to have some kind of notification when any of the SinglePartitionSubscribers has messages ready to be delivered?
| })); | ||
| } | ||
| } | ||
| try (CloseableMonitor.Hold h = monitor.enter()) { |
There was a problem hiding this comment.
Do we need this last read?
There was a problem hiding this comment.
Yes. Added an explanatory comment as to why.
…t data, not wait the full duration.
manuelmenzella-google
left a comment
There was a problem hiding this comment.
I have a single comment that I would like to discuss regarding latency for Kafka subscribers. Everything else looks great, thanks Dan.
| try { | ||
| if (wakeupTriggered) throw new WakeupException(); | ||
| while (!duration.isZero()) { | ||
| Duration sleepFor = Collections.min(ImmutableList.of(duration, Duration.ofMillis(100))); |
There was a problem hiding this comment.
This works well for cases where messages are already in the buffer when the client calls poll(), but it will still have high latency when messages are delivered during the 100ms sleep. I can think of some cases where this will matter: imagine messages being published uniformly every 10ms. Also, we need to remember that clients are used to single-digit milliseconds latency for Kafka. Would it be possible to have some kind of notification when any of the SinglePartitionSubscribers has messages ready to be delivered?
Ack. As discussed offline, this requires a bit of a gross "register watch" type api to implement. I will add a FR for this behavior and lower the polling duration to 10 ms in a subsequent PR. |
This is an API that can be used to implement the kafka Consumer API.