Skip to content
Permalink
Browse files
Replace OutstandingOps with JDK bundled Phaser (#365)
  • Loading branch information
tisonkun committed Oct 16, 2021
1 parent ed3cc84 commit 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6
Showing 2 changed files with 13 additions and 60 deletions.
@@ -37,6 +37,7 @@
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
@@ -58,7 +59,14 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
private final boolean clearOnClose;
private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
private final Consumer<Exception> exceptionHandler;
private final OutstandingOps outstandingOps = new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized));

private final Phaser outstandingOps = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
callListeners(CuratorCacheListener::initialized);
return true;
}
};

private enum State
{
@@ -210,10 +218,10 @@ else if ( event.getResultCode() == NONODE.intValue() )
{
handleException(event);
}
outstandingOps.decrement();
outstandingOps.arriveAndDeregister();
};

outstandingOps.increment();
outstandingOps.register();
client.getChildren().inBackground(callback).forPath(fromPath);
}
catch ( Exception e )
@@ -245,10 +253,10 @@ else if ( event.getResultCode() == NONODE.intValue() )
{
handleException(event);
}
outstandingOps.decrement();
outstandingOps.arriveAndDeregister();
};

outstandingOps.increment();
outstandingOps.register();
if ( compressedData )
{
client.getData().decompressed().inBackground(callback).forPath(fromPath);

This file was deleted.

0 comments on commit 1038aa6

Please sign in to comment.