
KAFKA-9753: Add active tasks process ratio #8370

Merged
merged 24 commits into apache:trunk on Mar 31, 2020

Conversation

@guozhangwang (Contributor) commented Mar 27, 2020

Measure the ratio of time the stream thread spends processing each task relative to the total processing time across all assigned active tasks (KIP-444).

Also add unit tests to cover the metrics added in this PR and in the previous #8358.

This PR also attempts to fix the flaky test reported in KAFKA-5842.
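
For intuition, here is a minimal sketch of the quantity being measured, i.e. each active task's share of the total processing time across all active tasks on the thread. The class and method names below are illustrative only, not the actual Streams code; in the PR the per-task ratio is reported through a Value() metric on a task-level sensor.

// Illustrative sketch only, not the actual Streams classes.
import java.util.HashMap;
import java.util.Map;

final class ActiveProcessRatioSketch {
    // given the accumulated process time per active task, compute each task's ratio of the total
    static Map<String, Double> processRatios(final Map<String, Long> processTimeMsByTask) {
        long totalMs = 0L;
        for (final long timeMs : processTimeMsByTask.values()) {
            totalMs += timeMs;
        }
        final Map<String, Double> ratios = new HashMap<>();
        for (final Map.Entry<String, Long> entry : processTimeMsByTask.entrySet()) {
            ratios.put(entry.getKey(), totalMs == 0L ? 0.0 : (double) entry.getValue() / totalMs);
        }
        return ratios;
    }
}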

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang (Contributor Author) left a comment

To be reviewed only after #8358.

@@ -192,6 +192,10 @@ public boolean isValidTransition(final State newState) {

void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);

void recordProcessLatency(final long elapsedLatencyMs);
Contributor Author:

This is a bit awkward... if anyone has a more elegant solution, I'm all ears.

Contributor:

prop: You could record the time spent processing inside Task#process() and just have a method Task#recordProcessRatio(final long totalProcessTime) in the interface.
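
A rough sketch of that shape, purely for illustration (this is not the actual interface, just the proposal's outline):

// Hypothetical sketch of the proposed Task interface shape, not the actual code:
public interface Task {
    // process() would measure and accumulate its own elapsed time internally
    boolean process(long wallClockTime);

    // the caller passes only the total process time across all active tasks;
    // the task then records its own accumulated time as a fraction of that total
    void recordProcessRatio(long totalProcessTime);
}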

Contributor:

Hey @guozhangwang, I did have some ideas (similar to @cadonna's), but it might be easier to just present them as a diff: guozhangwang#12

Contributor Author:

I originally did this, but I'm concerned that the latency for processing a single record is sub-millisecond, and hence the accumulated latency would be zero.
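
To illustrate the concern with a hypothetical, standalone example (not PR code): timing each record individually in milliseconds mostly adds zeros, while timing the whole batch does not.

// Hypothetical illustration of the millisecond-truncation concern, not PR code.
public final class MillisTruncationSketch {
    public static void main(final String[] args) {
        long perRecordSumMs = 0L;
        final long batchStartMs = System.currentTimeMillis();
        for (int i = 0; i < 1_000_000; i++) {
            final long startMs = System.currentTimeMillis();
            Math.sqrt(i);                                            // stand-in for sub-millisecond per-record work
            perRecordSumMs += System.currentTimeMillis() - startMs;  // almost always adds 0
        }
        final long batchElapsedMs = System.currentTimeMillis() - batchStartMs;
        // perRecordSumMs badly undercounts, while batchElapsedMs reflects the real time spent
        System.out.println(perRecordSumMs + " ms (per-record sum) vs " + batchElapsedMs + " ms (batch)");
    }
}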

@guozhangwang (Contributor Author):

call @vvcephei @cadonna for reviews

@vvcephei vvcephei self-requested a review March 27, 2020 18:27
@guozhangwang (Contributor Author):

test this please

1 similar comment
@guozhangwang (Contributor Author):

test this please

@guozhangwang (Contributor Author) left a comment

Fixed unit tests, cc @vvcephei

@@ -646,8 +645,8 @@ public void concurrentAccesses() throws Exception {
producerThread.start();

try {
waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration);
waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);
waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1);
Contributor Author:

This test became flakier after the main-loop refactoring; a typical failure looks like this:

java.lang.AssertionError: Did not receive all 48 records from topic output-concurrent-2 within 120000 ms
Expected: is a value equal to or greater than <48>
     but: <38> was less than <48>

Each iteration has 48 words, of which 38 are unique, and when the test fails only 38 records have ever been consumed from the output topic. I suspect this is due to caching effects: we processed all 500000 iterations without ever emitting updates to the resulting count table's output topic (it seems we process too fast?).

I piggy-backed a fix for this flakiness: we now start the verification as soon as we've seen at least one record in the output, similar to what we do in other tests, and in verifyGreaterOrEqual we no longer require that all keys have already been populated.
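
To make the relaxed check concrete, here is a hypothetical sketch (not the actual test helper): only keys that have actually appeared in the output are verified, instead of requiring every expected key up front.

// Hypothetical sketch of the relaxed verification, not the actual test helper.
import java.util.Map;

final class RelaxedVerificationSketch {
    static void verifyGreaterOrEqual(final Map<String, Long> expectedCounts, final Map<String, Long> observedCounts) {
        for (final Map.Entry<String, Long> observed : observedCounts.entrySet()) {
            final Long expected = expectedCounts.get(observed.getKey());
            // keys not yet emitted (e.g. still held back by caching) are tolerated;
            // keys that did appear must have reached at least the expected count
            if (expected != null && observed.getValue() < expected) {
                throw new AssertionError("Count for " + observed.getKey() + " was " + observed.getValue()
                    + ", expected at least " + expected);
            }
        }
    }
}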

Contributor:

prop: I would prefer to have this fix in its own PR, because the commit would be easier to retrieve and to reference on failure tickets.


// we record the ratio out of the while loop so that the accumulated latency spans over
// multiple iterations with reasonably large max.num.records and hence is less vulnerable to outliers
taskManager.recordTaskProcessRatio(totalProcessLatency);
Contributor Author:

Note we should only call this inside the if (state == State.RUNNING) condition, otherwise we may get a ConcurrentModificationException.

Contributor:

Q: Could you elaborate on this?

Contributor Author:

I do not know exactly what's happening here; my guess is that in the PARTITION_REVOKED state the tasks might be modified.
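
To summarize the placement being discussed, a simplified sketch of the loop shape follows; only recordTaskProcessRatio(...) is taken from the PR, every other name is a placeholder.

// Hypothetical simplification of the run-loop shape under discussion;
// only recordTaskProcessRatio(...) is from the PR, the rest are placeholders.
final class RunLoopSketch {
    enum State { RUNNING, NOT_RUNNING }                      // placeholder states

    interface TaskManager {
        long process();                                      // placeholder: returns elapsed process time in ms
        void recordTaskProcessRatio(long totalProcessLatency);
    }

    static void runOnce(final State state, final TaskManager taskManager, final int numIterations) {
        if (state == State.RUNNING) {
            long totalProcessLatency = 0L;
            for (int i = 0; i < numIterations; i++) {        // placeholder for the inner processing loop
                totalProcessLatency += taskManager.process();
            }
            // recorded after the loop so the accumulated latency spans multiple iterations,
            // and inside the RUNNING check so the task set is not being modified concurrently
            // (e.g. during a rebalance) while the per-task ratios are recorded
            taskManager.recordTaskProcessRatio(totalProcessLatency);
        }
    }
}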

@guozhangwang (Contributor Author):

test this please

@cadonna (Contributor) left a comment

@guozhangwang Thank you for the PR!

Here is my feedback:

@@ -96,6 +103,22 @@ public static Sensor processLatencySensor(final String threadId,
return emptySensor(threadId, taskId, PROCESS_LATENCY, RecordingLevel.DEBUG, streamsMetrics);
}

public static Sensor activeProcessRatioSensor(final String threadId,
Contributor:

req: Could you please add tests for this new sensor to TaskMetricsTest?

Contributor Author:

ack.

Comment on lines 111 to 118
sensor.add(
new MetricName(
name,
TASK_LEVEL_GROUP,
PROCESS_RATIO_DESCRIPTION,
streamsMetrics.taskLevelTagMap(threadId, taskId)),
new Value()
);
Contributor:

Suggested change:

-        sensor.add(
-            new MetricName(
-                name,
-                TASK_LEVEL_GROUP,
-                PROCESS_RATIO_DESCRIPTION,
-                streamsMetrics.taskLevelTagMap(threadId, taskId)),
-            new Value()
-        );
+        addValueMetricToSensor(
+            sensor,
+            TASK_LEVEL_GROUP,
+            streamsMetrics.taskLevelTagMap(threadId, taskId),
+            name,
+            PROCESS_RATIO_DESCRIPTION
+        );

req: Please use StreamsMetricsImpl#addValueMetricToSensor()

Contributor Author:

ack.

@@ -68,6 +70,17 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
stateMgr.markChangelogAsCorrupted(partitions);
}

@Override
public void recordProcessLatency(final long elapsedLatencyMs) {
Contributor:

prop: I find processLatency and totalLatency a bit misleading. What about processTime and totalProcessTime?

Contributor Author:

ack.

@@ -616,6 +619,12 @@ public boolean process(final long wallClockTime) {
return true;
}

@Override
public void recordTotalLatency(final long elapsedLatencyMs) {
Contributor:

prop: Could you rename recordTotalLatency() to recordProcessRatioAndReset()?
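
Presumably the renamed method would record the ratio and then reset the accumulated time, roughly along these lines (a hypothetical sketch; field and sensor names are placeholders, not the PR's code):

// Hypothetical sketch of the proposed semantics, not the actual implementation.
final class ProcessRatioRecorderSketch {
    private long processTimeMs = 0L;                         // accumulated while processing records

    void recordProcessRatioAndReset(final long totalProcessTimeMs) {
        final double ratio = totalProcessTimeMs == 0L ? 0.0 : (double) processTimeMs / totalProcessTimeMs;
        recordRatio(ratio);                                  // stand-in for the task's process-ratio sensor
        processTimeMs = 0L;                                  // reset for the next measurement window
    }

    private void recordRatio(final double ratio) {
        // placeholder for something like activeProcessRatioSensor.record(ratio)
    }
}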



@@ -616,6 +619,12 @@ public boolean process(final long wallClockTime) {
return true;
}

@Override
public void recordTotalLatency(final long elapsedLatencyMs) {
Contributor:

req: Could you please add a unit test for this method?

@@ -68,6 +70,17 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
stateMgr.markChangelogAsCorrupted(partitions);
}

@Override
public void recordProcessLatency(final long elapsedLatencyMs) {
Contributor:

req: Could you please add a unit test for this method?

@vvcephei vvcephei mentioned this pull request Mar 31, 2020
@vvcephei (Contributor) left a comment

LGTM, modulo the remainder of @cadonna's feedback.

@guozhangwang guozhangwang merged commit 353aa62 into apache:trunk Mar 31, 2020
@guozhangwang guozhangwang deleted the K9753-active-process-ratio branch March 31, 2020 23:39
@mjsax mjsax added the kip (Requires or implements a KIP) label Jun 12, 2020
Labels
kip (Requires or implements a KIP), streams
4 participants