Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-4402: make the KafkaProducer true round robin per topic #2128

Conversation

yaojuncn
Copy link
Contributor

No description provided.

Copy link
Contributor

@ewencp ewencp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yaojuncn Not sure the changes are quite right, but it looks like you'll get round robin (approximately) per topic.

This can be a hot section of the code, so it'd be good to also get some sort of benchmark before making a change like this. If we have highly concurrent producer access, are we going to take a performance hit (whether from a new synchronized block or from the ConcurrentHashMap)? I ask because the other option is to simply clarify what the guarantee is on the DefaultPartitioner.

It might also help to understand how you are hitting an issue where the round robin behavior doesn't work. I think so many users produce to a single topic that the incorrect behavior is easy to miss and a better choice might be to clarify and provide an alternative Partitioner with behavior that works correctly across multiple topics but doesn't affect performance for the common case. Is your case something like producing to two topics where you always produce to both in succession, such that, e.g., one topic always produces to even partitions and the other always to odd partitions?

@@ -35,7 +36,8 @@
*/
public class DefaultPartitioner implements Partitioner {

private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
//private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should get rid of this as it's no longer needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, deleted.

@@ -68,6 +70,15 @@ public int partition(String topic, Object key, byte[] keyBytes, Object value, by
}
}

public int getNextValue(String topic) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be public

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to private

@@ -68,6 +70,15 @@ public int partition(String topic, Object key, byte[] keyBytes, Object value, by
}
}

public int getNextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't strictly thread safe since you might overwrite values in the map. It won't cause a crash, but not perfectly consistent behavior. Was this intentional since it only matters for concurrent access the first time a topic is accessed? Or should this entire method be synchronized to guarantee the behavior you're expecting (in which case the Map itself doesn't need to be a concurrent implementation)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I was aware of it and chose to do it in this way.
As this method is a hot method which may be called by every producer.send() if no key is provided, put it as 'synchronized' may potentially impact the performance among the threads.

while the null check may be both true for the initial concurrent threads, as you said, it only impact that very first few calls. further calls are not impacted.

@Test
public void testRoundRobin() throws InterruptedException {
final String topicA = "topicA";
String topicB = "topicB";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also be final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, updated

if (null == count) count = 0;
partitionCount.put(partition, count + 1);

if(i%5 == 0 ){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This formatting is going to fail checkstyle. There seem to be other failures as well. For the changes in this patch, run ./gradlew clean clients:test to make sure everything will pass.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, passed the style check for test module now.

@yaojuncn
Copy link
Contributor Author

@ewencp
in terms of performance,
while the Concurrenthashmap contains the AtomicInteger, the write only happens for first few concurrent calls, further calls are only increasing the AtomicInteger itself, there is no write to the ConcurrentHashMap later.

we could do some perf test on this, so far my understanding is the impact is very very tiny.

the perf test result on my own laptop is: "avg partition time(ms)=0.000109", so it's pretty fast with very low overhead that can be ignored,
anyway, from code perspective it only added one ConcurrentHashMap.get(topic)
@yaojuncn yaojuncn force-pushed the KAFKA-4402-client-producer-round-robin-fix branch from 94b3bba to f978b50 Compare November 14, 2016 05:27
@yaojuncn
Copy link
Contributor Author

@ewencp I added a performance test in the test codes,
it's pretty fast.
the perf test result on my own laptop is: "avg partition time(ms)=0.000109", so it's pretty fast with very low overhead that can be ignored,
anyway, from code perspective it only added one ConcurrentHashMap.get(topic)

btw, the initial commit did not pass Jenkins test because there is a failure in some other codes in trunk.

do you know how to trigger the build again?
why would new commits did not trigger the build?

@guozhangwang
Copy link
Contributor

@yaojuncn New commits should trigger a new build. You can also close-reopen to force it. As for the issue itself, I think the reason we do not observe that behavior is that in common practices it was even-out with a large number of clients.

latch.await();

double avg = (System.currentTimeMillis() - start)/ (double)(threadCount * loopCount);
System.out.println("avg partition time(ms)=" + String.format("%5f", avg));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line intentional? Unit tests normally don't need to print out to console.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, since this method is just to get an idea of the performance of Partition.partition method, something like method benchmark

@yaojuncn
Copy link
Contributor Author

@guozhangwang thanks, I agree in practice it will mostly be random or evenly distributed, but that depends on luck... sometimes it will not.
one thing I am wondering is the existing code is trying to do some 'round-robin' but actually not doing what it plans to do.

@yaojuncn yaojuncn closed this Nov 21, 2016
@yaojuncn yaojuncn reopened this Nov 21, 2016
@ewencp
Copy link
Contributor

ewencp commented Dec 27, 2016

retest this please

@asfbot
Copy link

asfbot commented Dec 27, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/393/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Dec 27, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/391/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Dec 27, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/392/
Test FAILed (JDK 8 and Scala 2.12).

@ewencp
Copy link
Contributor

ewencp commented Dec 27, 2016

@yaojuncn The perf numbers might be a bit misleading. The test uses a bunch of threads. In that case you might get better performance for cases where you're mostly only reading from the ConcurrentHashmap (so it parallelizes well), and by writing to different topics you now actually get some benefit from the parallelism since they are using different counters.

In contrast, a single threaded version might not do as well. I modified the test to use just 1 thread (and more iterations to get a more stable number) and I see ~30% performance hit.

That said, this is just one part of what the producer does when enqueueing a message. I still think this might be a worthwhile fix, but just getting comparison perf numbers using producer performance might be more useful -- I really have no idea what fraction of the cost per-message can be attributed to partitioning (although I guess it's small). We don't necessarily need something that can be run on every commit, just some confirmation we're not going to take a significant hit for common use cases.

Also, current version is failing checkstyle in a couple of places in the test. The clients tests run quickly, so even if you leave the rest of the tests to Jenkins, you might want to run ./gradlew clients:test before pushing an update to make sure it'll pass the checkstyle tests as well as unit tests.

@asfbot
Copy link

asfbot commented Jan 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/414/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Jan 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/416/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jan 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/415/
Test PASSed (JDK 8 and Scala 2.12).

@yaojuncn yaojuncn closed this Jan 2, 2017
@yaojuncn yaojuncn reopened this Jan 2, 2017
@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/416/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/418/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/417/
Test FAILed (JDK 8 and Scala 2.12).

@yaojuncn yaojuncn closed this Jan 2, 2017
@yaojuncn yaojuncn reopened this Jan 2, 2017
@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/418/
Test FAILed (JDK 7 and Scala 2.10).

…ing error: in my local it still print (avg partition time(ms)=0.003703)
@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/420/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/419/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/421/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/420/
Test FAILed (JDK 7 and Scala 2.10).

AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(new Random().nextInt());
topicCounterMap.put(topic, counter);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getNextValue is not synchronised. There is a race in this line. Should be replaced with currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) {counter = currntCounter}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch.
I was aware of the non-thread safe over first few calls, but did not realize the fix.
thanks.

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/422/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/421/
Test PASSed (JDK 8 and Scala 2.12).

}

@Test
public void testPartitionPerf() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not include this along with the unit tests since it's not a unit test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine to delete it, where do you guys usually put such perf test codes to test method performance?

@@ -68,6 +70,18 @@ public int partition(String topic, Object key, byte[] keyBytes, Object value, by
}
}

private int getNextValue(String topic) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Kafka, we generally avoid the get prefix. So nextValue would probably be better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, updated.

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/426/
Test PASSed (JDK 8 and Scala 2.11).

@yaojuncn
Copy link
Contributor Author

yaojuncn commented Jan 2, 2017

@ewencp

I did some fix over the wrong calculation of perf measurement codes (see the commit list)
and also compared the perf of DefaultPartitioner.partition() method,

below are some test result on my local machine, which shows there's almost no impact ( or very small that is going to impact even this is a heavy called function).

anyway after first call, it's just one additional ConcurrentHashMap.get(topic), which is a in memory call and highly optimized within JDK.

I do not think it will impact any.

how do you see the 30% performance hit? could you share your test codes?

I am testing on this commit:
c5d0bb0
JDK 1.8.0_65

new partitioner, 100 thread, 1000000 loop, avg partition time(ms)=0.002445
old partitioner, 100 thread, 1000000 loop, avg partition time(ms)=0.002488

new partitioner, 10 thread, 1000000 loop, avg partition time(ms)=0.000322
old partitioner, 10 thread, 1000000 loop, avg partition time(ms)=0.000473

new partitioner, 1 thread, 1000000 loop, avg partition time(ms)=0.000122
old partitioner, 1 thread, 1000000 loop, avg partition time(ms)=0.000114

@yaojuncn
Copy link
Contributor Author

yaojuncn commented Jan 2, 2017

@ewencp
in terms of the test failure, I fixed the initial checkstyle failures there.
However, for other failures, it looks to me that some tests are not stable themselves.
I looked the test history of this PR, on the same commit,
sometimes the same JDK version/scala version test will fail and sometimes they will pass.

and taking some quick look and local test, it seems the failures are not related to my changes.
let me know if I am wrong and I will try to fix the failures if due to my changes.

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/425/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/424/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/429/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/428/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/427/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Jan 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/605/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Jan 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/607/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jan 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/606/
Test PASSed (JDK 8 and Scala 2.12).

@ewencp
Copy link
Contributor

ewencp commented Jan 9, 2017

LGTM.

@yaojuncn The 30% hit I got was by changing your microbenchmark code to a single thread and made it long enough to get more reliable numbers out of it. But I've also taken the code and run it against ProducerPerformance. I used settings that would stress the partitioner -- tiny messages, fast throughput. Seems to have no impact.

Thanks for the patch!

@asfgit asfgit closed this in 42a6b71 Jan 9, 2017
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
Author: yaojuncn <yaojuncn@users.noreply.github.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Konstantin <konstantin@tubemogul.com>, Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#2128 from yaojuncn/KAFKA-4402-client-producer-round-robin-fix
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(new Random().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like, there is a race condition putting the counter to the map

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
8 participants