
KAFKA-9983: KIP-613, add INFO level e2e latency metrics #8697

Merged: 12 commits merged into apache:trunk on May 27, 2020

Conversation

@ableegoldman (Contributor) commented May 20, 2020

Moved all metrics to the processor-node level, but the INFO-level metrics are recorded only at the source and "sink" nodes (in quotation marks because this may be a non-sink terminal node).

@ableegoldman (Contributor, Author)

Call for review @guozhangwang @mjsax @vvcephei

@mjsax (Member) left a comment

Call for review @cadonna

private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION =
"The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the "
+ "system time when it has been fully processed by the task";
Member:

Assuming that a task might have a cache, is this correct (i.e., "has been fully processed by the task")?

Contributor Author:

Oh right, I put the recording in the wrong place, but this description is correct. It should record at the RecordCollector for the task-level metrics.

getConsumerRecord(partition1, 0L),
getConsumerRecord(partition1, 10L),
getConsumerRecord(partition1, 5L),
getConsumerRecord(partition1, 20L)
Member:

Could we increase this ts to 35? That would allow us to test min better in the last step.

@@ -14,15 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.metrics;
package org.apache.kafka.streams.processor.internals.metrics;
Member:

Nice one!

@cadonna (Contributor):

Yes, very nice!

@mjsax mjsax added the streams label May 21, 2020
@mjsax (Member) commented May 21, 2020

Retest this please.

@ableegoldman (Contributor, Author)

Alright, I fixed the latency measurement to record the right thing @mjsax

@ableegoldman ableegoldman changed the title KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max) [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max) May 22, 2020
@ableegoldman (Contributor, Author)

Actually, this needs to be reworked to account for the case where there is no sink node. I also got the Percentiles working, so I'll add them back to this PR and call for review again when ready.

@cadonna (Contributor) left a comment

Hey @ableegoldman, thank you for the PR! Here is my feedback.

@@ -137,6 +138,7 @@ public StreamTask(final TaskId id,
}
processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
recordE2ELatencySensor = TaskMetrics.recordE2ELatencySensor(threadId, taskId, streamsMetrics);
Contributor:

Q: Is there a specific reason to init the sensor here and not in SinkNode? You can init and store it there. That was one motivation for making the *Metrics classes (e.g. TaskMetrics) static, so that you do not need any code in the processor context to get specific sensors. If there is no specific reason, you could get rid of the changes in the *Context* classes.

Contributor Author:

I modified the proposal slightly to make these all processor-node level (will push the changes in a minute), but this question is still relevant, so here's the answer:

We can't record the e2e latency in the sink node because not all topologies have a sink node. For that reason we also can't record at the record collector. We need to figure out the terminal nodes when processing the topology, then record this metric after child.process in ProcessorContext#forward.
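For illustration, a minimal, self-contained sketch of that pattern follows. The names (SketchNode, recordE2E, isTerminal) are hypothetical and not the PR's actual ProcessorContextImpl/StreamTask code; the point is only that a terminal child's e2e latency is recorded after its process() call has fully returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class SketchNode {
    final String name;
    final List<SketchNode> children = new ArrayList<>();

    SketchNode(final String name) {
        this.name = name;
    }

    // A node with no children is a terminal node, whether or not it is a "real" sink.
    boolean isTerminal() {
        return children.isEmpty();
    }

    // "forward": recursively process children; record e2e latency for a terminal
    // child only after its process() call has completed.
    void process(final long recordTimestamp, final BiConsumer<String, Long> recordE2E) {
        for (final SketchNode child : children) {
            child.process(recordTimestamp, recordE2E);
            if (child.isTerminal()) {
                recordE2E.accept(child.name, System.currentTimeMillis() - recordTimestamp);
            }
        }
    }
}
```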

@mjsax (Member) commented May 22, 2020

Build failed with checkstyle:

[ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java:23:8: Unused import - org.apache.kafka.common.metrics.Measurable. [UnusedImports]

@ableegoldman ableegoldman changed the title [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max) KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max) May 22, 2020
@ableegoldman ableegoldman changed the title KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max) KAFKA-9983: KIP-613, add source & sink e2e latency metrics May 22, 2020
@ableegoldman ableegoldman changed the title KAFKA-9983: KIP-613, add source & sink e2e latency metrics KAFKA-9983: KIP-613, add INFO level e2e latency metrics May 22, 2020
@@ -183,6 +183,8 @@
files="StreamsPartitionAssignor.java"/>
<suppress checks="JavaNCSS"
files="EosBetaUpgradeIntegrationTest.java"/>
<suppress checks="StaticVariableName"
@ableegoldman (Contributor, Author), May 22, 2020:

Checkstyle won't allow you to have a letter and a number next to each other, but P90 and E2E_LATENCY seem preferable to P_90 and E_2_E_LATENCY.

double expectedP90 = values.get(p90Index - 1);
double expectedP99 = values.get(p99Index - 1);

assertEquals(expectedP90, (Double) p90.metricValue(), expectedP90 / 10);
Contributor Author:

Trying to build confidence in the Percentiles implementation and gauge the accuracy with a more complicated test. I found it was accurate to within 5% maybe 2/3 or 3/4 of the time, but it seems reasonable to expect it to be accurate to within 10%.

@mjsax (Member), May 23, 2020:

Let's see...

Comment on lines 157 to 158
public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000; // 1 MB
public static double MAXIMUM_E2E_LATENCY = 100 * 24 * 60 * 60 * 1000d; // maximum latency is 1000 days
Contributor Author:

Want to call attention to these... do they seem reasonable? The size is the number of bytes per percentile sensor, so 2 per source or terminal node. The minimum has to be 0 for the linear bucketing (which I found significantly more accurate than constant bucketing in my tests).

On the other hand, the maximum is obviously not representative of the maximum possible difference between the current time and the record timestamp. If someone's processing historical data, it can exceed this. But I figure if you're processing historical data then the e2e latency isn't really going to be useful anyway, so we may as well set it to something reasonable.

Contributor Author:

Not sure if we might want to make it configurable, though? Or just pick a number and see if anyone complains?
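As a point of reference for the sizes being discussed, here is a hedged, self-contained sketch of how a bounded, linearly bucketed Percentiles stat can be attached to a sensor via the public org.apache.kafka.common.metrics API. The metric names and group below are made up for the example; they are not the names the PR registers.

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

public class E2ELatencyPercentilesSketch {
    public static void main(final String[] args) {
        final Metrics metrics = new Metrics();
        final Sensor sensor = metrics.sensor("record-e2e-latency");

        final int sizeInBytes = 1000 * 1000;                    // 1 MB memory budget per Percentiles stat
        final double maxLatencyMs = 10d * 24 * 60 * 60 * 1000;  // 10 days, the bound under discussion

        // Linear bucketing requires the minimum to be 0; values are bucketed over [0, maxLatencyMs].
        sensor.add(new Percentiles(
            sizeInBytes,
            0,
            maxLatencyMs,
            BucketSizing.LINEAR,
            new Percentile(metrics.metricName("record-e2e-latency-p90", "example-metrics"), 90),
            new Percentile(metrics.metricName("record-e2e-latency-p99", "example-metrics"), 99)));

        sensor.record(1234d);                                   // record one e2e latency, in ms
        metrics.close();
    }
}
```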

@ableegoldman ableegoldman force-pushed the KIP-613-add-task-level-metrics branch 2 times, most recently from 8b83617 to a8f1ac8 Compare May 22, 2020 22:30
@ableegoldman ableegoldman force-pushed the KIP-613-add-task-level-metrics branch from 0c7241c to a31d4f9 Compare May 23, 2020 00:03
throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
} else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
final long e2eLatency = now - recordTimestamp;
if (e2eLatency > MAXIMUM_E2E_LATENCY) {
Member:

Not sure about this... Why do we need/want to have a limit?

Nit: double space

Contributor Author:

Ack (limit explanation in a comment below).

@cadonna (Contributor):

Q: Wouldn't it be better to count measurements beyond the maximum latency towards the highest bucket, as the Percentiles metric does? Admittedly, the measured value would be quite wrong in the case of a lot of measurements greater than the maximum latency. However, with bucket sizes that increase linearly, the reported values would be quite wrong anyway due to the increased approximation error. Furthermore, I guess users would put an alert on substantially smaller values.

OTOH, not counting measurements beyond the maximum latency would skew the metric a bit, because they would not count towards the remaining 1% or 10% (for p99 and p90, respectively). Additionally, the max metric would also be skewed by not counting those measurements.

@vvcephei (Contributor):

Meta-review procedural question: In the future, can we try to avoid making the same comment in multiple places in the PR, since it leads to split discussions like this one?

if (e2eLatency > MAXIMUM_E2E_LATENCY) {
log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
} else if (e2eLatency < MINIMUM_E2E_LATENCY) {
Member:

For this case, should we record "zero" instead?

Contributor Author:

I was debating this... my thinking here was that a negative value probably means you're processing some records with "future" timestamps, for whatever reason, in which case the e2e latency isn't meaningful and they shouldn't affect the statistics. Or, your clocks are out of sync. I suppose we could add a separate metric that counts the number of records with negative e2e latency?

@cadonna (Contributor):

@ableegoldman I agree with your thinking here. IMO, we should just log the warning for now. If we see that there is demand for such a metric, we can add it later on.

@vvcephei (Contributor):

I'm fine with this as well, although I think it makes more sense either to pin to zero and warn or to just record the negative latency and warn. It feels like we're overthinking it. If the clocks are drifting a little and we report small negative numbers, the e2e latency is still low, which is still meaningful information. I really don't see a problem with just naively reporting it and not even bothering with a warning.

Member:

I would not record a negative latency. That seems to be kinda weird. I am fine with skipping and warning, too. Just wanted to clarify.

@@ -149,6 +154,10 @@ public int hashCode() {
public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
public static final String RATE_DESCRIPTION_SUFFIX = " per second";

public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000; // 1 MB
public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days
Member:

Not sure about this. Why do we need a maximum to begin with? And why pick 10 days? Rather arbitrary?

Contributor Author:

10 days was just rounding up from the 7-day default retention limit. The maximum is due to the percentiles calculation, which is based on incrementally sized buckets. It's a tradeoff with accuracy.

For example, if I increase it by a factor of 1000, the StreamTask percentiles test is off by almost 20% (p99 is 82.9 instead of 99). This test uses values between 0 and 100, which is probably considerably lower than most e2e latencies will be. If you look at the MetricsTest percentiles test I added, it uses random values up to the max value and can maintain the 10% accuracy up to a higher max value.

Of course we don't know what the distribution will be, but it seems likely to be somewhere in the middle (not in the 100s of ms, not in the 10s or 1000s of days), so for reasonable accuracy we need to pick a reasonable maximum. We can definitely go higher than 10 days, but I reasoned that if you have records older than 10 days you're probably processing historical data, and in that case the e2e latency isn't that meaningful.
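To make the tradeoff concrete, here is a hedged, standalone sketch (not from the PR) that re-runs the kind of comparison described above using the public metrics API: the same memory budget spread over a wider value range leaves coarser buckets for small values, so the p99 estimate for values in [0, 100] degrades. The exact numbers it prints will differ from the 82.9 quoted in the comment.

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

public class PercentilesMaxTradeoffSketch {
    public static void main(final String[] args) {
        final double day = 24d * 60 * 60 * 1000;
        // Same 1 MB budget, two different maximums: 10 days vs. 1000x that.
        for (final double max : new double[]{10 * day, 10_000 * day}) {
            final Metrics metrics = new Metrics();
            final Sensor sensor = metrics.sensor("e2e-sketch");
            sensor.add(new Percentiles(1000 * 1000, 0, max, BucketSizing.LINEAR,
                new Percentile(metrics.metricName("p99", "sketch"), 99)));

            // Record values in [0, 100], as in the test described above.
            for (int v = 0; v <= 100; v++) {
                sensor.record(v);
            }
            System.out.println("max=" + max + " ms -> p99 estimate = "
                + metrics.metric(metrics.metricName("p99", "sketch")).metricValue());
            metrics.close();
        }
    }
}
```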

@cadonna (Contributor):

I understand that we need a maximum due to the way the percentiles are approximated. Since the acceptable e2e latency depends on user requirements, it would make sense to consider a config for the max latency. I see two reasons for such a config:

  1. We always think about near-realtime use cases, but there could also be use cases that are allowed to have a much higher latency, as long as the latency is still within a certain limit. For example, one where producers are not always online. Admittedly, 10 days is already quite high.

  2. OTOH, decreasing the max latency would also make the metric more accurate, AFAIU. That would also be a reason for a config that users can tweak.

For both cases, we could leave it as it is for now and see if there is really a need for such a config. WDYT?

@vvcephei (Contributor):

This necessity makes me think that our Percentiles metric algorithm needs to be improved. Admittedly, I haven't looked at the math, but it seems like it should be possible to be more adaptive.

I'm in favor of not adding a config and just leaving it alone for now, so that we can take the option in the future to fix the problem by fixing the algorithm.

@vvcephei (Contributor):

However, I do not think we should restrict the max value for metrics other than the percentiles one. E.g., there's no reason to restrict the value we record for the max and min metrics. You should be able to update the Percentiles implementation to apply the maximum bound in the metric's record method. Otherwise, I'd recommend recording two sensors separately: one for the bounded metrics, and one for the unbounded ones.

Member:

Thanks for the details. Avoiding a config for now sounds good to me. This leaves the path open to add a config later or, as John suggested, to maybe change the algorithm (which might not need a max). I am fine with a hard-coded max of 10 days.

Also +1 to John's suggestion to split percentiles and min/max, to avoid applying the hard-coded limit to the min/max metrics.

Contributor Author:

@cadonna @vvcephei @mjsax We have several related discussions going on across this PR, so I'm just going to try to summarize here; let me know if I missed anything you still feel is important.

The plan is to pin large/small values in the percentiles to the max/min for now and just log a warning. Since we're the only users of the Percentiles class, we can just modify it directly and avoid restricting the values for the min/max metrics, as John mentioned above. If a user is experiencing small negative e2e latencies, it's likely due to clock drift, and approximating them as 0 seems reasonable. If they're experiencing large negative e2e latencies, there's clearly something odd going on and the e2e latency percentiles aren't meaningful. But it will still show up in the min metric and alert them to this. Presumably users may be interested to know.

I'd like to avoid introducing a config, in particular because the maximum isn't an inherent mathematical property of percentiles (obviously); it's just an artifact of the existing percentiles algorithm. We can improve this and presumably remove the requirement to set a static max, but I felt the algorithm was "good enough" for now (and didn't want to make large-scale changes and/or rewrite it entirely right before the 2.6 deadline).

In sum, I'd say the guiding principle for this PR and the initial metrics was to be useful without being misleadingly wrong. I think pinning the percentiles to the bounds but reporting the min/max as-is achieves this, and allows us flexibility to improve the situation later.
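A hedged sketch of the "pin to the bounds and warn" plan described above; the class, constants, and method names here are illustrative, not the code that was actually merged into Percentiles.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class E2ELatencyClampSketch {
    private static final Logger LOG = LoggerFactory.getLogger(E2ELatencyClampSketch.class);

    private static final double MIN_E2E_LATENCY_MS = 0d;
    private static final double MAX_E2E_LATENCY_MS = 10d * 24 * 60 * 60 * 1000; // 10 days

    // Pin an out-of-range measurement to the bounds used by the percentile buckets and
    // log a warning. The unbounded min/max metrics would record the raw value separately.
    static double clampForPercentiles(final double e2eLatencyMs, final String nodeName) {
        if (e2eLatencyMs > MAX_E2E_LATENCY_MS) {
            LOG.warn("e2e latency {} for node {} exceeds the maximum {}; pinning to the max bucket",
                     e2eLatencyMs, nodeName, MAX_E2E_LATENCY_MS);
            return MAX_E2E_LATENCY_MS;
        }
        if (e2eLatencyMs < MIN_E2E_LATENCY_MS) {
            LOG.warn("e2e latency {} for node {} is negative; pinning to {}",
                     e2eLatencyMs, nodeName, MIN_E2E_LATENCY_MS);
            return MIN_E2E_LATENCY_MS;
        }
        return e2eLatencyMs;
    }
}
```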

@mjsax (Member) commented May 23, 2020

Retest this please.

1 similar comment

@mjsax (Member) commented May 23, 2020

Retest this please.

@ableegoldman (Contributor, Author)

spotBugs is currently failing with a mysterious and uninformative exception, so there's no use kicking off the tests again until I figure out what's going on. Might be a spotBugs bug... in local testing I have seen it failing non-deterministically... 😒

@cadonna (Contributor) left a comment

@ableegoldman Thanks for the updates!

Verifications in MetricsIntegrationTest are missing.

Here is my feedback!


@vvcephei (Contributor) left a comment

Thanks for this, @ableegoldman !


@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
log.trace("Start processing one record [{}]", record);

updateProcessorContext(record, currNode, wallClockTime);
maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
Contributor:

If I understand this right, we are recording sink latencies after processing, but source latencies before processing. This nicely avoids the problem with recording non-sink latencies after processing, but is it accurate?

Contributor Author:

We discussed this offline, but in case anyone else was wondering:

Yes. We can't record the latency after processing for source nodes due to our recursive DFS approach to processing, as the source node's #process actually doesn't complete until the record has been processed by every other node in the subtopology. And anyway, the intent of the source node metric is to gauge the e2e latency when the record arrives at the subtopology, which is what we are recording here.

Contributor:

Thanks, @ableegoldman, I think this is a fine tradeoff. Also helping is the fact that we know all "source nodes" are actually instances of SourceNode, which specifically do nothing except forward every record, so whether we measure these nodes before or "after" their processing logic should make no practical difference at all.

Member:

"...so whether we measure these nodes before or 'after' their processing logic should make no practical difference at all."

I think it makes a big difference, and only recording before processing is what we want (according to what the KIP says). Otherwise, the latency includes the processing time for one or more processors (in the worst case, even all processors).

Contributor:

Sorry for my ambiguity. Please let me clarify my terms. Currently, if you wait until the end of the "process" method, you wind up including the call to forward, which recursively calls process on all descendants of the source node. This is not what I was talking about. I meant only the time spent just in processing the SourceNode, excluding the time in "forward". What shall we call this? Maybe "actual", or "proper", or "internal" processing time?

So, my comment was that, given that we know the implementation of SourceNode, we know that its "actual", "proper", "internal" processing time is going to be very small, probably far less than a single millisecond. So it doesn't make any practical difference whether we measure before the call to process just for the special case of source nodes, or magically solve the problem of measuring the e2e latency after internal processing but not including the calls to "forward".

This is why I think it's fine to measure SourceNodes before the call to process, even though the KIP technically specifies that processors' end-to-end latencies should include processing latency. We're making a simplifying assumption that for source nodes specifically, the processing latency would be << 1 ms, so we can ignore it.
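To illustrate the point being made here, a tiny self-contained sketch (made-up names, not Streams internals) of why recording after process() would fold downstream processing time into a source node's e2e latency:

```java
import java.util.concurrent.TimeUnit;

public final class DfsTimingSketch {
    interface Node {
        void process(long recordTimestampMs) throws InterruptedException;
    }

    public static void main(final String[] args) throws InterruptedException {
        final Node terminal = ts -> TimeUnit.MILLISECONDS.sleep(50);   // stand-in for downstream work
        final Node source = ts -> {
            // Recording before forwarding captures the latency at arrival at the subtopology.
            System.out.println("source e2e recorded before forwarding: "
                + (System.currentTimeMillis() - ts) + " ms");
            terminal.process(ts);                                      // recursive, depth-first forward
            // Recording here instead would also include the downstream ~50 ms of processing.
            System.out.println("source e2e if recorded after forwarding: "
                + (System.currentTimeMillis() - ts) + " ms");
        };
        source.process(System.currentTimeMillis() - 10);               // record created ~10 ms ago
    }
}
```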

@mjsax (Member) commented May 26, 2020

Retest this please.

@mjsax (Member) left a comment

I did not review the test code. Overall LGTM. Leave it up to @cadonna and @vvcephei to push it over the finish line.

@mjsax (Member) commented May 26, 2020

Retest this please.

@vvcephei (Contributor) left a comment

Thanks for the update, @ableegoldman !

I'll wait for @cadonna to do another pass, if he likes.

@vvcephei (Contributor)

Test this please

@vvcephei (Contributor)

The only failing tests were unrelated:

org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

Note that the java 8 build is not actually running.

@vvcephei (Contributor)

Unbelievable. The java 8 build started running right after I mentioned that it's not running.

@ableegoldman (Contributor, Author)

"The java 8 build started running right after I mentioned that it's not running"

I hope you'll choose to use these new powers for good.

@vvcephei vvcephei merged commit 83c616f into apache:trunk May 27, 2020
@ableegoldman ableegoldman deleted the KIP-613-add-task-level-metrics branch May 28, 2020 02:47
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
ableegoldman pushed a commit to ableegoldman/kafka that referenced this pull request Jun 13, 2020