KAFKA-6431: Shard purgatory to mitigate lock contention #5338

Merged: 7 commits into apache:trunk on Jan 4, 2019

Conversation

@ying-zheng (Contributor) commented on Jul 6, 2018

Shard the purgatory and use a ReentrantLock instead of a ReentrantReadWriteLock.

This fix has been deployed in Uber's production environment for several months.
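For context, the change replaces the single watcher map guarded by one purgatory-wide ReentrantReadWriteLock with an array of independently locked shards selected by the operation key's hash. Below is a minimal sketch of that pattern; the names mirror the diffs reviewed further down, but the hashing and the `withKeyLock` helper are illustrative assumptions, not the merged code.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

// Illustrative sketch of the sharded watcher-list pattern, not the actual Kafka class.
class ShardedPurgatorySketch[K, W](shardCount: Int = 512) {

  private class WatcherList {
    // Each shard owns its own watcher map and a plain ReentrantLock.
    val watchersByKey = new ConcurrentHashMap[K, W]()
    val watchersLock = new ReentrantLock()
  }

  private val watcherLists = Array.fill(shardCount)(new WatcherList)

  // Pick the shard for a key; mask the sign bit so negative hash codes map to a valid index.
  private def watcherList(key: K): WatcherList =
    watcherLists((key.hashCode & 0x7fffffff) % watcherLists.length)

  // Callers only contend with other operations whose keys hash to the same shard.
  def withKeyLock[T](key: K)(body: ConcurrentHashMap[K, W] => T): T = {
    val wl = watcherList(key)
    wl.watchersLock.lock()
    try body(wl.watchersByKey)
    finally wl.watchersLock.unlock()
  }
}
```

With many shards, two produce requests for different partitions almost always land on different locks, which is where the contention relief comes from.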

@ying-zheng changed the title from Kafka 6431 to [KAFKA-6431] Shard purgatory to mitigate lock contention on Jul 6, 2018
@ying-zheng changed the title from [KAFKA-6431] Shard purgatory to mitigate lock contention to KAFKA-6431: Shard purgatory to mitigate lock contention on Jul 6, 2018
@harshach left a comment

Thanks for the patch @ying-zheng. Overall it looks good to me; I left some minor nits.
The build failure is not related to this patch; all unit tests pass locally with the patch applied.
@guozhangwang you might want to take a look as well.

}

/* 512 shards */
private val watcherLists = Array.fill[WatcherList](512)(new WatcherList)
Any specific reason for 512 shards? Should we make this configurable? If not, let's declare it as a constant with a comment.

@ying-zheng (Contributor, Author)

I don't want to make the configuration overly complicated; 512 is just a large enough number. I have some explanation in the ticket. I will make it a constant.

private class WatcherList {
val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))

val removeWatchersLock = new ReentrantLock()
Can we call this watchersLock, since we are using it for reads as well?
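For illustration, a sketch of why the broader name fits, assuming the shard's single lock guards both the read path and the removal path. ConcurrentHashMap stands in for Kafka's Pool, and the method bodies are hypothetical, not the merged code.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.{Lock, ReentrantLock}
import scala.jdk.CollectionConverters._

// ConcurrentHashMap stands in for Kafka's Pool; method bodies are illustrative only.
class WatcherListSketch[K, W] {
  val watchersByKey = new ConcurrentHashMap[K, W]()

  // Named watchersLock rather than removeWatchersLock: it guards reads as well as removals.
  val watchersLock = new ReentrantLock()

  private def inLock[T](lock: Lock)(body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  // Read path: snapshot the current watchers under the same lock.
  def allWatchers: List[W] =
    inLock(watchersLock) { watchersByKey.values.asScala.toList }

  // Removal path: drop a key once its watcher set has drained (isEmpty is caller-supplied).
  def removeKeyIfEmpty(key: K, isEmpty: W => Boolean): Unit = inLock(watchersLock) {
    val watchers = watchersByKey.get(key)
    if (watchers != null && isEmpty(watchers)) watchersByKey.remove(key, watchers)
  }
}
```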


/* a list of operation watching keys */
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
//private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
Let's remove the commented-out lines.

@@ -282,7 +303,13 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
* on multiple lists, and some of its watched entries may still be in the watch lists
* even when it has been completed, this number may be larger than the number of real operations watched
*/
def watched: Int = allWatchers.map(_.countWatched).sum
def watched() = {
Can you add the return type?

@guozhangwang (Contributor)

cc @junrao @rajinisivaram to take a look.

@ijuma (Contributor) commented on Jul 9, 2018

cc @cmccabe

@ijuma (Contributor) commented on Jul 9, 2018

Thanks for the PR @ying-zheng. Can you also share the improvement you have seen after deploying this change to Uber's production environment?

@ying-zheng (Contributor, Author) commented on Jul 9, 2018

@ijuma in my simulation test, this change reduced the P50 produce latency (acks=all) from 4ms to 3ms.

You can find more details about the simulation test in https://www.slideshare.net/YingZheng35/improving-kafka-atleastonce-performance-at-uber

As we deployed several performance improvements to the production environment together, it's hard to isolate the exact improvement from this change alone. With these optimizations (mostly presented in the slides above), the produce latency (P99 and P50) of our acks=all cluster dropped by about 90%, which is more significant than the improvement we saw in the simulation test.

@harshach

Thanks for the changes @ying-zheng. I am +1 on the patch. I'll wait for other reviewers to comment before merging it in.

@harshach

@junrao @rajinisivaram ping for a review. Thanks.

@rajinisivaram (Contributor) left a comment

@ying-zheng Thanks for the PR. Left a few minor comments. Could we also add some unit tests to verify the cases where the operations are in the same shard as well as different shards?


private val removeWatchersLock = new ReentrantReadWriteLock()
private val Shards = 512 // Shard the watcher list to reduce lock contention

Perhaps define this in an object rather than as an instance variable?
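A sketch of the suggested placement; the object and constant names mirror the diff, but the surrounding scaffolding here is a placeholder, not the merged code.

```scala
// Names mirror the diff, but this scaffolding is a sketch, not the merged code.
object DelayedOperationPurgatory {
  // Shard count for the watcher lists; a constant shared by all purgatory instances.
  val Shards: Int = 512
}

class DelayedOperationPurgatorySketch {
  import DelayedOperationPurgatory.Shards

  private class WatcherList          // placeholder for the real shard class
  private val watcherLists = Array.fill(Shards)(new WatcherList)
}
```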

private val removeWatchersLock = new ReentrantReadWriteLock()
private val Shards = 512 // Shard the watcher list to reduce lock contention
private val watcherLists = Array.fill[WatcherList](Shards)(new WatcherList)
private def getWatcherList(key: Any): WatcherList = {

getWatcherList -> watcherList since we don't normally add get prefix?

def watched: Int = allWatchers.map(_.countWatched).sum
def watched(): Int = {
var sum = 0
for (wl <- watcherLists) {

Could we use foldLeft instead of a for loop?
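For reference, a sketch of the foldLeft shape being suggested; the stub types stand in for the real Watchers and WatcherList so the fragment compiles on its own.

```scala
// Stub types so the fragment compiles on its own; names follow the diff.
class Watchers { def countWatched: Int = 0 }
class WatcherList { def allWatchers: Iterable[Watchers] = Nil }

class WatchedCountSketch(watcherLists: Array[WatcherList]) {
  // foldLeft accumulates the per-shard counts without a mutable var or a for loop.
  def watched: Int =
    watcherLists.foldLeft(0)((sum, wl) => sum + wl.allWatchers.map(_.countWatched).sum)
}
```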

@@ -424,7 +444,10 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
// a little overestimated total number of operations.
estimatedTotalOperations.getAndSet(delayed)
debug("Begin purging watch lists")
val purged = allWatchers.map(_.purgeCompleted()).sum
var purged = 0
for (wl <- watcherLists) {

Could we use foldLeft instead of a for loop?

@cmccabe (Contributor) commented on Jul 17, 2018

I tested this and didn't see any performance difference. However, it may be that my test case was too small-scale to see a benefit. If there is a benefit, I would expect to see it when there is a lot of contention due to many simultaneous requests.

@ying-zheng (Contributor, Author)

@rajinisivaram thank you for the comments. I have updated the code diff

@ying-zheng (Contributor, Author) commented on Jul 18, 2018

@cmccabe
In my test, each broker leads about 900 topic partitions, and there are about 300K produce requests per second.

@cmccabe (Contributor) commented on Jul 18, 2018

Thanks. Out of curiosity, how many partitions (and partitions per broker) did you use?

@ying-zheng (Contributor, Author)

@cmccabe About 2,800 topic-partitions per broker with 3 replicas; each broker leads ~900 topic-partitions. Besides the 2 followers, there are 4 consumers consuming each topic.

@ying-zheng closed this on Jul 19, 2018
@ying-zheng reopened this on Jul 19, 2018
@tedyu (Contributor) left a comment

Looks good overall.

/* a list of operation watching keys */
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
private class WatcherList {
val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))

watchersForKey -> watchersByKey

@ying-zheng (Contributor, Author)

done

@harshach

@rajinisivaram can you review the latest changes? If it looks good, I would like to merge into trunk and the 1.1 branch.

@ijuma (Contributor) commented on Jul 21, 2018

@harshach, a meta-comment: I noticed that there are a number of PRs where you mentioned merging to trunk and 1.1. There is also a 2.0 branch; if you backport to 1.1, it's essential that the change is also cherry-picked to 2.0.

An additional point is that we tend to backport only low-risk and/or important fixes to older branches, to make sure we don't introduce regressions in older releases. Please take this into account when deciding what to backport.

@harshach

@ijuma understood. These patches are performance improvements, and some are critical fixes for large clusters. Users who deploy large clusters usually don't upgrade to the latest versions easily, hence the desire to merge critical fixes into the 1.x line.

@ijuma (Contributor) commented on Jul 23, 2018

The numbers provided in this PR seemed very incremental and were for several changes (not just this one), so the case for backporting does not seem strong.

@harshach

@ijuma fair enough. Will merge it into trunk.
@rajinisivaram another ping for your approval. Thanks.

@rajinisivaram (Contributor)

@ying-zheng Thanks for the updates. The implementation looks good. We have a micro-benchmark for the purgatory: test.TestPurgatoryPerformance. Do you think it would be useful to update this test to run with multiple threads, to cover the scenario that this PR helps with? That would help us easily test for regressions in the future. Thank you!

@ying-zheng (Contributor, Author)

@rajinisivaram I think we should be able to see the performance difference with 32 threads and >300K QPS. However, I don't know how the micro-benchmark works. How do you know if there is a performance regression? Run the test with different Kafka versions on the same host?

@rajinisivaram (Contributor)

@ying-zheng Yes, if you could run the benchmark on the same host with and without this PR, that would be great. Thank you!

@harshach

@rajinisivaram can we get this PR into the upcoming releases? Unfortunately, running the benchmark is taking time. Is that a blocker to getting this merged in?

@rajinisivaram (Contributor)

@harshach Perhaps we can merge this just to trunk? @ijuma What do you think?

@harshach

@rajinisivaram Let me know if you are +1 to merge into trunk.

@junrao (Contributor) left a comment

@ying-zheng: Thanks for the patch. LGTM. Just a few minor comments below.

@@ -424,7 +441,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
// a little overestimated total number of operations.
estimatedTotalOperations.getAndSet(delayed)
debug("Begin purging watch lists")
val purged = allWatchers.map(_.purgeCompleted()).sum
var purged = watcherLists.foldLeft(0) { _ + _.allWatchers.map(_.purgeCompleted()).sum }

Does purged need to be var?

Also, could we use case inside foldLeft() so that we can use named params instead of _?
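A sketch of what the two suggestions combined might look like, with purged as a val and a case pattern naming the accumulator and the shard; the enclosing method and stub types are placeholders, not the merged code.

```scala
// Stub types so the fragment compiles on its own; names and the method are placeholders.
class Watchers { def purgeCompleted(): Int = 0 }                 // entries purged in one watcher
class WatcherList { def allWatchers: Iterable[Watchers] = Nil }  // watchers in one shard

class PurgeSketch(watcherLists: Array[WatcherList]) {
  def purgeCompletedOperations(): Int = {
    // val, not var: foldLeft already yields the final sum, and the case pattern
    // gives the accumulator and the shard readable names instead of underscores.
    val purged = watcherLists.foldLeft(0) { case (sum, watcherList) =>
      sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
    }
    purged
  }
}
```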

fixed

* note that the returned watchers may be removed from the list by other threads
*/
def allWatchers = {
inLock(watchersLock) { watchersByKey.values }

This is an existing issue, but I am wondering if we really need the lock here. What we return from the lock is a view of the backing map, which can change after the lock is released. Since ConcurrentHashMap already supports weakly consistent iterators, it seems that we don't need the lock.
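A sketch of the lock-free read being suggested, assuming the backing map is a ConcurrentHashMap (which Kafka's Pool wraps): its values view is weakly consistent, so iterating it without the lock is safe and simply may or may not reflect concurrent updates.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

class Watchers { def countWatched: Int = 0 }   // stub

class LockFreeReadSketch {
  private val watchersByKey = new ConcurrentHashMap[Any, Watchers]()

  // No lock needed for the read: ConcurrentHashMap's values view is weakly consistent,
  // so iterating it tolerates concurrent puts and removes without throwing.
  def allWatchers: Iterable[Watchers] = watchersByKey.values.asScala
}
```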

Removed the lock
Thank you for the comments!

Happy new year!

@@ -282,7 +299,9 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
* on multiple lists, and some of its watched entries may still be in the watch lists
* even when it has been completed, this number may be larger than the number of real operations watched
*/
def watched: Int = allWatchers.map(_.countWatched).sum
def watched(): Int = {

Do we need to add () to watched? It doesn't seem to have any side effects.
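For reference, a tiny sketch of the Scala convention being pointed at, unrelated to the purgatory internals: parameterless declarations for side-effect-free accessors, parentheses kept for methods with side effects.

```scala
// A convention sketch only; nothing here is purgatory code.
class CounterSketch {
  private var count = 0

  // Pure accessor: declared without parentheses because it has no side effects.
  def watched: Int = count

  // Parentheses are kept, by convention, for methods that do have side effects.
  def increment(): Unit = count += 1
}
```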

@harshach commented on Jan 4, 2019

@junrao can you please re-check? @yingzuber addressed your comments.

@harshach merged commit 459a4dd into apache:trunk on Jan 4, 2019
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
* Shard purgatory to reduce lock contention

* put constant into Object, use foldLeft instead of for loop

* watchersForKey -> watchersByKey

* Incorporate Jun's comments: use named arguments instead of _, and remove an unnecessary lock

Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>, Jun Rao <junrao@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>