KAFKA-4843: More efficient round-robin scheduler #2643
Conversation
@mjsax @dguy @guozhangwang have a look when you can. This led to most of the efficiency increases in streams consumption vs. simple consumer. Streams performance went from ~60MB/s -> 97MB/s (request size is 100 bytes).
Full benchmark run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/748/console
From 60 to 100? Wow! That's pretty impressive! Great work @enothereska!
LGTM.
    self.replication = 1

    @cluster(num_nodes=9)
-   @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 2, 3])
+   @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
Why do you skip scale=2 ?
I've increased the length of each test (by adding more records). I skip scale=2 to shorten the run of this test a bit. Also scale=3 is good enough to show performance (in small scale).
    while (true) {
        int numProcessed = task.process();
        totalNumBuffered += numProcessed;
        if (numProcessed == 0)
nit: we should use Java style:

    if () {
        ...
    }
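For illustration, here is a minimal self-contained sketch of the braced form being asked for (Task is just a stand-in interface, and the break body is assumed, since the quoted diff is truncated above):

    // Illustrative sketch only, not the actual StreamThread code.
    interface Task {
        int process(); // number of records processed, 0 when this task's buffer is drained
    }

    class DrainWithBraces {
        static int drain(final Task task) {
            int totalNumBuffered = 0;
            while (true) {
                final int numProcessed = task.process();
                totalNumBuffered += numProcessed;
                if (numProcessed == 0) {
                    break; // assumed body: stop once the task's buffer is empty
                }
            }
            return totalNumBuffered;
        }
    }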
    // process this task's records to completion before
    // context switching to another task
    while (true) {
        int numProcessed = task.process();
nit: final
    // process this task's records to completion before
    // context switching to another task
    while (true) {
perhaps this could be:

    int numProcessed;
    while ((numProcessed = task.process()) != 0) {
        ...
    }

I just have an aversion to while(true) ... break.
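For contrast with the while(true)/break version, a self-contained sketch of the suggested shape might look like this (Task is again an illustrative stand-in, not the real StreamTask API):

    interface Task {
        int process(); // number of records processed, 0 when the buffer is drained
    }

    class DrainWithoutBreak {
        static int drain(final Task task) {
            int totalNumBuffered = 0;
            int numProcessed;
            // the loop condition doubles as the termination check, so no break is needed
            while ((numProcessed = task.process()) != 0) {
                totalNumBuffered += numProcessed;
            }
            return totalNumBuffered;
        }
    }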
    // process this task's records to completion before
    // context switching to another task
    while (true) {
Do we need to check whether we should be committing in here now? Before, I think we'd process one record for each task and then check whether we needed to commit. Now we will process however many records have been buffered before we commit. That could span quite a lot of tasks, which could mean we are not committing as frequently as desired, and that could lead to lots of duplicates in failure scenarios.
It probably doesn't make much of a difference when the commit interval is large, but if it is small it might be significant.
So we have two options: 1) check if we need to commit after each record (previous behaviour), or 2) check if we need to commit after each batch of records from poll() has been processed (this patch). This patch speeds up the common case (no failures) while relying on EoS to eliminate duplicates if there is a failure.
Yeah, I'm cool with it either way. Just wanted to throw it out there as it is a change in behaviour.
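To make the trade-off concrete, here is a rough, purely illustrative sketch of option 2 (the maybeCommit and process names mirror the snippets quoted in this thread; the scaffolding around them is assumed, not the actual StreamThread code):

    // Illustrative only: the commit check runs once per drained batch rather than once per record.
    class BatchedCommitSketch {
        interface Task {
            int process(); // number of records processed, 0 when drained
        }

        private static final long COMMIT_INTERVAL_MS = 30_000L; // assumed value; configurable via commit.interval.ms
        private long lastCommitMs = System.currentTimeMillis();

        void runOnce(final Iterable<Task> tasks) {
            for (final Task task : tasks) {
                // drain everything buffered for this task since the last poll()
                while (task.process() != 0) {
                    // per-record work happens inside process()
                }
            }
            // option 2: a single commit check after the whole batch, so a long batch
            // can delay the commit beyond the configured interval
            maybeCommit(System.currentTimeMillis());
        }

        private void maybeCommit(final long nowMs) {
            if (nowMs - lastCommitMs >= COMMIT_INTERVAL_MS) {
                // commit offsets / flush state here
                lastCommitMs = nowMs;
            }
        }
    }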
@guozhangwang we will have to change the definition of the poll sensor to cover a batch of requests rather than each one individually. I think that's ok.
@enothereska wrote:
Hey, great job!
    int numProcessed;
    while ((numProcessed = task.process()) != 0) {
        totalNumBuffered += numProcessed;
    }
    requiresPoll = requiresPoll || task.requiresPoll();
Will this always be true now?
There is still one path in task.process() where requiresPoll is set to false.
    int numProcessed;
    while ((numProcessed = task.process()) != 0) {
        totalNumBuffered += numProcessed;
    }
    requiresPoll = requiresPoll || task.requiresPoll();

    streamsMetrics.processTimeSensor.record(computeLatency(), timerStartedMs);
Shouldn't this now move into the loop above?
I'd like to do stats on a batch of records now, not on each one individually. The reason is that, now that we do the processing in a tight loop, the performance of computeLatency (which calls time.milliseconds()) matters more than before (when perf was slower to start with). Basically it's either that, or this sensor needs to become a DEBUG-level sensor.
In this case, should we divide the latency by the number of records processed?
Yes, agreed.
Known problem: testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
Some new Jenkins failures worth mentioning:
    * Process one record
    *
-   * @return number of records left in the buffer of this task's partition group after the processing is done
+   * @return number of records processed
In this case could we just return a boolean?
Ok
    long processLatency = computeLatency();
    streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessedSinceLastCommit,
                                            timerStartedMs);
    maybeCommit(this.timerStartedMs);
OK makes sense.
@enothereska Just one minor comment, otherwise LGTM.
@guozhangwang addressed; however, let's hold off on committing since I've seen some unit test failures in trunk that need more testing. Thanks.
org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic STARTED seems to hang. This is consistent with what I see in trunk.
retest this please
A recent commit has broken checkstyle. Not related to this PR.
@enothereska Jenkins failures seem to be due to another earlier commit on
retest this please.
LGTM. Merged to trunk.
Improves streams efficiency by more than 200K requests/second (small 100-byte requests).
Gets streams efficiency very close to that of the pure consumer (see results in https://jenkins.confluent.io/job/system-test-kafka-branch-builder/746/console).
Maintains the same fairness across tasks.
Schedules all records in the queue in between poll() calls, not just one per task.
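Putting the quoted fragments together, a condensed and purely illustrative sketch of the new inner loop might look like the following (task, requiresPoll, computeLatency, and the process-time sensor come from the diff snippets above; the rest is assumed scaffolding, not the actual StreamThread code):

    // Sketch of the "drain each task between polls" idea: every task processes all of its
    // buffered records before the thread moves on, requiresPoll is aggregated across tasks,
    // and process latency is recorded once per batch, divided by the records processed.
    class RoundRobinLoopSketch {
        interface Task {
            int process();          // records processed; 0 when this task's buffer is empty
            boolean requiresPoll(); // true when the task needs more data from poll()
        }

        private long timerStartedMs = System.currentTimeMillis();

        boolean processBufferedRecords(final Iterable<Task> tasks) {
            boolean requiresPoll = false;
            int totalProcessed = 0;
            for (final Task task : tasks) {
                int numProcessed;
                // process this task's records to completion before context switching
                while ((numProcessed = task.process()) != 0) {
                    totalProcessed += numProcessed;
                }
                requiresPoll = requiresPoll || task.requiresPoll();
            }
            if (totalProcessed > 0) {
                final long now = System.currentTimeMillis();
                // in the real code this per-record value would feed the process-time sensor
                final double perRecordLatencyMs = (now - timerStartedMs) / (double) totalProcessed;
                timerStartedMs = now;
            }
            return requiresPoll; // the caller polls again when true
        }
    }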