Skip to content
Permalink
Browse files
CURATOR-574 DiscoveryService fatal error on deserializing an empty by…
…te[] as JSON
  • Loading branch information
eolivelli committed Jun 23, 2020
1 parent 095deaf commit edf9f40f81b9a806ba1123181df40b45faf80238
Showing 5 changed files with 41 additions and 6 deletions.
@@ -86,11 +86,12 @@
* behaves differently than {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so
* things such as event ordering will likely be different.
*
* @param rootPath the root path. The listener needs this information in order to bridge only events for children of this path and not the path itself
* @param client the curator client
* @param listener the listener to wrap
* @return a CuratorCacheListener that forwards to the given listener
*/
CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener);
CuratorCacheListenerBuilder forPathChildrenCache(String rootPath, CuratorFramework client, PathChildrenCacheListener listener);

/**
* Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
@@ -106,9 +106,9 @@ public void initialized()
}

@Override
public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener)
public CuratorCacheListenerBuilder forPathChildrenCache(String rootPath, CuratorFramework client, PathChildrenCacheListener listener)
{
listeners.add(new PathChildrenCacheListenerWrapper(client, listener));
listeners.add(new PathChildrenCacheListenerWrapper(rootPath, client, listener));
return this;
}

@@ -19,15 +19,19 @@

package org.apache.curator.framework.recipes.cache;

import java.util.Objects;
import org.apache.curator.framework.CuratorFramework;

class PathChildrenCacheListenerWrapper implements CuratorCacheListener
{
private final PathChildrenCacheListener listener;
private final CuratorFramework client;
private final String rootPath;

PathChildrenCacheListenerWrapper(CuratorFramework client, PathChildrenCacheListener listener)
PathChildrenCacheListenerWrapper(String rootPath, CuratorFramework client, PathChildrenCacheListener listener)
{
Objects.requireNonNull(rootPath,"rootPath cannot be null");
this.rootPath = rootPath;
this.listener = listener;
this.client = client;
}
@@ -39,18 +43,27 @@ public void event(Type type, ChildData oldData, ChildData data)
{
case NODE_CREATED:
{
if (rootPath.equals(data.getPath())) {
return;
}
sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
break;
}

case NODE_CHANGED:
{
if (rootPath.equals(data.getPath())) {
return;
}
sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
break;
}

case NODE_DELETED:
{
if (rootPath.equals(oldData.getPath())) {
return;
}
sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
break;
}
@@ -39,6 +39,7 @@
import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.SINGLE_NODE_CACHE;
import static org.apache.curator.framework.recipes.cache.CuratorCacheAccessor.parentPathFilter;
import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
import java.util.concurrent.CopyOnWriteArrayList;

@Test(groups = CuratorTestBase.zk36Group)
public class TestCuratorCacheWrappers extends CuratorTestBase
@@ -51,16 +52,18 @@ public void testPathChildrenCache() throws Exception // copied from TestPathC
client.start();
client.create().forPath("/test");

final CopyOnWriteArrayList<PathChildrenCacheEvent> eventsTrace = new CopyOnWriteArrayList<>();
final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<>();
try (CuratorCache cache = CuratorCache.build(client, "/test"))
{
PathChildrenCacheListener listener = (__, event) -> {
eventsTrace.add(event);
if ( event.getData().getPath().equals("/test/one") )
{
events.offer(event.getType());
}
};
cache.listenable().addListener(builder().forPathChildrenCache(client, listener).build());
cache.listenable().addListener(builder().forPathChildrenCache("/test", client, listener).build());
cache.start();

client.create().forPath("/test/one", "hey there".getBytes());
@@ -72,6 +75,24 @@ public void testPathChildrenCache() throws Exception // copied from TestPathC

client.delete().forPath("/test/one");
Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);

// Please note that there is not guarantee on the order of events
// For instance INITIALIZED event can appear in the middle of the observed sequence.
for (PathChildrenCacheEvent event : eventsTrace) {
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_REMOVED:
case CHILD_UPDATED:
Assert.assertEquals("/test/one", event.getData().getPath());
break;
case INITIALIZED:
Assert.assertNull(event.getData());
break;
default:
Assert.fail();
}
}
Assert.assertEquals(eventsTrace.size(), 4);
}
}
}
@@ -84,7 +84,7 @@ private static ExecutorService convertThreadFactory(ThreadFactory threadFactory)
.withDataNotCached()
.build();
CuratorCacheListener listener = CuratorCacheListener.builder()
.forPathChildrenCache(discovery.getClient(), this)
.forPathChildrenCache(path, discovery.getClient(), this)
.forInitialized(this::initialized)
.build();
cache.listenable().addListener(listener);

0 comments on commit edf9f40

Please sign in to comment.