
KAFKA-3701: Expose KafkaStreams Metrics in public API #1362

Closed
wants to merge 1 commit

Conversation

jklukas
Contributor

@jklukas jklukas commented May 10, 2016

The Kafka clients expose their metrics registries through a metrics method presenting an unmodifiable collection (see KafkaProducer's metrics() method), but KafkaStreams does not expose its registry.

This PR exposes an unmodifiable collection of metrics on KafkaStreams in direct analogy to KafkaProducer.
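As a rough illustration of the pattern in question (plain Java with illustrative names, not the actual patch): the registry stays private, and metrics() hands out a read-only view, mirroring KafkaProducer.metrics().

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the metrics registry stays private, and metrics()
// returns an unmodifiable view, as KafkaProducer does.
class StreamsMetricsSketch {
    private final Map<String, Double> registry = new HashMap<>();

    void record(String name, double value) {
        registry.put(name, value);
    }

    // Callers can read metric values but cannot mutate the registry:
    // put/remove on the returned map throws UnsupportedOperationException.
    Map<String, Double> metrics() {
        return Collections.unmodifiableMap(registry);
    }
}
```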

This contribution is my original work and I license the work to the project under the project's open source license.

cc @guozhangwang @miguno @mjsax

@ijuma
Contributor

ijuma commented May 10, 2016

@guozhangwang Looking at this PR, I noticed that we are not closing the metrics, and we should.

@mjsax
Member

mjsax commented May 10, 2016

Good catch, @ijuma!
I guess this should be a separate issue, though? Or do you want to fix it in this PR? If yes, StreamThread.close() should be extended to call metrics.close() (the class needs a private member referencing the Metrics object, too).

Since there is no JIRA for this PR, I am also wondering whether KafkaStreams should expose metrics at all. What is the purpose? @jklukas, can you please clarify?

@ijuma
Contributor

ijuma commented May 10, 2016

@mjsax Yes, it should be done separately. I was just raising the issue so that one of the Streams contributors/committers would pick it up. :)

I agree that it would be good to have a JIRA for this PR as we are exposing a new method in a public class.

@guozhangwang
Contributor

@ijuma Good point. I think we need to fix this.

@jklukas We currently expose the metrics through ProcessorContext to let users specify latency metrics for their customized processors; thinking about it a bit more, I agree that it is better to expose these metrics here as well for defining the sensors, and the same metrics object can then be used inside a Processor for recording the sensor. Does this sound good to you?

Also, since this is not a minor change, could you file a JIRA as well and explain the motivation, etc.? Here are some related issues I have been thinking about; we can decide whether to include them in this JIRA or in separate ones:

  1. The StreamsMetrics interface limits users' ability to define metrics other than latency. We can consider letting context.metrics() return the org.apache.kafka.common.metrics.Metrics object directly, allowing users to define any sensors the same way we do in StreamsMetricsImpl.
  2. We can consider adding metrics for process / punctuate / commit rate at the granularity of each processor node, in addition to the global rate mentioned above. This is very helpful in debugging.
  3. We can consider adding rate / total accumulated metrics for context.forward, indicating how many records were forwarded downstream from this processor node. This is also helpful in debugging.
  4. We can consider adding metrics for each stream partition's timestamp. This is helpful in debugging.
  5. Besides the latency metrics, we can also add throughput metrics in terms of source records consumed.

@jklukas jklukas changed the title MINOR: Expose KafkaStreams Metrics in public API KAFKA-3701 [WIP]: Expose KafkaStreams Metrics in public API May 11, 2016
@jklukas
Contributor Author

jklukas commented May 11, 2016

Thanks, all, for the discussion and suggestions here. I created https://issues.apache.org/jira/browse/KAFKA-3701, updated the title of this PR to reference that issue, and added [WIP]. The main effect I'm hoping for is being able to define health checks for my application based on streams metrics.

> We currently expose the metrics through ProcessorContext to let users specify latency metrics for their customized processors; thinking about it a bit more, I agree that it is better to expose these metrics here as well for defining the sensors, and the same metrics object can then be used inside a Processor for recording the sensor. Does this sound good to you?

Having the same metrics registry available from the KafkaStreams instance and from the ProcessorContext sounds good.


Sounds like the scope of what needs to change is bigger than I thought. Should I close this PR to make way for a maintainer to make a broader change?

There's discussion here in this PR thread that's not captured in the JIRA. Should I be porting that content over to JIRA?

@guozhangwang
Contributor

@jklukas About the scope of this JIRA: it is up to you :) It would definitely be great if you would like to take on this general "sub-project" of improving the metrics, but I also do not want to drag you into it if your time does not permit or you are not interested. So either keep the scope of this JIRA as is, or incorporate this general project into the JIRA and copy our discussions from this PR over to it.

@jklukas
Contributor Author

jklukas commented May 12, 2016

@guozhangwang - I have some interest in improving the metrics situation for Kafka Streams, so will pursue some of the ideas you laid out. There are several potentially independent ideas here, though, so I will plan to create a few follow-up JIRA issues to discuss where this should go.

To be clear, I'd like to break off discussion of the following into separate channels:

  • The Metrics instance is currently not being closed. I've opened MINOR: Fix bugs in KafkaStreams.close() #1379 to address that.
  • We are currently missing some opportunities for per-node metrics and other metric additions. I will plan to open a JIRA issue to discuss this.
  • Users can currently only add latency sensors due to the StreamsMetrics interface. Should they have access to the main metrics registry? I'll open a separate JIRA issue for this.

So, this PR is still focused on the idea of exposing a read-only map of internal Kafka Streams metrics. As mentioned in the JIRA, this could be useful for developers defining health checks for their Kafka Streams application.

I'd expect a KafkaStreams.metrics() method to return an unmodifiable map (as implemented in this PR) because we already have that convention for producers and consumers.

@guozhangwang - Do you think we might want to reserve KafkaStreams.metrics to return something else (a StreamsMetrics instance or the Metrics instance directly)? If so, then I'll close this PR for now in favor of further discussion of the design in https://issues.apache.org/jira/browse/KAFKA-3701

@guozhangwang
Contributor

@jklukas The division looks great to me! Ping me after those JIRAs are created and I can help put more content into the task descriptions.

@jklukas
Contributor Author

jklukas commented May 12, 2016

Looks like JIRA is locked down for issue creation right now, so I'll try to pick up creating the breakout issues tomorrow.

@guozhangwang - Do you think the current PR is dependent on discussion in those other issues? Or are we good to continue discussing the possibility of adding a KafkaStreams.metrics() method as implemented here?

asfgit pushed a commit that referenced this pull request May 13, 2016
Initially proposed by ijuma in #1362 (comment)

mjsax commented:

> StreamThread.close() should be extended to call metrics.close() (the class need a private member to reference the Metrics object, too)

The `Metrics` instance is created in the `KafkaStreams` constructor and shared between all threads, so closing it within the threads doesn't seem like the right approach. This PR calls `Metrics.close()` in `KafkaStreams.close()` instead.

cc guozhangwang

Author: Jeff Klukas <jeff@klukas.net>

Reviewers: Ismael Juma, Guozhang Wang

Closes #1379 from jklukas/close-streams-metrics

(cherry picked from commit f34164e)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
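The ownership argument in the commit message above can be sketched in plain Java (stand-in class names, not Kafka's actual ones): the component that constructs the shared registry is the one responsible for closing it, while worker threads only record against it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the shared Metrics registry.
class SharedRegistry implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    boolean isClosed() { return closed.get(); }

    @Override
    public void close() { closed.set(true); }
}

// Hypothetical stand-in for KafkaStreams: it creates the registry in its
// constructor, shares it with workers, and closes it in its own close().
class StreamsOwner implements AutoCloseable {
    private final SharedRegistry registry = new SharedRegistry();

    Runnable newWorker() {
        // Workers use the shared registry but never close it themselves.
        return () -> {
            if (registry.isClosed()) {
                throw new IllegalStateException("registry already closed");
            }
        };
    }

    SharedRegistry registry() { return registry; }

    @Override
    public void close() {
        registry.close(); // closed exactly once, by the owner
    }
}
```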
@jklukas jklukas changed the title KAFKA-3701 [WIP]: Expose KafkaStreams Metrics in public API KAFKA-3701: Expose KafkaStreams Metrics in public API May 13, 2016
@jklukas
Contributor Author

jklukas commented May 13, 2016

Rebased on trunk to pull in #1379. Removed WIP from the PR title.

@jklukas
Contributor Author

jklukas commented May 13, 2016

JIRA is still down to non-committers for new issue creation, so I'm going to write down some text for those issues here while I'm thinking about it.

Should users be given access to the main Metrics registry so they can add arbitrary new metrics?

Kafka Streams is largely a higher-level abstraction on top of producers and consumers, and it seems sensible to match the KafkaStreams interface to that of KafkaProducer and KafkaConsumer where possible. For producers and consumers, the metric registry is internal and metrics are only exposed as an unmodifiable map. This allows users to access client metric values for use in application health checks, etc., but doesn't allow them to register new metrics.

That approach seems reasonable if we assume that a user interested in defining custom metrics is already going to be using a separate metrics library. In such a case, users will likely find it easier to define metrics using whatever library they're familiar with rather than learning the API for Kafka's Metrics class. Is this a reasonable assumption?

If we want to expose the Metrics instance so that users can define arbitrary metrics, I'd argue that documentation updates are needed. In particular, I find the notion of metric tags confusing: tags can be defined in a MetricConfig when the Metrics instance is constructed, StreamsMetricsImpl maintains its own set of tags, and users can set tag overrides.

If a user were to get access to the Metrics instance, they would be missing the tags defined in StreamsMetricsImpl. I'm imagining that users would want their custom metrics to sit alongside the predefined metrics with the same tags, and users shouldn't be expected to manage those additional tags themselves.

So, why are we allowing users to define their own metrics via the StreamsMetrics interface in the first place? Is it that we'd like to be able to provide a built-in latency metric, but the definition depends on the details of the use case so there's no generic solution? That would be sufficient motivation for this special case of addLatencySensor. If we want to continue down that path and give users access to define a wider range of custom metrics, I'd prefer to extend the StreamsMetrics interface so that users can call methods on that object, automatically getting the tags appropriate for that instance rather than interacting with the raw Metrics instance.
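The "extend StreamsMetrics so tags come for free" idea can be sketched like this (illustrative plain Java, not Kafka's API): a wrapper that merges the instance's own tags into every user-defined sensor, so callers never handle the raw registry's tag plumbing themselves.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of a StreamsMetrics-style wrapper that injects the
// instance's tags into every user-defined sensor automatically.
class TaggedMetrics {
    private final Map<String, String> instanceTags;
    private final Map<String, Map<String, String>> sensors = new HashMap<>();

    TaggedMetrics(Map<String, String> instanceTags) {
        this.instanceTags = new HashMap<>(instanceTags);
    }

    // The user supplies only their own tags; the instance tags are merged
    // in automatically, with user tags winning on conflict.
    void addSensor(String name, Map<String, String> userTags) {
        Map<String, String> merged = new HashMap<>(instanceTags);
        merged.putAll(userTags);
        sensors.put(name, merged);
    }

    Map<String, String> tagsOf(String name) {
        return sensors.get(name);
    }
}
```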

@guozhangwang
Contributor

@jklukas JIRA was down due to some spam attack, I think. You can try again.

As for the discussion above, here are my two cents (would love to hear from other people as well @mjsax @enothereska @miguno @ijuma @junrao ):

  1. For the producer/consumer case, all internal metrics are provided and abstracted away from users; they just need to read the documentation to poll whichever provided metrics they are interested in. If they want to define more metrics, those are likely to live outside the clients themselves, so they can use whatever method they like and the Metrics instance does not need to be exposed to users.
  2. For Streams, things are a bit different: users define the computational logic, which becomes part of the "Streams client" processing and may be of interest for users themselves to monitor. Think of a customized processor that sends an email to some address based on a condition, where users want to monitor the average rate of emails sent. Hence it is worth considering whether they should be able to access the Metrics instance to define their own metrics alongside the pre-defined metrics provided by the library.
  3. Now, since the Metrics class was not previously designed for public usage, it is not very user-friendly for defining sensors, especially regarding the semantic differences between name / scope / tags. StreamsMetrics tries to hide some of this confusion from users, but it still exposes tags and hence is not perfect at doing so. We need to think of a better approach so that: 1) user-defined metrics are "aligned" with library-provided metrics (i.e. with the same name prefix within a single application, with a similar scope hierarchy definition, etc.), and 2) the APIs for doing so feel natural.

I do not have concrete ideas about 3) off the top of my head; comments are more than welcome.

@jklukas
Contributor Author

jklukas commented May 16, 2016

I've copied much of the discussion here into some breakout JIRA issues:

I'm closing this PR for now since it might no longer make sense if KafkaStreams ends up exposing its Metrics instance for mutation by users.

@jklukas jklukas closed this May 16, 2016
aartigupta pushed a commit to aartigupta/kafka that referenced this pull request May 30, 2016
…register non latency metrics in StreamsMetrics

From apache#1362 (comment)
We can consider adding metrics for process / punctuate / commit rate at the granularity of each processor node in addition to the global rate mentioned above. This is very helpful in debugging.

We can consider adding rate / total cumulated metrics for context.forward indicating how many records were forwarded downstream from this processor node as well. This is helpful in debugging.

We can consider adding metrics for each stream partition's timestamp.
This is helpful in debugging.

Besides the latency metrics, we can also add throughput latency in terms of source records consumed.

More discussions here https://issues.apache.org/jira/browse/KAFKA-3715
@mitch-seymour

mitch-seymour commented Nov 17, 2016

This may not be the right place for this question, but I'll try anyway. If the metrics registry is private (which this PR aimed to expose), what is the proper way to access the metric values collected by a KafkaStreams instance? E.g., if I wanted to poll the underlying consumer/producer/streams metrics registries, how would I do that in 0.10? I apologize if I'm just missing this in the documentation; I can't seem to find it, though.

@miguno
Contributor

miguno commented Nov 17, 2016

@mitch-seymour:
(Copied from today's comment of mine at http://stackoverflow.com/questions/40641437/kafka-streams-accessing-data-from-the-metrics-registry, funny coincidence)

Have you already read http://docs.confluent.io/current/kafka/monitoring.html and http://kafka.apache.org/documentation#monitoring? Excerpt: "You can monitor individual components of Kafka using Apache Kafka’s internal metrics. Kafka uses Yammer Metrics for metrics reporting in both the server and the client. This can be configured to report stats using pluggable stats reporters to hook up to your monitoring system." See also http://datadoghq.com/blog/collecting-kafka-performance-metrics, for example, which explains how to tap into Kafka's metrics via JMX (and also how to forward those metrics to other systems).
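The JMX route from the docs quoted above can be sketched as follows (the kafka.producer object-name pattern is illustrative; with no Kafka client running in the JVM, such a query simply returns an empty set):

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Minimal sketch of polling metrics over JMX from within the same JVM.
class JmxMetricsPoller {
    static Set<ObjectName> findMBeans(String pattern) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // queryNames matches all registered MBeans against the pattern.
        return server.queryNames(new ObjectName(pattern), null);
    }
}
```

For example, `findMBeans("kafka.producer:type=producer-metrics,*")` would list the producer metric MBeans registered by clients in the current JVM.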

@mitch-seymour

mitch-seymour commented Nov 17, 2016

@miguno Yes, I read the monitoring docs. Specifically, I'm trying to access the metrics (without JMX) listed here: http://kafka.apache.org/documentation#selector_monitoring. According to those docs, the metrics are exposed at the instance level: "The following metrics are available on producer/consumer/connector instances". KafkaStreams does not expose the metrics registry, though, and I don't believe it exposes the underlying Producer and Consumer either. I think I may be able to accomplish this by passing in a custom client supplier to the KafkaStreams constructor and polling the underlying Producer/Consumer metric registries. It would be nice to have the metrics registry exposed on the KafkaStreams instance itself, but hopefully this will get resolved in 0.10.2.0 (https://issues.apache.org/jira/browse/KAFKA-3701).

Anyway, I'm going to try passing in a custom client supplier (I may just extend DefaultKafkaClientSupplier and see where that gets me), and I may also try extracting the metric values from the ProcessorContext (mentioned in the JIRA). I'll post an answer to the SO post if I find a solution.
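The workaround described above amounts to a supplier that remembers every client it creates so their metrics maps can be polled later. A hypothetical plain-Java sketch of that pattern (the generic Supplier here is a stand-in, not Kafka's actual client-supplier interface):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: a delegating supplier that retains references to
// every client it hands out, so their metrics can be polled later.
class RetainingSupplier<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private final List<T> created = new ArrayList<>();

    RetainingSupplier(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T get() {
        T client = delegate.get();
        created.add(client); // keep a reference for later metric polling
        return client;
    }

    synchronized List<T> created() {
        return new ArrayList<>(created);
    }
}
```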

@guozhangwang
Contributor

@mitch-seymour If you're trying to poll the metric values by implementing your own metrics reporter, this link may help you: http://stackoverflow.com/questions/40641437/kafka-streams-accessing-data-from-the-metrics-registry/40661888#40661888

@enothereska
Contributor

@guozhangwang This has gotten a bit too complicated. Kafka Streams should return the metrics registry, but also ideally allow users to add their own metrics in a well-defined way. So it seems to me a slight modification is needed: the StreamsMetrics interface should be augmented to return the global registry as read-only, i.e. StreamsMetrics.globalMetrics() should return a read-only Metrics collection. Users can then add new streams metrics through the StreamsMetrics APIs.

asfgit pushed a commit that referenced this pull request Jan 11, 2017
… logging levels to Metrics

Kafka Streams: add granular metrics per node, also expose ability to register non latency metrics in StreamsMetrics

from #1362 (comment)
We can consider adding metrics for process / punctuate / commit rate at the granularity of each processor node in addition to the global rate mentioned above. This is very helpful in debugging.

We can consider adding rate / total cumulated metrics for context.forward indicating how many records were forwarded downstream from this processor node as well. This is helpful in debugging.

We can consider adding metrics for each stream partition's timestamp.
This is helpful in debugging.
Besides the latency metrics, we can also add throughput metrics in terms of source records consumed.

More discussions here https://issues.apache.org/jira/browse/KAFKA-3715

Author: Eno Thereska <eno@confluent.io>
Author: aartigupta <aartiguptaa@gmail.com>
Author: aartiguptaa <aartigupta@gmail.com>
Author: aartigupta <agupta@agupta-m01.vmware.com>

Reviewers: Greg Fodor, Ismael Juma, Damian Guy, Guozhang Wang

Closes #1446 from aartigupta/trunk
asfgit pushed a commit that referenced this pull request Jan 11, 2017
… logging levels to Metrics

Kafka Streams: add granular metrics per node and per task, also expose ability to register non latency metrics in StreamsMetrics
Also added different recording levels to Metrics.

This is joint contribution from Eno Thereska and Aarti Gupta.

from #1362 (comment)
We can consider adding metrics for process / punctuate / commit rate at the granularity of each processor node in addition to the global rate mentioned above. This is very helpful in debugging.

We can consider adding rate / total cumulated metrics for context.forward indicating how many records were forwarded downstream from this processor node as well. This is helpful in debugging.

We can consider adding metrics for each stream partition timestamp.
This is helpful in debugging.
Besides the latency metrics, we can also add throughput metrics in terms of source records consumed.

More discussions here https://issues.apache.org/jira/browse/KAFKA-3715, KIP-104, KIP-105

Author: Eno Thereska <eno@confluent.io>
Author: Aarti Gupta <aartiguptaa@gmail.com>

Reviewers: Greg Fodor, Ismael Juma, Damian Guy, Guozhang Wang

Closes #1446 from aartigupta/trunk
efeg added a commit to efeg/kafka that referenced this pull request May 29, 2024