
KAFKA-3715: add granular metrics per node #1446

Closed
wants to merge 62 commits

Conversation

aartigupta

Kafka Streams: add granular metrics per node, also expose ability to register non-latency metrics in StreamsMetrics

from #1362 (comment)
We can consider adding metrics for process / punctuate / commit rate at the granularity of each processor node in addition to the global rate mentioned above. This is very helpful in debugging.

We can consider adding rate / total cumulative metrics for context.forward indicating how many records were forwarded downstream from this processor node as well. This is helpful in debugging.

We can consider adding metrics for each stream partition's timestamp.
This is helpful in debugging.

Besides the latency metrics, we can also add throughput metrics in terms of source records consumed.

More discussions here https://issues.apache.org/jira/browse/KAFKA-3715
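As a rough illustration of the per-node rate metrics proposed above, here is a minimal, self-contained Java sketch. The class and method names are hypothetical, not the PR's actual StreamsMetrics API; the sensor-naming style follows the node-[node-id].metric-name convention discussed in this thread.

```java
// Hypothetical sketch of per-node rate metrics; NOT the actual StreamsMetrics API.
import java.util.HashMap;
import java.util.Map;

public class NodeMetricsSketch {
    private final Map<String, Long> counts = new HashMap<>();
    private final long startMs;

    public NodeMetricsSketch(long nowMs) {
        this.startMs = nowMs;
    }

    // Sensor name in a "node-[node-id].metric-name" style convention.
    public static String sensorName(String nodeId, String metric) {
        return "node-" + nodeId + "." + metric;
    }

    // Record one process / punctuate / forward event for a node.
    public void record(String nodeId, String metric) {
        counts.merge(sensorName(nodeId, metric), 1L, Long::sum);
    }

    // Events per second for one node's metric since construction.
    public double rate(String nodeId, String metric, long nowMs) {
        long n = counts.getOrDefault(sensorName(nodeId, metric), 0L);
        double elapsedSec = Math.max(1L, nowMs - startMs) / 1000.0;
        return n / elapsedSec;
    }
}
```

Keeping one counter per node, keyed by a hierarchical name, is what lets a reporter later split the global process/punctuate/commit rate into per-node rates.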

@aartigupta
Author

@guozhangwang, what do you think? I was able to run the examples and see the metrics per node in a JMX console.

@guozhangwang
Contributor

Thanks @aartigupta , @enothereska could you take a look first at this ticket? I have assigned you as the reviewer on the ticket, and please feel free to re-assign to me otherwise.


fetcherThread.shutdown()
}

-private def allMetricsNames = Metrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
+private def allMetricNames = Metrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
Contributor

These name changes are not strictly part of this fix; I'm wondering if we can open a MINOR PR for these while having this PR focus on streams only (to avoid confusion).

Author

Agreed, these were not intended for this fix; they managed to sneak their way in. My bad, fixed now.

@enothereska
Contributor

@aartigupta perhaps the PR name should be "KAFKA-3715: add granular metrics per node"? The JIRA number is usually part of the PR name. Minor thing but just for consistency.

@aartigupta aartigupta changed the title Kafka Streams: add granular metrics per node KAFKA-3715: add granular metrics per node Jun 2, 2016


public NodeMetricsImpl(StreamsMetrics metrics, String
name) {
Contributor

The line break here seems unnecessary.

@enothereska
Contributor

Thanks @aartigupta. Two higher-level questions: does it make sense to add a unit test or two for the new metrics? And do we have any overhead measurements of how much the new recordings add to the end-to-end latency?

@aartigupta
Author

aartigupta commented Jun 13, 2016

(screenshots attached: SimpleBenchmark with no state store / no changes, and SimpleBenchmark with per-node metrics)

Ran org.apache.kafka.streams.perf.SimpleBenchmark with the following configuration (i.e. without state-store-backed streams, and with simple print statements indicating which part of the benchmark is being run):

    System.out.println("producer");
    benchmark.produce();
    System.out.println("consumer");
    benchmark.consume();
    System.out.println("simple stream performance source->process");
    benchmark.processStream();
    System.out.println("simple stream performance source->sink");
    benchmark.processStreamWithSink();
    // simple stream performance source->store
    // benchmark.processStreamWithStateStore();

then attached a YourKit profiler and saw the following differences (see attached screenshots):

Without any changes to the code, CPU sampling in YourKit showed 61% CPU contention; with the per-node metrics, CPU sampling showed 70% CPU contention.

without any changes

org.apache.kafka.streams.perf.SimpleBenchmark
producer
Producer Performance [MB/sec write]: 8.853212193170378
consumer
[YourKit Java Profiler 2016.02-b38] Log file: /Users/aartikumargupta/.yjp/log/SimpleBenchmark-1754.log
Consumer Performance [MB/sec read]: 4.596191726854892
simple stream performance source->process
Streams Performance [MB/sec read]: 14.361679855964493
simple stream performance source->sink
Streams Performance [MB/sec read+write]: 4.535097423059803

with node metrics
Producer Performance [MB/sec write]: 5.035256582778346
consumer
[YourKit Java Profiler 2016.02-b38] Log file: /Users/aartikumargupta/.yjp/log/SimpleBenchmark-1549.log
Consumer Performance [MB/sec read]: 2.751484036579496
simple stream performance source->process
Streams Performance [MB/sec read]: 8.014018691588785
simple stream performance source->sink
Streams Performance [MB/sec read+write]: 6.562667414985077

Ran this multiple times and the results varied between 63% (no changes) and 72% (with per-node metrics). The difference seems to depend on the point at which the YourKit profiler is attached.
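The MB/sec figures quoted in these runs are throughput numbers; assuming the usual bytes-over-elapsed-time arithmetic (an assumption, not necessarily SimpleBenchmark's exact code), the computation looks like:

```java
// Assumed throughput arithmetic; SimpleBenchmark's actual code may differ.
public class ThroughputSketch {
    // MB/sec = (bytes / 2^20) / (elapsed ms / 1000)
    public static double mbPerSec(long bytes, long elapsedMs) {
        return (bytes / (1024.0 * 1024.0)) / (elapsedMs / 1000.0);
    }
}
```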

That said, I'm not sure if this is a valid load-simulating scenario.
@guozhangwang mentions in #1490 that

if your traffic is very small and the consumer is already at the log tail throughout your test, it will cause the polling / processing to be called with less batched data and hence further increased overhead.

@guozhangwang Is SimpleBenchmark a good scenario to profile?
If not, any suggestions for another scenario? Maybe we can add (check in) such a scenario under examples, to be used for all similar future profiling exercises.

Still working on the unit tests for the per-node metrics.

@gfodor
Contributor

gfodor commented Jun 21, 2016

hey @aartigupta it's kind of hard to tell based on your screenshots where the time is going since I don't see any drilldown into the call stacks of the StreamThread run loops. It's probably necessary for you to flip things on in the YourKit profiler so you can get the full call stacks and determine if Sensor.record is the source of most of the time.

@guozhangwang
Contributor

guozhangwang commented Jun 21, 2016

Thanks @aartigupta , some general comments:

  1. For naming consistency with other metrics objects, for finer-grained metrics we tend to name the sensors as "level-name.level-id.metrics-name"; for example, in SenderMetrics we used topic.[topic-name].records-per-batch etc. for per-topic metrics, in SelectorMetrics we used node-[node-id].bytes-sent etc. for per-node metrics, and in my latest PR KAFKA-3769: Create new sensors per-thread in KafkaStreams #1530 I was doing similar naming. You may already have noticed that this is for creating different sensors, since we synchronize on a per-sensor basis. Because the producer / consumer are always single-threaded, today we do not have any contention for the lock yet; in Streams we are trying to add per-thread metrics, and we will consider adding global metrics only after the synchronization is removed in KAFKA-3155, since, as we have discussed in other PRs, the contention overhead with multiple threads can be large.
  2. Different metrics reporters have the freedom to construct their reported metrics names from the hierarchy of "metrics-prefix, group-name, metrics-name, metrics-tags", where metrics-prefix is "kafka.producer" / "kafka.consumer" / "kafka.streams" depending on which client library you are using. In this case the sensor names are actually ignored, as they are used internally by the metrics object only for grouping different metrics. For example, in JmxReporter we create the mbeanName / attributeName as
mbean: "metrics-prefix": type="group-name", "tag1key"="tag1value", ..., "tagNkey"="tagNvalue"
    attribute1: "metrics-name1"
    attribute2: "metrics-name2"
    ...

So we need to make sure that the hierarchy is sufficient for different reporters to differentiate these metrics in their own space.
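To make the hierarchy concrete, here is a minimal sketch of how a reporter might assemble an mbean name from that hierarchy, in the spirit of JmxReporter; the group and tag names used below are illustrative, not the PR's actual names.

```java
// Illustrative mbean-name construction; group/tag names here are made up.
import java.util.Map;

public class MBeanNameSketch {
    // Builds "metrics-prefix:type=group-name,tag1key=tag1value,...,tagNkey=tagNvalue".
    public static String mbeanName(String prefix, String group, Map<String, String> tags) {
        StringBuilder sb = new StringBuilder(prefix).append(":type=").append(group);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            sb.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        return sb.toString();
    }
}
```

With a per-node tag in the tag map, two nodes' metrics land under distinct mbeans even if their attribute (metrics-name) sets are identical, which is exactly the differentiation the hierarchy must provide.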

@guozhangwang
Contributor

Btw the SimpleBenchmark numbers are pretty low compared to those on my laptop (4GB memory and low-end CPUs). In what environment did you run the profiler?

@aartigupta
Author

@guozhangwang MacBook, 12-inch, early 2015 edition: 1.3GHz dual-core Intel Core M processor (Turbo Boost up to 2.9GHz) with 4MB shared L3 cache,
8GB of 1600MHz LPDDR3 onboard memory.
I think it has to do with attaching the YourKit profiler.
Without the profiler I get the following:

producer
Producer Performance [MB/sec write]: 22.247686586525987
consumer
Consumer Performance [MB/sec read]: 56.39283169836138
simple stream performance source->process
Streams Performance [MB/sec read]: 40.33237957119899
simple stream performance source->sink
Streams Performance [MB/sec read+write]: 18.71113212350438

Process finished with exit code 0

@theduderog
Contributor

Is there a way to register user-defined metrics?

@enothereska
Contributor

@aartigupta would you still have time for this PR or should I have a look? Thanks.

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/685/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/695/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/697/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/695/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/721/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/721/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/723/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/729/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/727/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/727/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/730/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/728/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/728/
Test PASSed (JDK 7 and Scala 2.10).

* Record a value with this sensor
* @param value The value to record
* @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
* Record a name with this sensor
Contributor

Is this intentional? Ditto below.

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/745/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/743/
Test FAILed (JDK 7 and Scala 2.10).

@enothereska
Contributor

unrelated: kafka.api.SslProducerSendTest.testCloseWithZeroTimeoutFromSenderThread

@enothereska
Contributor

The old org.apache.kafka.streams.integration.ResetIntegrationTest failure is back, but it shouldn't be related to this PR.

@asfgit asfgit closed this in e43cf22 Jan 11, 2017
@guozhangwang
Contributor

Merged to trunk. Many thanks to @aartigupta and @enothereska !!

asfgit pushed a commit that referenced this pull request Jan 11, 2017
… logging levels to Metrics

Kafka Streams: add granular metrics per node and per task, also expose ability to register non-latency metrics in StreamsMetrics
Also added different recording levels to Metrics.

This is a joint contribution from Eno Thereska and Aarti Gupta.

from #1362 (comment)
We can consider adding metrics for process / punctuate / commit rate at the granularity of each processor node in addition to the global rate mentioned above. This is very helpful in debugging.

We can consider adding rate / total cumulative metrics for context.forward indicating how many records were forwarded downstream from this processor node as well. This is helpful in debugging.

We can consider adding metrics for each stream partition timestamp.
This is helpful in debugging.
Besides the latency metrics, we can also add throughput metrics in terms of source records consumed.

More discussions here https://issues.apache.org/jira/browse/KAFKA-3715, KIP-104, KIP-105

Author: Eno Thereska <eno@confluent.io>
Author: Aarti Gupta <aartiguptaa@gmail.com>

Reviewers: Greg Fodor, Ismael Juma, Damian Guy, Guozhang Wang

Closes #1446 from aartigupta/trunk
@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/743/
Test FAILed (JDK 8 and Scala 2.12).

soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
efeg pushed a commit to efeg/kafka that referenced this pull request May 29, 2024