Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-4000: Collect and record per-topic consumer metrics #1684

Closed
wants to merge 1 commit into from

Conversation

vahidhashemian
Copy link
Contributor

@vahidhashemian vahidhashemian commented Jul 29, 2016

Improve consumer metric collection by collecting and recording metrics per topic.

@vahidhashemian
Copy link
Contributor Author

@hachikuji Is this fix close to what you had in mind?

@hachikuji
Copy link

@vahidhashemian I'm not sure this quite does it. It looks like this is going to keep aggregating across multiple fetches. I was actually wondering if there was a way to reuse FetchResponseMetricAggregator to accumulate the metrics for each topic. For example, maybe we just need to move the map from topic to FetchMetrics that you have here into that class. Then we can increment it accordingly in calls to record(). Would that work?

@vahidhashemian
Copy link
Contributor Author

@hachikuji Thanks for the quick feedback. I believe this line would make it aggregate within the same fetch only. But I'm going to give what you suggested a try and update the PR. Thanks again.

@hachikuji
Copy link

Hmm... that still doesn't seem quite right. That resets the counts on every call to fetchedRecords, but the same fetch response could be split across several calls, right?

@vahidhashemian
Copy link
Contributor Author

OK. I think I'm still missing something. Based on what you mentioned, it seems we would want to collect metrics at a higher level (at fetch response levels instead of fetchedRecords).

In your first comment I believe you were referring to this record() call (as a member of FetchResponseMetricAggregator), and not the call to recordTopicFetchMetrics() which I thought was the culprit. That record() method doesn't seem to aggregate metrics per partition. It just aggregates across all partitions in the fetch response. Do we want to change that and collect per topic, or just add per-topic metric aggregation too?

Thanks a lot in advance for clearing this up.

@hachikuji
Copy link

@vahidhashemian Just to clarify, a single fetch response contains fetch data from multiple partitions, some of which may be from the same topic. My interpretation of the topic-level fetch metrics is that they should be recording the per-topic number of records/bytes fetched from the entire fetch response rather than from each partition separately (does this seem correct?). That means we need to aggregate these stats by topic for each fetch response. The tricky thing is that we now parse the fetch data from each partition separately, so we need somewhere to store the stats so that they can be aggregated incrementally. That is the purpose of FetchResponseMetricAggregator. It contains a set of all the partitions that were contained in the fetch response and as we parse the data from each partition, we remove that partition from the set. Once all partitions have been parsed, we can record the fetch metrics. So my thought was that maybe we only needed to extend that idea. In addition to keeping the total number of bytes/records from the fetch response in FetchResponseMetricAggregator, we can also track the per-topic tallies. Would that make sense or not?

@vahidhashemian
Copy link
Contributor Author

@hachikuji Thanks for explaining in details how you envision these metrics should be collected at topic level. It makes sense. Other than tracking, I assume we also need to "record" these per-topic metrics; perhaps through per-topic sensors. This can be done at the same time partition metrics are recorded (here). Would you agree?

@hachikuji
Copy link

@vahidhashemian Yep, I think that will work.

@vahidhashemian vahidhashemian force-pushed the KAFKA-4000 branch 3 times, most recently from abb65f6 to fe343bb Compare August 1, 2016 23:18
@vahidhashemian
Copy link
Contributor Author

Thanks @hachikuji. I hope this better aligns with what you explained above.

@vahidhashemian vahidhashemian changed the title KAFKA-4000: Aggregate partitions of each topic for consumer metrics KAFKA-4000: Collect and record per-topic consumer metrics Aug 1, 2016
@@ -704,14 +712,22 @@ public CompletedFetch(TopicPartition partition,
*/
private static class FetchResponseMetricAggregator {
private final FetchManagerMetrics sensors;
private final Map<String, FetchManagerMetrics> topicSensors;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? There should only ever be one instance of FetchManagerMetrics, and it already exposes a method recordTopicFetchMetrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah, you're right. I overlooked that existing method. Thanks.

@vahidhashemian
Copy link
Contributor Author

Thanks again @hachikuji for your feedback. The PR is updated.

@vahidhashemian
Copy link
Contributor Author

One question. I think we may not need this line anymore now that we are properly recording fetch metrics for each topic. What do you think?

@hachikuji
Copy link

@vahidhashemian Yeah, good call. We should definitely remove that line.

if (unrecordedPartitions.isEmpty()) {
// collect and aggregate per-topic metrics
String topic = partition.topic();
if (!this.topicFetchMetrics.containsKey(topic))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a micro-optimization, so feel free to ignore. An alternative idiom is to call map.get() and check the result against null. This works because we know the value will never be null and it saves a hash lookup.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it makes sense. I'll update the PR.

String topic = partition.topic();
if (this.topicFetchMetrics.get(topic) == null)
this.topicFetchMetrics.put(topic, new FetchMetrics());
FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, to clarify, I was trying to suggest moving this line above null check. Something like this:

FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic);
if (topicFetchMetric == null) {
  topicFetchMetric = new FetchMetrics();
  this.topicFetchMetrics.put(topic, topicFetchMetric);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah, right, sorry I missed the redundant call. Will fix it now. Thanks.

@@ -706,8 +705,18 @@ public CompletedFetch(TopicPartition partition,
private final FetchManagerMetrics sensors;
private final Set<TopicPartition> unrecordedPartitions;

private int totalBytes;
private int totalRecords;
private static class FetchMetrics {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: could we move this below the record() function? It would be nice to keep the class fields together.

@hachikuji
Copy link

Left a minor comment, but overall LGTM. Maybe @ijuma can take a look when he has time.

Improve consumer metric collection by collecting and recording metrics per topic.
@vahidhashemian
Copy link
Contributor Author

@hachikuji I moved the class as you suggested. Thanks for all the feedback on this PR.

@vahidhashemian
Copy link
Contributor Author

@hachikuji A while back you gave your blessings on this PR after some reviews, but it was before you had the "commit" superpower. It would be great if we can merge it if you see no issues.

This one is also related, but not reviewed yet.

@hachikuji
Copy link

@vahidhashemian Thanks for the remainder (even if I got to it a week late). Took me a while to remember what this issue was about, but the patch still LGTM.

@asfgit asfgit closed this in 7f8edbc Dec 9, 2016
@hachikuji
Copy link

BTW, I made a few minor visibility tweaks before merging.

@asfbot
Copy link

asfbot commented Dec 9, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/54/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Dec 10, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/55/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Dec 10, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/53/
Test PASSed (JDK 7 and Scala 2.10).

soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
Improve consumer metric collection by collecting and recording metrics per topic.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes apache#1684 from vahidhashemian/KAFKA-4000
efeg pushed a commit to efeg/kafka that referenced this pull request May 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants