-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6896: Add producer metrics exporting in KafkaStreams.java #4998
Conversation
@guozhangwang @mjsax Let me know your thoughts on this one. Thank you! |
I think the change makes sense. However, we should have a corresponding JIRA. Furthermore, the PR would not be sufficient if EOS is enabled, because for EOS there is no thread producer but a producer per task. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a unit test around KafkaStreams.metrics()
to check that all the expected metrics are included?
final Map<MetricName, ? extends Metric> producerMetrics = producer.metrics(); | ||
result.putAll(producerMetrics); | ||
} else { | ||
log.info("producer is null in thread {}", toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Capitalize p
.
@@ -387,6 +387,7 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState | |||
public Map<MetricName, ? extends Metric> metrics() { | |||
final Map<MetricName, Metric> result = new LinkedHashMap<>(); | |||
for (final StreamThread thread : threads) { | |||
result.putAll(thread.producerMetrics()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the above TODO with only admin client left.
@@ -690,6 +693,7 @@ public StreamThread(final Time time, | |||
this.log = logContext.logger(StreamThread.class); | |||
this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log); | |||
this.taskManager = taskManager; | |||
this.producer = producer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... note that when EOS is turned on, each task will has its own producer client and the producer object passed in here will be null.
So I'd suggest update the following code in line 1224: when producer == null
, try to iterate the owned tasks from the taskManager.activeTasks().values()
and get its producer (it is private, so we may need to add a package private getter).
In this way for both EOS and non-EOS we will get the producer metrics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense to me!
@mjsax thank you for the suggestion! Will prepare a JIRA. |
@guozhangwang regarding the unit test on |
@abbccdda I see... I think we can overwrite If you think this change is too large, we can also do that in another JIRA. |
@guozhangwang thanks for the feedback. I added a helper function in the |
Build failed with checkstyle error. |
@mjsax thanks for the heads up! |
@abbccdda I think for EOS the test coverage it a bit tricky as it needs the mock task manager to also return the assigned tasks. Let wait to put in into another PR (please add a TODO on |
@guozhangwang thanks for the tip. I have added the |
6be6243
to
8c857b9
Compare
@guozhangwang could you take a final look since the test passes? Thank you! |
Merged to trunk, thanks @abbccdda ! |
@guozhangwang thank you!!! |
KAFKA-6986:Export Admin Client metrics through Stream Threads We already exported producer and consumer metrics through KafkaStreams class: #4998 It makes sense to also export the Admin client metrics. I didn't add a separate unittest case for this. Let me know if it's needed. This is my first contribution, feel free to point out any mistakes that I did. Reviewers: Guozhang Wang <wangguoz@gmail.com>
We would like to also export the producer metrics from StreamThread just like consumer metrics, so that we could gain more visibility of stream application. The approach is to pass in the threadProducer into the StreamThread so that we could export its metrics in dynamic. Note that this is a pure internal change that doesn't require a KIP, and in the future we also want to export admin client metrics. A followup KIP for admin client will be created once this is merged. Reviewers: Guozhang Wang <wangguoz@gmail.com>
…e#5210) KAFKA-6986:Export Admin Client metrics through Stream Threads We already exported producer and consumer metrics through KafkaStreams class: apache#4998 It makes sense to also export the Admin client metrics. I didn't add a separate unittest case for this. Let me know if it's needed. This is my first contribution, feel free to point out any mistakes that I did. Reviewers: Guozhang Wang <wangguoz@gmail.com>
We would like to also export the producer metrics from
StreamThread
just like consumer metrics, so that we could gain more visibility of stream application. The approach is to pass in thethreadProducer
into the StreamThread so that we could export its metrics in dynamic.Note that this is a pure internal change that doesn't require a KIP, and in the future we also want to export admin client metrics. A followup KIP for admin client will be created once this is merged.