[ZOOKEEPER-1177] Add the memory optimized watch manager for concentrate watches scenario #590
Conversation
Force-pushed d4f996f to 04fad10 (compare).
@hanm the code Findbugs complained about is all correct and expected:
@lvfangmin In which case you need to add exceptions to
@lvfangmin Interesting. Where can we find a benchmark which shows how this PR improves the memory?
Based on an internal benchmark, this may save more than 95% of memory usage in the concentrated watches scenario. I'm working on adding some micro benchmarks. In the original Jira, @phunt also added some basic tests, which should give you an idea of how much memory it's going to save. The current HashMap based watch manager uses lots of memory due to the overhead of storing each entry (32 bytes per entry), the object references, and the duplicated path strings. Using a BitSet without a reverse index reduces that overhead and makes it more memory efficient.
There is a PR on the Jira from a few years ago which uses a BitMap as well, but it sacrificed performance when triggering watches; this patch improves on that and uses lazy clean up for watchers whose sessions have closed.
This is going to be a huge improvement for Accumulo, for example, which is a heavy user of watchers. I'm going to allocate some capacity to review these new patches.
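The storage layout lvfangmin describes can be sketched in plain Java. This is an illustration only, with names of my own invention rather than the patch's code: a HashMap-based manager keeps a set of watcher object references per path (each entry carrying map-entry and reference overhead), while a bit-based manager assigns each watcher a small integer id once and keeps one BitSet per path, so N watches on a path cost roughly N bits.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch (not the patch's code) contrasting the two layouts.
public class WatchStorageSketch {
    // HashMap layout: path -> set of watcher references (32+ bytes per entry).
    static final Map<String, Set<Object>> byReference = new HashMap<>();
    // BitSet layout: path -> bits indexed by a per-watcher integer id.
    static final Map<String, BitSet> byBit = new HashMap<>();

    static void addByBit(String path, int watcherId) {
        byBit.computeIfAbsent(path, p -> new BitSet()).set(watcherId);
    }

    static boolean containsByBit(String path, int watcherId) {
        BitSet bits = byBit.get(path);
        return bits != null && bits.get(watcherId);
    }
}
```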
Force-pushed 04fad10 to 88bfdab (compare).
Added a JMH micro benchmark for the watch manager.
Here are more results about the throughput/latency of the WatchManager:
You can try the following command to run the micro benchmark:
@maoling @anmolnar hope this gives you a more vivid comparison between the old and new watch manager implementations.
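The JMH benchmark itself and its run command are not reproduced above, so here is a stdlib-only sketch (hypothetical class name, no JMH dependency) of the kind of comparison such a micro benchmark runs: timing the hot add operation against a HashSet versus a BitSet.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

// Not the JMH code from the patch -- a minimal stdlib-only timing sketch.
// Real measurements should use JMH to avoid JIT/warmup pitfalls.
public class WatchBenchSketch {
    // Runs op `iterations` times and returns the elapsed nanoseconds.
    static long timeAdds(Runnable op, int iterations) {
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            op.run();
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        Set<Integer> set = new HashSet<>();
        BitSet bits = new BitSet();
        int n = 1_000_000;
        long setNanos = timeAdds(() -> set.add(42), n);
        long bitNanos = timeAdds(() -> bits.set(42), n);
        System.out.println("HashSet: " + setNanos + " ns, BitSet: " + bitNanos + " ns");
    }
}
```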
@lvfangmin Please take a look at my initial feedback on the patch. Didn't have time to review the testing side, but it generally looks good.
@nkalmar There's a new project being added here: bench. Please take a quick look at it and advise on what would be the best place to put it in terms of the Maven migration. Thanks.
build.xml
Outdated
@@ -119,6 +119,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
<property name="test.java.classes" value="${test.java.build.dir}/classes"/>
<property name="test.src.dir" value="${src.dir}/java/test"/>
<property name="systest.src.dir" value="${src.dir}/java/systest"/>
<property name="bench.src.dir" value="${src.dir}/java/bench"/>
I think this new dir should be added to the classpath of the eclipse task too.
Will do.
src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
@Override
public void removeWatcher(Watcher watcher) {
    Integer watcherBit;
    addRemovePathRWLock.writeLock().lock();
Do you need to acquire write lock here?
I need the exclusive lock with addWatch, otherwise addWatch may still add a dead watch which won't be cleaned up by the WatchCleaner once it has started cleaning.
In which case you need to move the addDeadWatcher() call inside the critical block.
Missed this comment last time. What we need here is: as long as we called addDeadWatcher, no watches related to this dead watcher will be added. Code executing after line 136 means the watcher has been marked as stale; after we release this lock, any in-flight addWatcher for this dead watcher will be rejected, so it's guaranteed that when we call addDeadWatcher there is no race condition between removing and adding watches.
And I need to move addDeadWatcher out of the locking block, since the WatchCleaner might block on it to avoid OOM issues if the cleaner cannot keep up with cleaning the dead watchers.
Where do you mark the watcher as stale inside the critical block? It only calls a getter on the BitIdMap, right?
In the caller of removeWatcher, which is NIOServerCnxn.close and NettyServerCnxn.close.
When the cnxn is closed, it sets stale before calling removeCnxn on zkServer, which calls this function sequentially; if we grabbed this lock, it means the cnxn has been marked as stale.
We could explicitly setStale here as well, but I don't think that's necessary.
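The locking split under discussion can be sketched as follows. The class and member names here are illustrative, not the patch's: concurrent addWatch callers share the read lock (the inner structure has its own lock, mirroring BitHashSet), while removeWatcher takes the write lock so no add can interleave with marking a watcher dead.

```java
import java.util.BitSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hedged sketch of the add/remove locking scheme; names are mine.
public class LockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final BitSet live = new BitSet();

    void addWatch(int watcherId) {
        lock.readLock().lock();              // many adders may run concurrently
        try {
            synchronized (live) {            // per-structure lock, as BitHashSet has
                live.set(watcherId);
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    void removeWatcher(int watcherId) {
        lock.writeLock().lock();             // exclusive vs. all adders
        try {
            synchronized (live) {
                live.clear(watcherId);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean isLive(int watcherId) {
        synchronized (live) {
            return live.get(watcherId);
        }
    }
}
```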
src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WatchManagerFactory {
A quick javadoc would be awesome here.
import java.util.Set;

public interface DeadWatcherListener {
Please add a few words of javadoc here.
import java.util.BitSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class BitMap<T> {
I think a short javadoc similar to BitHashSet's would be useful here.
src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.proto.ReplyHeader;

public class DumbWatcher extends ServerCnxn {
Please consider using mockito.
I agree that for unit tests a mock object is easier to maintain than a stub, but I also need this DumbWatcher in the micro benchmark; I'll put this class somewhere in the code so the micro benchmark and unit test can share it.
@lvfangmin Docs for the new caching parameter
On the new project bench:
I'm not sure we need a top level module for this bench test (although more could come in the future...)
From my side, this can stay in an org.apache.zookeeper project, but then it should go to org.apache.zookeeper.bench in my opinion, and the directory structure should be src/java/org/apache/zookeeper/bench/**
It will be moved with the maven migration anyway (it can't stay in java/bench/org/..
@lvfangmin Can you please move the directory, and possibly also create a new package "bench" for it, under zookeeper?
Otherwise, looking at the patch, really great job! Thanks!
Thanks @nkalmar for the suggestion on the bench project position; I was following the same directory layout as src/java/systest for now. Do you think we can move them together later? I'm not against moving it now. If we want to move now, just to confirm: the directory is src/java/org/apache/zookeeper/bench/ without a main folder, right?
systest will go to src/test/java; from my side, you can put the bench in org.apache.zookeeper.test.system. Thinking about it, that's a pretty good place. No need to create a main directory or anything, I will move all the files anyway. Just move the files amongst the others in systest. (Hopefully there's no package level dependency, I didn't check.) But there's a chance you will have to rebase if this PR cannot be merged before the movement of all the remaining files as the directory refactor's last step. Sorry about that in advance... I'm not going to be the most popular with that PR :(
Update based on the comments:
@nkalmar I moved the bench to src/test/java/bench, which seems more reasonable to me; let me know if you think that's not a good position based on your plan.
Force-pushed 4043700 to f212cfd (compare).
Resolved conflicts with the latest code on master.
Force-pushed f212cfd to c9962c9 (compare).
@lvfangmin Thanks. Sorry for the delay. I'd like to check one more thing before accepting. Bear with me please.
@lvfangmin Just to wrap up the difference between this and the original 6-year-old patch on Jira: you've added
Is that correct?
@anmolnar Thanks for reviewing, take your time. Here are the main differences between this version and the old one on the Jira:
Cool, thanks for the summary. It's very useful for the records.
+1 approved
Refer to this link for build results (access rights to CI server needed):
@@ -0,0 +1,12 @@
package org.apache.zookeeper;
This file is missing the Apache license header. This triggers a -1 in the last jenkins build.
Will add it.
@anmolnar I'll sign this off by the end of next Monday if there are no other issues.
Some nitpicks on spelling, a couple of questions, and some comments around code I find suspicious.
import org.apache.zookeeper.server.ServerCnxn;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Remove all imports here except these three, since the rest were not used (my guess is this file was copy pasted?):
import java.io.PrintWriter;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
public boolean addWatch(String path, Watcher watcher);

/**
 * Checks the specified watcher exists for the given path
nit: missing full stop at end of sentence.
public boolean containsWatcher(String path, Watcher watcher);

/**
 * Removes the specified watcher for the given path
nit: missing full stop at end of sentence.
/**
 * Distribute the watch event for the given path, but ignore those
 * supressed ones.
spell check: suppressed instead of supressed
 * @return the watchers have been notified
 */
public WatcherOrBitSet triggerWatch(
        String path, EventType type, WatcherOrBitSet supress);
similar spelling issue for supress
/**
 * Interface used to process the dead watchers related to closed cnxns.
 */
public interface DeadWatcherListener {
Would be good to rename this to IDeadWatchListner, which makes it obvious this is an interface. We already do this for IWatchManager.
<ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how
many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The
default value is 2, which is good enough even for heavy and continuous session closing/receating cases.</para>
closing/recreating
try {
    if (deadWatchers.size() < watcherCleanThreshold) {
        int maxWaitMs = (watcherCleanIntervalInSeconds +
            r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
Is there a particular reason for choosing this versus, say, exponential backoff?
Cleaning up dead watches on a large-watch ensemble is heavy work, which might affect performance, so we add jitter to make sure we don't do the lazy clean up at the same time on all the servers in the ensemble.
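The jittered wait in the snippet quoted above can be factored into a small helper (illustrative wrapper name): each server waits the base interval plus up to half of it again, so cleanup passes across the ensemble spread out instead of firing simultaneously.

```java
import java.util.Random;

// Illustrative helper reproducing the jitter arithmetic from the snippet.
public class JitterSketch {
    // Returns a wait in [intervalSeconds, 1.5 * intervalSeconds] seconds, in ms.
    static int maxWaitMs(int intervalSeconds, Random r) {
        return (intervalSeconds + r.nextInt(intervalSeconds / 2 + 1)) * 1000;
    }
}
```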
deadWatchers.clear();
int total = snapshot.size();
LOG.info("Processing {} dead watchers", total);
cleaners.schedule(new WorkRequest() {
indentation issue on this line...
synchronized (this) {
    // Snapshot of the current dead watchers
    final Set<Integer> snapshot = new HashSet<Integer>(deadWatchers);
    deadWatchers.clear();
Is there a particular reason to copy the deadWatchers, clear it immediately, and then do the work, instead of just operating on deadWatchers directly and only clearing it after the work is done? I assume the motivation was to free deadWatchers earlier so we can pipeline the work: adding more dead watchers while the previous round of cleaning is in progress. But it looks like new dead watchers will block on totalDeadWatchers, which will only be reset after the previous dead watchers were cleaned up.
Cleaning the dead watchers needs to go through all the current watches, which is pretty heavy and may take a second if there are millions of watches; that's why we're doing lazy batch clean up here. We don't want to block addDeadWatcher, which is called from Cnxn.close, while we're doing the clean-up work.
The totalDeadWatchers is used to avoid OOM when the watcher cleaner cannot catch up (we haven't seen this problem even in heavy reconnecting scenarios); it is suggested to set it to something like 1000 * watcherCleanThreshold. The watcherCleanThreshold controls the batch size when doing the clean up; there is a trade-off between GC-ing the dead watcher memory and the time complexity of cleaning up, so we cannot set it too large.
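The snapshot-and-clear handoff described in this exchange can be sketched as follows (class and method names are hypothetical): the caller-facing set is emptied immediately so addDeadWatcher never blocks behind a slow cleanup pass, while totalDeadWatchers keeps counting the batch still in flight and is only decremented once the heavy scan finishes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hedged sketch of the pipeline pattern; not the patch's WatcherCleaner.
public class DeadWatcherSketch {
    private final Set<Integer> deadWatchers = new HashSet<>();
    final AtomicInteger totalDeadWatchers = new AtomicInteger(0);

    synchronized void addDeadWatcher(int watcherBit) {
        if (deadWatchers.add(watcherBit)) {
            totalDeadWatchers.incrementAndGet();
        }
    }

    Set<Integer> takeSnapshot() {
        synchronized (this) {
            Set<Integer> snapshot = new HashSet<>(deadWatchers);
            deadWatchers.clear();   // new dead watchers can queue immediately
            return snapshot;
        }
    }

    void finishBatch(Set<Integer> snapshot) {
        // ... the heavy scan over all current watches would happen here ...
        totalDeadWatchers.addAndGet(-snapshot.size());
    }
}
```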
Thanks @hanm for the detailed review; I have made comments to address your concerns, let me know if anything is unclear.
Meanwhile, I'll remove the unused imports and correct the typos as you suggested.
if (elementBit == null) {
    return false;
}
return elementBits.get(elementBit);
BitSet.get is O(1); checking the cache may actually be more expensive.
The HashSet is used to optimize iterating: for example, if there is a single element in this BitHashSet but its bit index is very large, without the HashSet we would need to go through all the words before returning that element, which is not efficient.
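A minimal sketch of the BitSet-plus-HashSet composition being discussed here (illustrative, not the actual BitHashSet): the BitSet answers contains() in O(1) on the bit index, while the HashSet lets iteration visit only the set bits instead of scanning every word of the BitSet.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hedged sketch of the composition; the real class also handles locking.
public class BitHashSetSketch implements Iterable<Integer> {
    private final BitSet elementBits = new BitSet();
    private final Set<Integer> cache = new HashSet<>();

    boolean add(int elementBit) {
        if (elementBits.get(elementBit)) {
            return false;
        }
        elementBits.set(elementBit);
        cache.add(elementBit);
        return true;
    }

    boolean contains(int elementBit) {
        return elementBits.get(elementBit);   // O(1), no word scan
    }

    @Override
    public Iterator<Integer> iterator() {
        return cache.iterator();              // visits only the set bits
    }
}
```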
 * iterate through this set.
 */
@Override
public Iterator<Integer> iterator() {
It's used in triggerWatch with a for-each iterator.
Integer bit = getBit(value);
if (bit != null) {
    return bit;
}
This BitMap is used by WatchManagerOptimized.watcherBitIdMap, which stores the watcher-to-bit mapping.
add might be called a lot if the same client connection is watching thousands or even millions of nodes, while remove is only called once, when the session is closed; that's why we optimized add to check with the read lock first, but use the write lock directly in remove.
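The read-then-write asymmetry described here is essentially a double-checked pattern; a hedged sketch with identifiers of my own choosing: add() first looks up an existing bit under the cheap read lock and only escalates to the write lock to allocate a new one, re-checking after escalation because another thread may have won the race between the two locks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch of the BitMap add() locking, not the patch's code.
public class BitMapSketch<T> {
    private final Map<T, Integer> value2Bit = new HashMap<>();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private int nextBit = 0;

    int add(T value) {
        rwLock.readLock().lock();
        try {
            Integer bit = value2Bit.get(value);
            if (bit != null) {
                return bit;                // common case: already mapped
            }
        } finally {
            rwLock.readLock().unlock();
        }
        rwLock.writeLock().lock();
        try {
            // re-check: another thread may have added it between the locks
            Integer bit = value2Bit.get(value);
            if (bit == null) {
                bit = nextBit++;
                value2Bit.put(value, bit);
            }
            return bit;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}
```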
 * limitations under the License.
 */

package org.apache.zookeeper.server.watch;
At the beginning, when we added this class, it was bound to Watcher, but not anymore after refactoring; we can move this to server.util. I'll do that.
 */
public class BitHashSet implements Iterable<Integer> {

    static final long serialVersionUID = 6382565447128283568L;
Previously it used inheritance instead of composition with HashSet; at that time we added this serialVersionUID and didn't remove it after changing to composition. Will remove it.
// this is will slow down the socket packet processing and
// the adding watches in the ZK pipeline.
while (maxInProcessingDeadWatchers > 0 && !stopped &&
        totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
I think this should be maxInProcessingDeadWatchers != -1 && totalDeadWatchers.get() >= maxInProcessingDeadWatchers. Otherwise we'll always wait on totalDeadWatchers if the user uses the default configuration value of maxInProcessingDeadWatchers.
@hanm I'm not sure I understand this correctly; the default value of maxInProcessingDeadWatchers is -1, in which case it will skip checking totalDeadWatchers. Am I missing anything?
Oops, did not see maxInProcessingDeadWatchers > 0. We are good here.
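The guard the two reviewers just settled can be isolated as a small helper (illustrative name): a non-positive maxInProcessingDeadWatchers disables the wait entirely, covering the default -1 case, while a positive value stalls callers whenever too many dead watchers are still being cleaned.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative extraction of the backpressure condition from the loop above.
public class BackpressureSketch {
    static boolean shouldWait(int maxInProcessingDeadWatchers,
                              boolean stopped,
                              AtomicInteger totalDeadWatchers) {
        return maxInProcessingDeadWatchers > 0 && !stopped
                && totalDeadWatchers.get() >= maxInProcessingDeadWatchers;
    }
}
```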
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.util.BitMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
couple of unused imports here
 * Optimized in memory and time complexity, compared to WatchManager, both the
 * memory consumption and time complexity improved a lot, but it cannot
 * efficiently remove the watcher when the session or socket is closed, for
 * majority usecase this is not a problem.
nit: use case
    return null;
}

int triggeredWatches = 0;
we can remove this - it's assigned later but never used.
We had metrics for this; I removed them because #580 was still in review. I'll add those metrics back since that patch has been merged.
I knew it was the metrics. It's fine to leave this variable and add the metrics in another patch, since this patch is already big enough and almost ready to land.
@Override
public WatcherOrBitSet triggerWatch(
        String path, EventType type, WatcherOrBitSet supress) {
nit - suppress
@Override
public boolean addWatch(String path, Watcher watcher) {
    boolean result = false;
    addRemovePathRWLock.readLock().lock();
Is the purpose of using a read lock here to optimize for addWatch-heavy workloads? Would be good to add a comment here about why a read lock was chosen instead of a write lock.
Yes, it's used to improve the read throughput; creating a new watcher bit and adding it to the BitHashSet has its own lock to minimize the lock scope. I'll add some comments here.
if (watchers == null) {
    watchers = new BitHashSet();
    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    if (existingWatchers != null) {
Is this check necessary because we are using a read lock here, so it's possible for another thread to modify the pathWatches while we are here?
That's correct; read requests are processed concurrently in the CommitProcessor worker service, so multiple threads might add to pathWatches while we're holding the read lock. That's why we need this check here.
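The race and its putIfAbsent resolution can be sketched as follows (a plain BitSet stands in for BitHashSet, and the names are illustrative): two threads holding the read lock can both miss pathWatches and build a fresh set, and whichever one loses putIfAbsent must adopt the winner's set so no watch lands in an orphaned copy.

```java
import java.util.BitSet;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the putIfAbsent race-check, not the patch's addWatch.
public class AddWatchSketch {
    final ConcurrentHashMap<String, BitSet> pathWatches = new ConcurrentHashMap<>();

    void addWatch(String path, int watcherBit) {
        BitSet watchers = pathWatches.get(path);
        if (watchers == null) {
            watchers = new BitSet();
            BitSet existing = pathWatches.putIfAbsent(path, watchers);
            if (existing != null) {
                watchers = existing;   // another reader won the race
            }
        }
        synchronized (watchers) {      // the real BitHashSet has its own lock
            watchers.set(watcherBit);
        }
    }
}
```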
@Override
public boolean containsWatcher(String path, Watcher watcher) {
    BitHashSet watchers = pathWatches.get(path);
Would be good to add a comment here regarding why no synchronization is required here.
Thanks @lvfangmin for the detailed reply.
Refer to this link for build results (access rights to CI server needed):
LGTM, thanks @lvfangmin. Will commit today after running it through jenkins a few times.
Got a "green" build (minus the known failed test).
Merged to master. Great work @lvfangmin!
Is it worth it to backport this to 3.4? @hanm @lvfangmin
@vivekpatani Sorry, but 3.4 is end of life.
…e watches scenario

The current HashSet based WatcherManager will consume more than 40GB memory when creating 300M watches. This patch optimized the memory and time complexity for the concentrated watches scenario; compared to WatchManager, both the memory consumption and time complexity improved a lot. I'll post more data later with micro benchmark results.

Changes made compared to WatchManager:
* Only keep the path to watches map
* Use BitSet to save the memory used to store watches
* Use ConcurrentHashMap and ReadWriteLock instead of synchronized to reduce lock retention
* Lazily clean up the closed watchers

Author: Fangmin Lyu <allenlyu@fb.com>
Reviewers: Andor Molnár <andor@apache.org>, Norbert Kalmar <nkalmar@yahoo.com>, Michael Han <hanm@apache.org>

Closes apache#590 from lvfangmin/ZOOKEEPER-1177