Skip to content
Permalink
Browse files
CURATOR-549
Creates a simple bridge that, when using ZK 3.6.0 creates a CuratorCache, and for earlier versions creates a TreeCache. The curator-test-zk35 module ensures that both code paths are tested.
  • Loading branch information
randgalt authored and Randgalt committed Apr 9, 2020
1 parent 844c0ad commit b29bb010ada7f17626438b828de1f9e122e5d7bf
Showing 28 changed files with 595 additions and 195 deletions.
@@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.cache;

import com.google.common.collect.Sets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.StandardListenerManager;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;

/**
* Version of CuratorCacheBridge for pre-ZK 3.6 - uses TreeCache instead of CuratorCache
*/
@SuppressWarnings("deprecation")
class CompatibleCuratorCacheBridge implements CuratorCacheBridge, TreeCacheListener
{
private final TreeCache cache;
private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();

CompatibleCuratorCacheBridge(CuratorFramework client, String path, CuratorCache.Options[] optionsArg, ExecutorService executorService, boolean cacheData)
{
Set<CuratorCache.Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
TreeCache.Builder builder = TreeCache.newBuilder(client, path).setCacheData(cacheData);
if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) )
{
builder.setMaxDepth(0);
}
if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) )
{
builder.setDataIsCompressed(true);
}
if ( executorService != null )
{
builder.setExecutor(executorService);
}
cache = builder.build();
}

@Override
public void start()
{
try
{
cache.getListenable().addListener(this);

cache.start();
}
catch ( Exception e )
{
throw new RuntimeException(e);
}
}

@Override
public void close()
{
cache.close();
}

@Override
public boolean isCuratorCache()
{
return false;
}

@Override
public Listenable<CuratorCacheListener> listenable()
{
return listenerManager;
}

@Override
public Optional<ChildData> get(String path)
{
return Optional.ofNullable(cache.getCurrentData(path));
}

@Override
public int size()
{
return cache.size();
}

@Override
public Stream<ChildData> stream()
{
Iterable<ChildData> iterable = cache::iterator;
return StreamSupport.stream(iterable.spliterator(), false);
}

@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
{
switch ( event.getType() )
{
case NODE_ADDED:
{
listenerManager.forEach(listener -> listener.event(NODE_CREATED, null, event.getData()));
break;
}

case NODE_REMOVED:
{
listenerManager.forEach(listener -> listener.event(NODE_DELETED, event.getData(), null));
break;
}

case NODE_UPDATED:
{
listenerManager.forEach(listener -> listener.event(NODE_CHANGED, event.getOldData(), event.getData()));
break;
}

case INITIALIZED:
{
listenerManager.forEach(CuratorCacheListener::initialized);
break;
}
}
}
}
@@ -97,6 +97,21 @@ static CuratorCacheBuilder builder(CuratorFramework client, String path)
return new CuratorCacheBuilderImpl(client, path);
}

/**
* Start a Curator Cache Bridge builder. A Curator Cache Bridge is
* a facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if
* persistent watches are available or {@link org.apache.curator.framework.recipes.cache.TreeCache}
* otherwise (i.e. if you are using ZooKeeper 3.5.x).
*
* @param client Curator client
* @param path path to cache
* @return bridge builder
*/
static CuratorCacheBridgeBuilder bridgeBuilder(CuratorFramework client, String path)
{
return new CuratorCacheBridgeBuilderImpl(client, path);
}

/**
* Start the cache. This will cause a complete refresh from the cache's root node and generate
* events for all nodes found, etc.
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.cache;

/**
* A facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if
* persistent watches are available or a {@link org.apache.curator.framework.recipes.cache.TreeCache}
* otherwise
*/
@SuppressWarnings("deprecation")
public interface CuratorCacheBridge extends CuratorCache
{
/**
* Returns true if the underlying cache is {@link org.apache.curator.framework.recipes.cache.CuratorCache} (i.e. ZooKeeper 3.6+).
* Otherwise it is {@link org.apache.curator.framework.recipes.cache.TreeCache} (i.e. ZooKeeper 3.5.x)
*
* @return true/false
*/
boolean isCuratorCache();
}
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.curator.framework.recipes.cache;

import java.util.concurrent.ExecutorService;

public interface CuratorCacheBridgeBuilder
{
/**
* @param options any options
* @return this
*/
CuratorCacheBridgeBuilder withOptions(CuratorCache.Options... options);

/**
* The bridge cache will not retain the data bytes. i.e. ChildData objects
* returned by the cache will always return {@code null} for {@link ChildData#getData()}
*
* @return this
*/
CuratorCacheBridgeBuilder withDataNotCached();

/**
* If the old {@link org.apache.curator.framework.recipes.cache.TreeCache} is used by the bridge
* (i.e. you are using ZooKeeper 3.5.x) then this executor service is passed to {@link org.apache.curator.framework.recipes.cache.TreeCache.Builder#setExecutor(java.util.concurrent.ExecutorService)}.
* For {@link org.apache.curator.framework.recipes.cache.CuratorCache} this is not used and will be ignored (a warning will be logged).
*
* @param executorService executor to use for ZooKeeper 3.5.x
* @return this
*/
@SuppressWarnings("deprecation")
CuratorCacheBridgeBuilder withExecutorService(ExecutorService executorService);

/**
* Return a new Curator Cache Bridge based on the builder methods that have been called
*
* @return new Curator Cache Bridge
*/
CuratorCacheBridge build();
}
@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.curator.framework.recipes.cache;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.Compatibility;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;

class CuratorCacheBridgeBuilderImpl implements CuratorCacheBridgeBuilder
{
private final CuratorFramework client;
private final String path;
private CuratorCache.Options[] options;
private boolean cacheData = true;
private ExecutorService executorService = null;
private final boolean forceTreeCache = Boolean.getBoolean("curator-cache-bridge-force-tree-cache");

CuratorCacheBridgeBuilderImpl(CuratorFramework client, String path)
{
this.client = client;
this.path = path;
}

@Override
public CuratorCacheBridgeBuilder withOptions(CuratorCache.Options... options)
{
this.options = options;
return this;
}

@Override
public CuratorCacheBridgeBuilder withDataNotCached()
{
cacheData = false;
return this;
}

@Override
public CuratorCacheBridgeBuilder withExecutorService(ExecutorService executorService)
{
this.executorService = executorService;
return this;
}

@Override
public CuratorCacheBridge build()
{
if ( !forceTreeCache && Compatibility.hasPersistentWatchers() )
{
if ( executorService != null )
{
LoggerFactory.getLogger(getClass()).warn("CuratorCache does not support custom ExecutorService");
}
CuratorCacheStorage storage = cacheData ? CuratorCacheStorage.standard() : CuratorCacheStorage.dataNotCached();
return new CuratorCacheImpl(client, storage, path, options, null);
}
return new CompatibleCuratorCacheBridge(client, path, options, executorService, cacheData);
}
}
@@ -45,7 +45,7 @@
import static org.apache.zookeeper.KeeperException.Code.NONODE;
import static org.apache.zookeeper.KeeperException.Code.OK;

class CuratorCacheImpl implements CuratorCache
class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
@@ -102,6 +102,12 @@ public void close()
}
}

@Override
public boolean isCuratorCache()
{
return true;
}

@Override
public Listenable<CuratorCacheListener> listenable()
{
@@ -42,7 +42,7 @@ static CuratorCacheStorage standard()
*
* @return storage instance that does not retain data bytes
*/
static CuratorCacheStorage bytesNotCached()
static CuratorCacheStorage dataNotCached()
{
return new StandardCuratorCacheStorage(false);
}

0 comments on commit b29bb01

Please sign in to comment.