[CURATOR-549] Recipes based on Persistent Recursive Watchers #335
Thanks for your work! I'm reviewing now.
Beautiful work! Only minor comments.
 * {@inheritDoc}
 */
@Override
Stream<ChildData> streamImmediateChildren(String fromParent);
Is this method for compatibility with PathChildrenCache#getCurrentData? If so, shall we mention it?
I'm not happy with this API. I'm going to change it. Please refer to the next commit.
 * @param runSafeProc runSafe proc
 * @return this
 */
CuratorCacheBuilder withListenerRunProc(Function<Runnable, CompletableFuture<Void>> runSafeProc);
Shall we add a test or example for the use case? I don't see the motivation here in practice.
@tisonkun 2 comments have been addressed.
class OutstandingOps
{
    private final Runnable completionProc;
    private volatile AtomicLong count = new AtomicLong(0);
Why not final?
It gets set to null after completionProc is called, to signal that decrement/increment are no longer necessary.
{
    if ( (localCount.decrementAndGet() == 0) )
    {
        count = null;
I am not sure this trick works.
You can be here while another thread calls the increment function.
You are not inside a lock.
Look at the next line, if ( localCount.compareAndSet(0, Long.MIN_VALUE) ): that ensures consistency (please review and confirm).
Hmm - I think there's a race here - I'm going to address.
@eolivelli I reworked it - please re-check
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class OutstandingOps
An alternative is java.util.concurrent.Phaser, which already implements semantics like those of OutstandingOps.
Runnable r = () -> callListeners(CuratorCacheListener::initialized);
Phaser p = new Phaser() {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        r.run();
        return true;
    }
};
and replace increment with register, and decrement with arriveAndDeregister.
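A minimal, self-contained sketch of the Phaser idea suggested above. The class name PhaserOutstandingOps and the isDone() helper are hypothetical and not part of the PR; only the java.util.concurrent.Phaser calls (register, arriveAndDeregister, onAdvance, isTerminated) are real JDK API. It assumes every decrement is preceded by a matching increment.

```java
import java.util.concurrent.Phaser;

// Hypothetical wrapper, for illustration only: a one-shot "all outstanding ops done" signal.
class PhaserOutstandingOps {
    private final Phaser phaser;

    PhaserOutstandingOps(Runnable completionProc) {
        this.phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Called once every registered party has arrived.
                completionProc.run();
                return true; // returning true terminates the phaser, making it one-shot
            }
        };
    }

    // A new outstanding operation: register one more party.
    void increment() {
        phaser.register();
    }

    // An operation completed: arrive and drop the party in one step.
    void decrement() {
        phaser.arriveAndDeregister();
    }

    // True once the completion proc has run.
    boolean isDone() {
        return phaser.isTerminated();
    }
}
```

Once the phaser is terminated, later register and arriveAndDeregister calls return a negative value and have no effect, which corresponds to the counter no longer being used after it first drops back to 0.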
Ooh - TIL - I didn't know about that. Thanks. Looking into it now.
Phaser looks to be a kind of barrier. How would it be used in this context @tisonkun ?
It seems OutstandingOps is a one-shot barrier, so, as in the code snippet above, when we have a new outstanding op we register a party, and we terminate the phaser the first time it advances. Is that the semantics of OutstandingOps?
It's the same idea as in TreeCache. When the cache is started each outstanding operation increments a counter and when the op completes it decrements the counter. It's not exact, but it provides a way to handle the "cache is initialized" feature. Once the outstanding ops counter gets back to 0 it's no longer used.
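The counter described above can be sketched roughly as follows. This is an illustrative reconstruction, not the code from the PR: the class name OutstandingOpsSketch, the isActive() helper, and the exact field layout are assumptions, chosen to be consistent with the `active && (count.decrementAndGet() == 0)` check visible in the diff.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: counts outstanding operations and fires a completion proc at most once.
class OutstandingOpsSketch {
    private final AtomicReference<Runnable> completionProc;
    private final AtomicLong count = new AtomicLong(0);
    private volatile boolean active = true;

    OutstandingOpsSketch(Runnable completionProc) {
        this.completionProc = new AtomicReference<>(completionProc);
    }

    // Called when an operation is started.
    void increment() {
        if (active) {
            count.incrementAndGet();
        }
    }

    // Called when an operation completes; the first time the count returns to 0 the proc runs.
    void decrement() {
        if (active && count.decrementAndGet() == 0) {
            // getAndSet(null) guarantees that only one thread ever obtains the proc.
            Runnable proc = completionProc.getAndSet(null);
            if (proc != null) {
                active = false; // later increments and decrements become no-ops
                proc.run();
            }
        }
    }

    boolean isActive() {
        return active;
    }
}
```

As the comment above says, this is not exact: if the count briefly reaches 0 before a later operation is started, the completion proc fires early.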
Yes, that is the same as what I had in mind. Since OutstandingOps is implemented correctly now, we can have a follow-up to see whether it's worth replacing it with Java built-ins, and I will share a patch with a description there. At most it is an "improvement" and should not block merging this PR :)
For the "how", it seems to me that #335 (comment) is clear.
The patch is so huge.
I don't feel I can give a real +1.
I like this work.
I would suggest creating smaller patches in the future; that way reviews can be narrower.
void decrement()
{
    if ( active && (count.decrementAndGet() == 0) )
Much better!
 * </p>
 *
 * <p>
 * <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
This comment was never very clear to me, and if it is not only me, maybe this is a good opportunity to rephrase it or give more details.
By "false positive" here, do you mean that we might get events that no longer reflect the latest state of the znode?
These events are still true, just a bit late. The subsequent events should soon follow until the latest state is reflected. This behavior is general in ZK clients, not just the cache.
By "false negative", do you mean that we can miss events because ZK only sends the latest event (e.g. the cache might have been disconnected for a while, missing some znode events)? Again, I feel this is true for any ZK client, not just the cache. Though with Persistent Watchers I guess we potentially miss fewer momentary events, because the watcher is always there.
@Override
public void close()
{
    if ( state.compareAndSet(State.STARTED, State.CLOSED) )
Minor corner case, but calling close() on a cache that wasn't started is a no-op. Intuitively, to me, it should end up CLOSED in this case. If you agree, you could instead use:
if (state.getAndSet(CLOSED) == STARTED) ...
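The difference between the two checks can be seen in a small standalone sketch. The class CloseStateSketch and its helper methods are hypothetical, and the LATENT initial state is an assumption about how the state enum is laid out; only the AtomicReference calls are real JDK API.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo of the two close() variants discussed above.
class CloseStateSketch {
    enum State { LATENT, STARTED, CLOSED } // LATENT (never started) is an assumed name

    // Variant in the PR: only transitions STARTED -> CLOSED.
    static State closeWithCompareAndSet(AtomicReference<State> state) {
        if (state.compareAndSet(State.STARTED, State.CLOSED)) {
            // cleanup would run here
        }
        return state.get();
    }

    // Suggested variant: always ends in CLOSED, cleanup still only if it was STARTED.
    static State closeWithGetAndSet(AtomicReference<State> state) {
        if (state.getAndSet(State.CLOSED) == State.STARTED) {
            // cleanup would run here
        }
        return state.get();
    }
}
```

With a never-started cache, the first variant leaves the state at LATENT, so a later start() would still succeed, while the second variant leaves it at CLOSED.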
This pattern is used throughout Curator. It's actually an error to call close() without first calling start(). Maybe it should be an exception. We can deal with all instances of this pattern in a separate issue.
If that's the pattern everyone is already used to, then I think it is better to leave it unchanged. It is a minor consideration anyway, and it can always be overridden with a boolean paired to the cache from the outside.
}

// rebuild from the root first
nodeChanged(path);
This call is going to also get all data of children recursively, and update the storage accordingly, so why do we need to pass the storage again as below?
Consider the case where the cache is in sync and then there is a network partition for a period of time. When the connection is restored, every node in the cache has to be re-considered as the new state of the server is not known. You can't just rebuild the cache from zero either as the client still needs to get events for changes. Neither can we just rebuild from the root node as checkChildrenChanged() stops if the cVersion is the same. E.g.
Pre network partition:
root
    Node A1
        Node B1
            Node C1
    Node A2
Post network partition:
root
    Node A1
        Node B1
            Node C2 --- C1 deleted, C2 is new
    Node A2
Rebuilding only from the Root would not detect that C1 was deleted and C2 was added because the cVersion of A1 would be the same.
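The argument above can be illustrated with a toy in-memory model. Everything here is hypothetical (the Node class, refresh, and tree are not Curator or ZooKeeper code, and the version numbers are made up); it only assumes, as ZooKeeper does, that a node's cversion changes when its direct children are added or removed, and that the refresh stops descending where the cversion is unchanged.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: shows why a cversion-guarded refresh from the root misses a deep change.
class CversionSketch {
    static final class Node {
        final String name;
        final int cversion; // changes only when a direct child is added or removed
        final Map<String, Node> children = new LinkedHashMap<>();

        Node(String name, int cversion, Node... kids) {
            this.name = name;
            this.cversion = cversion;
            for (Node kid : kids) {
                children.put(kid.name, kid);
            }
        }
    }

    // Records the nodes whose child lists get re-read; stops wherever cversion is unchanged.
    static void refresh(Node cached, Node server, List<String> reRead) {
        if (cached.cversion == server.cversion) {
            return; // child list looks unchanged, so nothing below is visited
        }
        reRead.add(server.name);
        for (Node child : server.children.values()) {
            Node cachedChild = cached.children.get(child.name);
            if (cachedChild != null) {
                refresh(cachedChild, child, reRead);
            }
        }
    }

    // Builds root -> A1 -> B1 -> leaf, plus root -> A2; only B1's cversion varies.
    static Node tree(String leaf, int b1Cversion) {
        Node b1 = new Node("B1", b1Cversion, new Node(leaf, 0));
        Node a1 = new Node("A1", 1, b1);
        return new Node("root", 2, a1, new Node("A2", 0));
    }
}
```

Calling refresh on the two roots built by tree("C1", 1) and tree("C2", 3) re-reads nothing, because root and A1 keep their cversion; calling it directly on the two B1 nodes does detect the change. That is why every cached node has to be re-checked after a reconnect.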
import static org.apache.zookeeper.KeeperException.Code.NONODE;
import static org.apache.zookeeper.KeeperException.Code.OK;

class CuratorCacheImpl implements CuratorCache
Does this cache not allow maintaining only the immediate children? I noticed parentPathFilter, but shouldn't we allow that for efficiency (for cases with many grandchildren and deeper descendants whose updates are not interesting in this cache context)? Maybe not useful enough to bother, just asking.
If someone really needs to emulate PathChildrenCache it could be done with a custom implementation of CuratorCacheStorage. I thought it added too much weight to have APIs that only cache 1 level.
Also - the new CuratorCache uses a Persistent watcher internally (which I added to ZK for 3.6). Those watchers only have 2 modes - single node watch or entire tree watch. So, the new CuratorCache matches this.
Yeah, it is kind of a limitation of the Persistent Watcher itself: it cannot be persistent for only the immediate children of a node. For a client that needs only immediate children, it can make both the ZK server and the client work harder on events from potentially many or large uninteresting sub-trees. But maybe in real life it is not useful enough.
outstandingOps.increment();
if ( compressedData )
{
    client.getData().decompressed().inBackground(callback).forPath(fromPath);
General question about the inBackground callbacks - they are all going to be serialized on the single ZK event thread?
Yes, background callbacks are called from the single ZK event thread. Is there a concern?
No concern, just wanted to make sure. addListener(T listener, Executor executor) can be used to offload heavy listeners to another single-threaded Executor.
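The offloading idea mentioned above can be sketched without Curator at all. The class ListenerOffloadSketch and its methods are hypothetical; the sketch only shows the general pattern of handing a listener to a separate single-threaded executor so the calling (event) thread returns immediately. The calling thread here stands in for the ZK event thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: run a heavy listener on its own single thread.
class ListenerOffloadSketch {
    // Wraps a listener so that invoking it only enqueues the real work on the given executor.
    static Runnable offload(Runnable listener, Executor executor) {
        return () -> executor.execute(listener);
    }

    // Returns the name of the thread that actually ran the listener.
    static String threadThatRanListener() {
        ExecutorService listenerExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "listener-thread"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Runnable heavyListener = () -> {
            seen.set(Thread.currentThread().getName());
            done.countDown();
        };
        try {
            offload(heavyListener, listenerExecutor).run(); // invoked from the calling thread
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            listenerExecutor.shutdown();
        }
        return seen.get();
    }
}
```

Because the executor has a single thread, offloaded events are still processed in the order they were submitted.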
private void handleException(Exception e)
{
    ThreadUtils.checkInterrupted(e);
In the case of InterruptedException, you both reset the interrupt on the thread and propagate the exception to the user. Shouldn't you do only one of them, not both?
It's important to reset the interrupted flag. I don't think we should count on the end user doing that. Curator has had this behavior for a long time. Maybe we can re-visit it in another issue.
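A minimal sketch of the behavior under discussion: restore the interrupt flag and still hand the exception to the user. The class InterruptSketch, its checkInterrupted helper, and the userHandler parameter are hypothetical stand-ins; this is not Curator's ThreadUtils or the PR's handleException.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for the pattern: reset the flag, then still report the exception.
class InterruptSketch {
    // Restores the interrupt flag if the exception signals an interrupt.
    static boolean checkInterrupted(Throwable e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
            return true;
        }
        return false;
    }

    // Does both things the review comment asks about: resets the flag and propagates.
    static void handleException(Exception e, Consumer<Exception> userHandler) {
        checkInterrupted(e);
        userHandler.accept(e);
    }
}
```

Restoring the flag means code further up the call stack can still observe the interrupt even if the user's handler ignores the exception.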
This is already a broken-up patch. There was a PR that preceded it. There will be another PR after this. So, this is as small as I could get this portion. Though, I guess I could have done just the PersistentWatcher class and then this.
@shayshim I reworked the doc for
 * @param listener listener to add
 * @return this
 */
CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener);
Instead of having these multiple combinations (where the user will still need to check the type for each event, e.g. type == NODE_CHANGED), you could introduce an interface that lets the user implement only the methods of interest, and have the burden of the type -> action mapping specified in one location for all users:
public interface CacheListener {
    void onCacheInitialized();
    void onNodeCreate(ChildData data);
    void onNodeChanged(ChildData oldData, ChildData data);
    void onNodeDeleted(ChildData data);
}
(You could introduce an empty implementation of this interface for convenience.)
It is an approach that doesn't embrace lambdas, but in this case of multiple event types it might be more comfortable for users.
Note that you could do the CuratorCacheListener -> this listener mapping as an implementation of CuratorCacheListener (the user gives you an impl of this interface and you build a CuratorCacheListener wrapper around it that does the mapping).
We could have both actually. We can add a CacheListenerAdaptor that looks like what you have. However, since Java 8, haven't most devs moved to the neo-functional style? The benefit of how I did the listener builder is that you don't have to use cumbersome anonymous classes, i.e. you can do:
CuratorCacheListener.builder()
    .forDeletes(node -> handleDelete(node))
    .forInitialized(() -> handleInitialized())
    ... etc ...
This is superior to anonymous inner classes.
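To show how a builder of this shape can hide the type -> action mapping from users, here is a simplified, self-contained sketch. All names in it (ListenerBuilderSketch, Type, Listener, Builder) are hypothetical stand-ins using String payloads; it is not the Curator API, only an illustration of the pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins; not the Curator types.
class ListenerBuilderSketch {
    enum Type { NODE_CREATED, NODE_CHANGED, NODE_DELETED }

    interface Listener {
        void event(Type type, String oldData, String data);
        default void initialized() {}
    }

    static final class Builder {
        private final List<Listener> listeners = new ArrayList<>();

        // The type check lives here, once, instead of in every user's listener.
        Builder forDeletes(Consumer<String> onDelete) {
            listeners.add((type, oldData, data) -> {
                if (type == Type.NODE_DELETED) {
                    onDelete.accept(oldData);
                }
            });
            return this;
        }

        Builder forInitialized(Runnable onInitialized) {
            listeners.add(new Listener() {
                @Override public void event(Type type, String oldData, String data) {}
                @Override public void initialized() { onInitialized.run(); }
            });
            return this;
        }

        // Combines everything registered so far into one listener.
        Listener build() {
            List<Listener> copy = new ArrayList<>(listeners);
            return new Listener() {
                @Override public void event(Type type, String oldData, String data) {
                    copy.forEach(l -> l.event(type, oldData, data));
                }
                @Override public void initialized() {
                    copy.forEach(Listener::initialized);
                }
            };
        }
    }
}
```

A caller only supplies lambdas for the events of interest, for example new ListenerBuilderSketch.Builder().forDeletes(deleted::add).build(), and never has to compare event types.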
The adaptor would look like this. Should I add this? Is it gilding the lily? https://gist.github.com/Randgalt/cbe38cb3cdf1825f73dd165d997a2248
No, I take it back. I didn't fully understand your builder until now. We don't need this interface and adapter.
Great work!
Adds several recipes that use the new ZOOKEEPER-1416 Persistent Recursive watches from ZooKeeper 3.6.0.
PersistentWatcher - A wrapper recipe that keeps a persistent (single or recursive) watch set and active through disconnections, etc.
CuratorCache - Completely re-written cache recipe that will replace TreeCache, NodeCache and PathChildrenCache. With the benefit of persistent recursive watchers, the implementation is far simpler, will use significantly less resources and network calls, be easier to support and should be more stable and performant. Wrappers for the older cache's listeners are provided to help with transitions.
Co-authored-by: randgalt <randgalt@apache.org>
See: apache/zookeeper#1106
This PR adds several recipes that use the new ZOOKEEPER-1416 Persistent Recursive watches from ZooKeeper 3.6.0.
PersistentWatcher
A wrapper recipe that keeps a persistent (single or recursive) watch set and active through disconnections, etc.
See: PersistentWatcher.java
CuratorCache
Completely re-written cache recipe that will replace TreeCache, NodeCache and PathChildrenCache. With the benefit of persistent recursive watchers and the new PersistentWatcher recipe, the implementation is far simpler, will use significantly less resources and network calls, be easier to support and should be more stable and performant.
Wrappers for the older cache's listeners are provided to help with transitions.
See: