Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.Pathable;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.api.Watchable;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
Expand Down Expand Up @@ -73,6 +76,7 @@ public class TreeCache implements Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
private final boolean createParentNodes;
private final boolean disableZkWatches;
private final TreeCacheSelector selector;

public static final class Builder
Expand All @@ -84,6 +88,7 @@ public static final class Builder
private ExecutorService executorService = null;
private int maxDepth = Integer.MAX_VALUE;
private boolean createParentNodes = false;
private boolean disableZkWatches = false;
private TreeCacheSelector selector = new DefaultTreeCacheSelector();

private Builder(CuratorFramework client, String path)
Expand All @@ -102,7 +107,7 @@ public TreeCache build()
{
executor = Executors.newSingleThreadExecutor(defaultThreadFactory);
}
return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, selector);
return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, disableZkWatches, selector);
}

/**
Expand Down Expand Up @@ -165,6 +170,18 @@ public Builder setCreateParentNodes(boolean createParentNodes)
return this;
}

/**
* By default, TreeCache creates {@link org.apache.zookeeper.ZooKeeper} watches for every created path.
* Change this behavior with this method.
* @param disableZkWatches true to disable zk watches
* @return this for chaining
*/
public Builder disableZkWatches(boolean disableZkWatches)
{
this.disableZkWatches = disableZkWatches;
return this;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and e.g. disableZkWatches(boolean)


/**
* By default, {@link DefaultTreeCacheSelector} is used. Change the selector here.
*
Expand Down Expand Up @@ -253,7 +270,7 @@ private void doRefreshChildren() throws Exception
{
if ( treeState.get() == TreeState.STARTED )
{
client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
maybeWatch(client.getChildren()).forPath(path);
}
}

Expand All @@ -263,15 +280,24 @@ private void doRefreshData() throws Exception
{
if ( dataIsCompressed )
{
client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
maybeWatch(client.getData().decompressed()).forPath(path);
}
else
{
client.getData().usingWatcher(this).inBackground(this).forPath(path);
maybeWatch(client.getData()).forPath(path);
}
}
}

private <T, P extends Watchable<BackgroundPathable<T>> & BackgroundPathable<T>> Pathable<T> maybeWatch(
P dataBuilder) {
if (disableZkWatches) {
return dataBuilder.inBackground(this);
} else {
return dataBuilder.usingWatcher(this).inBackground(this);
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        private void doRefreshChildren() throws Exception
        {
            if ( treeState.get() == TreeState.STARTED )
            {
                maybeWatch(client.getChildren()).forPath(path);
            }
        }

        private void doRefreshData() throws Exception
        {
            if ( treeState.get() == TreeState.STARTED )
            {
                if ( dataIsCompressed )
                {
                    maybeWatch(client.getData().decompressed()).forPath(path);
                }
                else
                {
                    maybeWatch(client.getData()).forPath(path);
                }
            }
        }

        private <T, P extends Watchable<BackgroundPathable<T>> & BackgroundPathable<T>> Pathable<T> maybeWatch(P dataBuilder)
        {
            if ( disableZkWatches )
            {
                return dataBuilder.inBackground(this);
            }
            else
            {
                return dataBuilder.usingWatcher(this).inBackground(this);
            }
        }

void wasReconnected() throws Exception
{
refresh();
Expand Down Expand Up @@ -321,7 +347,7 @@ void wasDeleted() throws Exception
if ( parent == null )
{
// Root node; use an exist query to watch for existence.
client.checkExists().usingWatcher(this).inBackground(this).forPath(path);
maybeWatch(client.checkExists()).forPath(path);
}
else
{
Expand Down Expand Up @@ -535,7 +561,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
*/
public TreeCache(CuratorFramework client, String path)
{
this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), false, new DefaultTreeCacheSelector());
this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new DefaultTreeCacheSelector());
}

/**
Expand All @@ -545,9 +571,10 @@ public TreeCache(CuratorFramework client, String path)
* @param dataIsCompressed if true, data in the path is compressed
* @param executorService Closeable ExecutorService to use for the TreeCache's background thread
* @param createParentNodes true to create parent nodes as containers
* @param disableZkWatches true to disable Zookeeper watches
* @param selector the selector to use
*/
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
{
this.createParentNodes = createParentNodes;
this.selector = Preconditions.checkNotNull(selector, "selector cannot be null");
Expand All @@ -557,6 +584,7 @@ public TreeCache(CuratorFramework client, String path)
this.cacheData = cacheData;
this.dataIsCompressed = dataIsCompressed;
this.maxDepth = maxDepth;
this.disableZkWatches = disableZkWatches;
this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,31 @@ public void testBasics() throws Exception
assertNoMoreEvents();
}

@Test
public void testBasicsWithNoZkWatches() throws Exception
{
client.create().forPath("/test");
client.create().forPath("/test/one", "hey there".getBytes());


cache = buildWithListeners(TreeCache.newBuilder(client, "/test").disableZkWatches(true));

cache.start();
assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");

assertEvent(TreeCacheEvent.Type.INITIALIZED);
Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of());
Assert.assertNull(cache.getCurrentChildren("/test/o"));
Assert.assertNull(cache.getCurrentChildren("/test/onely"));
Assert.assertNull(cache.getCurrentChildren("/t"));
Assert.assertNull(cache.getCurrentChildren("/testing"));

assertNoMoreEvents();
}

@Test
public void testBasicsOnTwoCaches() throws Exception
{
Expand Down