New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GOBBLIN-876: Expose metrics() API in GobblinKafkaConsumerClient to al… #2730
Conversation
…low consume metrics to be reported.
Codecov Report
@@ Coverage Diff @@
## master #2730 +/- ##
============================================
+ Coverage 44.99% 45.15% +0.15%
- Complexity 8742 8801 +59
============================================
Files 1884 1890 +6
Lines 70295 70620 +325
Branches 7715 7747 +32
============================================
+ Hits 31629 31886 +257
- Misses 35735 35780 +45
- Partials 2931 2954 +23
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Minor comments
* @param kafkaMetric | ||
* @return | ||
*/ | ||
private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this likely to be re-used in other implementation under the same package? I am suggesting to make it package private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Unfortunately, the KafkaMetric class is specific to a given Kafka version, making code duplication necessary. So it seems that each consumer client version will have its own implementation of the method.
* @param metricName the name of the Kafka metric e.g. "records-lag-max", "fetch-throttle-time-max" etc. | ||
* @return the canonicalized metric name. | ||
*/ | ||
public String canonicalMetricName(String metricGroup, Collection<String> metricTags, String metricName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we make it protected
or package private
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will make the change.
@sv2000 : why is codecov so red? Something wrong with the codecov plugin? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some clarifying questions and suggestions...
return gauge; | ||
} | ||
|
||
private String canonicalMerticName(KafkaMetric kafkaMetric) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo on Mertic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thanks!
* @param metricName the name of the Kafka metric e.g. "records-lag-max", "fetch-throttle-time-max" etc. | ||
* @return the canonicalized metric name. | ||
*/ | ||
String canonicalMetricName(String metricGroup, Collection<String> metricTags, String metricName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
who should call it and when? is it a good idea to call it many times with the same input?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a clarifying comment.
|
||
private String canonicalMerticName(KafkaMetric kafkaMetric) { | ||
MetricName name = kafkaMetric.metricName(); | ||
return canonicalMetricName(name.group(), name.tags().values(), name.name()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this method will be called repeatedly with the same input values (the Kafka Metric Names) over the lifetime of the process, does it make sense to cache the mapping of KafkaMetricName -> canonicalMetricName and re-use it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah - thought about it. Even if we cache the metric name mappings, we need to compute the key for the map every time. It seems like the key needs to be derived off of metric group, metric name and tags to make the metric unique, which is essentially the canonical representation of the kafka metric.
Not sure how code coverage is computed. It shows my changes impacting classes that seem totally unrelated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
* org.apache.kafka.common.Metric to Coda Hale Metrics. | ||
* @return | ||
*/ | ||
public default Map<String, Metric> metrics() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since metrics is not a verb... maybe this method should be called getMetrics()?
@Override | ||
public Map<String, Metric> metrics() { | ||
Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics(); | ||
Map<String, Metric> codaHaleMetricMap = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need a new instance every time?
This diff doesn't show when and how this method is being called, so its hard to say if this new instance and copy on each call to metrics is a good idea or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the PR can be improved a bit by
-
Changing the name of the metrics method to getMetrics
-
Not creating extra instances of CodaHale metrics
2 is less of a show-stopper than 1
Thanks @shirshanka ! Changed metrics method to getMetrics per your suggestion. For 2, the possibility of TopicPartition reassignments to the underlying Kafka consumer makes it difficult to retain the same map, unless we compute a diff to obtain new metrics on each call to KafkaConsumer#metrics(). For simplicity and due to the fact that this method will be called inside a scheduled thread (typically, once every 30 s or more), the relative overhead of instantiating a new map should be insignificant. As a result, I am keeping the implementation as is. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
…erClient to al… Closes apache#2730 from sv2000/kafkaConsumerMetrics
…low consume metrics to be reported.
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Newer Kafka consumer expose metrics() API that report a number of consumer metrics such as lag, latency, etc. which are very useful for monitoring and debugging. We expose a metrics() API in GobblinKafkaConsumerClient to allow consumer metrics to be reported.
Tests
NA.
Commits