KAFKA-4843: More efficient round-robin scheduler #2643

Closed
wants to merge 37 commits into from

@enothereska
Contributor Author

@mjsax @dguy @guozhangwang have a look when you can. This led to most of the efficiency increases in streams consumption vs. simple consumer. Streams performance went from ~60MB/s -> 97MB/s (request size is 100 bytes).

@asfbot

asfbot commented Mar 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2013/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Mar 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2012/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2015/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2017/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2014/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2015/
Test FAILed (JDK 8 and Scala 2.12).

@enothereska
Contributor Author

@mjsax
Member

mjsax commented Mar 5, 2017

From 60 to 100? Wow! That's pretty impressive! Great work @enothereska !

Member

@mjsax mjsax left a comment

LGTM.

self.replication = 1


@cluster(num_nodes=9)
- @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 2, 3])
+ @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
Member

Why do you skip scale=2 ?

Contributor Author

I've increased the length of each test (by adding more records). I skip scale=2 to shorten the run of this test a bit. Also, scale=3 is good enough to show performance at small scale.

while (true) {
    int numProcessed = task.process();
    totalNumBuffered += numProcessed;
    if (numProcessed == 0)
Contributor

nit: we should use Java style

if() {
..
}

// process this task's records to completion before
// context switching to another task
while (true) {
    int numProcessed = task.process();
Contributor

nit: final


// process this task's records to completion before
// context switching to another task
while (true) {
Contributor

perhaps this could be:

int numProcessed;
while ((numProcessed = task.process()) != 0) {
..}

I just have an aversion to while(true) ... break
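
The two loop shapes discussed here can be compared side by side. The following is only a minimal sketch: `FakeTask` is a hypothetical stand-in rather than the real Kafka Streams task class, and the `break` in the first variant is assumed from the truncated hunk above. Both variants drain the task and return the same total.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative only: FakeTask is hypothetical, not the Kafka Streams task class. */
final class DrainLoopSketch {

    /** Fake task: process() handles one buffered record and returns 1, or 0 when empty. */
    static final class FakeTask {
        private final Deque<String> buffer = new ArrayDeque<>();

        FakeTask(String... records) {
            for (String record : records) {
                buffer.add(record);
            }
        }

        int process() {
            return buffer.poll() == null ? 0 : 1;
        }
    }

    /** Shape under review: while (true) with an explicit break (the break is assumed). */
    static int drainWithBreak(FakeTask task) {
        int total = 0;
        while (true) {
            final int numProcessed = task.process();
            total += numProcessed;
            if (numProcessed == 0) {
                break;
            }
        }
        return total;
    }

    /** Suggested shape: the assignment lives in the loop condition. */
    static int drainWithCondition(FakeTask task) {
        int total = 0;
        int numProcessed;
        while ((numProcessed = task.process()) != 0) {
            total += numProcessed;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(drainWithBreak(new FakeTask("a", "b", "c")));
        System.out.println(drainWithCondition(new FakeTask("a", "b", "c")));
    }
}
```

Note that `numProcessed` cannot be declared `final` in the second shape, since it is reassigned on every iteration.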


// process this task's records to completion before
// context switching to another task
while (true) {
Contributor

Do we now need to check in here whether we should be committing? Before, I think we'd process one record for each task and then check if we need to commit. Now we will process however many records have been buffered before we commit. That could be quite a lot of tasks, which could mean that we are not committing as frequently as desired. This could obviously lead to lots of duplicates during failure scenarios.
It probably doesn't make much of a difference when the commit interval is large, but if it is small it might be significant.

Contributor Author

So we have two options: 1) check whether we need to commit after each record (the previous behaviour), or 2) check whether we need to commit after each batch of records from poll() has been processed (this patch). This patch speeds up the common case (no failures) while relying on exactly-once semantics (EoS) to eliminate duplicates if there is a failure.
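
To make the trade-off between the two options concrete, here is a rough sketch under stated assumptions. `FakeTask`, `Stats`, and both method names are hypothetical and only model the scheduling order; the commit check is a counter standing in for a `maybeCommit(...)` call, and nothing here is the actual StreamThread code. Option 1 follows the description given earlier in the thread: one record per task, then a commit check.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: FakeTask and Stats are hypothetical, not Kafka Streams classes. */
final class CommitCadenceSketch {

    /** Hypothetical task that just counts down its buffered records. */
    static final class FakeTask {
        private int buffered;

        FakeTask(int buffered) {
            this.buffered = buffered;
        }

        /** Processes at most one record and returns how many were processed (0 or 1). */
        int process() {
            if (buffered == 0) {
                return 0;
            }
            buffered--;
            return 1;
        }
    }

    /** How often a commit check ran, and the most records processed between two checks. */
    static final class Stats {
        final int commitChecks;
        final int maxRecordsBetweenChecks;

        Stats(int commitChecks, int maxRecordsBetweenChecks) {
            this.commitChecks = commitChecks;
            this.maxRecordsBetweenChecks = maxRecordsBetweenChecks;
        }
    }

    /** Option 1: one record per task per pass, then a commit check. */
    static Stats checkAfterEachPass(List<FakeTask> tasks) {
        int checks = 0;
        int maxBetween = 0;
        int processedInPass;
        do {
            processedInPass = 0;
            for (FakeTask task : tasks) {
                processedInPass += task.process();
            }
            maxBetween = Math.max(maxBetween, processedInPass);
            checks++; // stand-in for a maybeCommit(...) call
        } while (processedInPass > 0);
        return new Stats(checks, maxBetween);
    }

    /** Option 2: drain every task, then a single commit check for the whole batch. */
    static Stats checkAfterBatch(List<FakeTask> tasks) {
        int processed = 0;
        for (FakeTask task : tasks) {
            int n;
            while ((n = task.process()) != 0) {
                processed += n;
            }
        }
        return new Stats(1, processed); // one stand-in maybeCommit(...) call
    }

    public static void main(String[] args) {
        Stats perPass = checkAfterEachPass(Arrays.asList(new FakeTask(3), new FakeTask(2)));
        Stats perBatch = checkAfterBatch(Arrays.asList(new FakeTask(3), new FakeTask(2)));
        System.out.println(perPass.commitChecks + " checks, max " + perPass.maxRecordsBetweenChecks);
        System.out.println(perBatch.commitChecks + " checks, max " + perBatch.maxRecordsBetweenChecks);
    }
}
```

In this toy model, the second option performs fewer commit checks but lets more records accumulate between checks, which is the window in which a failure could produce duplicates.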

Contributor

Yeah, I'm cool with it either way. Just wanted to throw it out there, as it is a change in behaviour.

Contributor Author

@guozhangwang we will have to change the definition of the poll sensor so that it covers a batch of requests rather than each one individually. I think that's ok.

@miguno
Contributor

miguno commented Mar 6, 2017

@enothereska wrote:

This led to most of the efficiency increases in streams consumption vs. simple consumer. Streams performance went from ~60MB/s -> 97MB/s (request size is 100 bytes).

Hey, great job!

@asfbot

asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2031/
Test FAILed (JDK 8 and Scala 2.12).

int numProcessed;
while ((numProcessed = task.process()) != 0) {
    totalNumBuffered += numProcessed;
}
requiresPoll = requiresPoll || task.requiresPoll();
Contributor

Will this always be true now?

Contributor Author

There is still one path in task.process() where requiresPoll is set to false.

int numProcessed;
while ((numProcessed = task.process()) != 0) {
    totalNumBuffered += numProcessed;
}
requiresPoll = requiresPoll || task.requiresPoll();

streamsMetrics.processTimeSensor.record(computeLatency(), timerStartedMs);
Contributor

Shouldn't this now move into the loop above?

Contributor Author

I'd like to record stats per batch of requests rather than for each one individually. Now that we do the processing in a tight loop, the cost of computeLatency (which calls time.milliseconds()) matters more than before (when perf was slower to start with). It is either that, or this sensor needs to be a DEBUG sensor.

Contributor

In this case should we divide the latency by the number of records processed?

Contributor Author

Yes, agreed.
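
As a rough illustration of the approach agreed on here, the sketch below reads the clock once before and once after a batch, then divides the batch latency by the number of records. `CountingClock` and `averageLatencyMs` are hypothetical names, and the guard for an empty batch is an assumption of this sketch, not something taken from the patch.

```java
import java.util.function.LongSupplier;

/** Illustrative only: CountingClock is a hypothetical fake clock, not Kafka's Time. */
final class BatchLatencySketch {

    /** Fake millisecond clock that counts how often it is read. */
    static final class CountingClock implements LongSupplier {
        int reads = 0;
        private long nowMs = 0;

        @Override
        public long getAsLong() {
            reads++;
            return nowMs;
        }

        void advance(long ms) {
            nowMs += ms;
        }
    }

    /**
     * Times a whole batch with two clock reads and returns the average latency
     * per record. An empty batch yields 0.0 (an assumption of this sketch).
     */
    static double averageLatencyMs(CountingClock clock, int records, long costPerRecordMs) {
        final long startMs = clock.getAsLong();
        for (int i = 0; i < records; i++) {
            clock.advance(costPerRecordMs); // pretend to process one record
        }
        final long batchLatencyMs = clock.getAsLong() - startMs;
        return records == 0 ? 0.0 : batchLatencyMs / (double) records;
    }

    public static void main(String[] args) {
        CountingClock clock = new CountingClock();
        double average = averageLatencyMs(clock, 4, 3);
        System.out.println(average + " ms per record, " + clock.reads + " clock reads");
    }
}
```

The number of clock reads stays at two regardless of batch size, which is the point of moving the measurement out of the tight loop.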

@asfbot

asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2033/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2030/
Test FAILed (JDK 7 and Scala 2.10).

@enothereska
Contributor Author

Known problem: testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
java.lang.IllegalStateException: No entry found for connection 0
https://issues.apache.org/jira/browse/KAFKA-4842 (not streams related)

@guozhangwang
Contributor

Some new jenkins failures worth mentioning:

org.apache.kafka.streams.integration.JoinIntegrationTest.testLeftKStreamKStream

java.lang.IllegalStateException: No entry found for connection 0
	at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:166)
	at org.apache.kafka.clients.ClusterConnectionStates.connectionState(ClusterConnectionStates.java:156)
	at org.apache.kafka.clients.NetworkClient.connectionFailed(NetworkClient.java:232)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.checkDisconnects(ConsumerNetworkClient.java:356)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:231)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:259)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:911)
org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
    java.lang.IllegalStateException: No entry found for connection 0

* Process one record
*
- * @return number of records left in the buffer of this task's partition group after the processing is done
+ * @return number of records processed
Contributor

In this case could we just return a boolean?

Contributor Author

Ok

long processLatency = computeLatency();
streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessedSinceLastCommit,
timerStartedMs);
maybeCommit(this.timerStartedMs);
Contributor

OK makes sense.

@guozhangwang
Contributor

@enothereska Just one minor comment otherwise LGTM.

@enothereska
Contributor Author

enothereska commented Mar 27, 2017

@guozhangwang addressed; however, let's hold off on committing since I've seen some unit test failures in trunk that need more testing. Thanks.

@asfbot

asfbot commented Mar 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2419/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Mar 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2423/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2419/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2447/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2451/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2447/
Test FAILed (JDK 8 and Scala 2.12).

@enothereska
Contributor Author

org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic STARTED seems to hang. This is consistent with what I see in trunk.

@enothereska
Contributor Author

retest this please

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2471/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2467/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2467/
Test FAILed (JDK 8 and Scala 2.12).

@enothereska
Contributor Author

A recent commit has broken checkstyle. Not related to this PR.

@guozhangwang
Contributor

@enothereska Jenkins failures seem to be due to another earlier commit on DualSocketChannel.

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2474/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2470/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2470/
Test PASSed (JDK 7 and Scala 2.10).

@guozhangwang
Contributor

retest this please.

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2472/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2476/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2472/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang
Contributor

LGTM. Merged to trunk.

@asfgit asfgit closed this in 84a14fe Mar 29, 2017
@enothereska enothereska deleted the minor-schedule-round-robin branch March 30, 2017 06:34