-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Measure kafka queue in consumer with better exactitude #3423
Conversation
consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value)) | ||
val response = consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value)) | ||
response.lastOption.foreach { | ||
case (_, _, newOffset, _) => offset = newOffset + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you fold this into the previous map instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean updating the offset each time new message arrives?
0fd304c
to
408f031
Compare
408f031
to
84eee72
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you 👍 🎉
Measure kafka queue metric directly in the consumer by comparing the offsets instead of relying on the built-in consumer metrics. This method provides more precision compared to the old approach, it was observed that the built-in metrics show certain lagging in case of burst. Additionally make kamon flush cadency configurable in the application.conf.
Measure kafka queue metric directly in the consumer by comparing the offsets instead of relying on the built-in consumer metrics. This method provides more precision compared to the old approach, it was observed that the built-in metrics show certain lagging in case of burst.
Additionally make kamon flush cadency configurable in the application.conf.
My changes affect the following components
Types of changes