Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Apache compatible version of the ZKClient bridge

  • Loading branch information...
commit 0fb7e352538e93a96f1c5e72f232fa65d839d84b 1 parent 45216a1
@Randgalt Randgalt authored
Showing with 1,898 additions and 1 deletion.
  1. +21 −0 build.gradle
  2. +257 −0 curator-x-apache-zkclient-bridge/src/main/java/com/netflix/curator/x/zkclientbridge/CuratorZKClientBridge.java
  3. +385 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractBaseZkClientTest.java
  4. +56 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractConnectionTest.java
  5. +160 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ContentWatcherTest.java
  6. +37 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DeferredGatewayStarter.java
  7. +131 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DistributedQueueTest.java
  8. +25 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/InMemoryConnectionTest.java
  9. +49 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/MemoryZkClientTest.java
  10. +292 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ServerZkClientTest.java
  11. +95 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/TestUtil.java
  12. +54 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkClientSerializationTest.java
  13. +36 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkConnectionTest.java
  14. +80 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkPathUtil.java
  15. +147 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkTestSystem.java
  16. +72 −0 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/util/ZkPathUtilTest.java
  17. +1 −1  settings.gradle
View
21 build.gradle
@@ -61,3 +61,24 @@ project(':curator-x-zkclient-bridge')
}
}
+project(':curator-x-apache-zkclient-bridge')
+{
+ dependencies
+ {
+ compile 'org.apache.curator:curator-client:2.0.0-incubating'
+ compile 'org.apache.curator:curator-framework:2.0.0-incubating'
+ compile ('com.github.sgroschupf:zkclient:0.1')
+ {
+ exclude group: 'com.sun.jmx', module: 'jmxri'
+ exclude group: 'com.sun.jdmk', module: 'jmxtools'
+ exclude group: 'javax.jms', module: 'jms'
+ }
+ testCompile 'org.apache.curator:curator-test:2.0.0-incubating'
+ testCompile 'org.slf4j:slf4j-api:1.6.4'
+ testCompile 'org.javassist:javassist:3.15.0-GA'
+ testCompile 'commons-io:commons-io:1.4'
+ testCompile 'org.mockito:mockito-core:1.8.0'
+ testCompile 'junit:junit:4.7'
+ }
+}
+
View
257 ...r-x-apache-zkclient-bridge/src/main/java/com/netflix/curator/x/zkclientbridge/CuratorZKClientBridge.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2012 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.curator.x.zkclientbridge;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorListener;
+import org.I0Itec.zkclient.IZkConnection;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>
+ * Bridge between ZKClient and Curator. Accomplished via an implementation for
+ * {@link IZkConnection} which is the abstraction ZKClient uses to wrap the raw ZooKeeper handle
+ * </p>
+ *
+ * <p>
+ * Once allocated, bridge to ZKClient via:
+ * <code><pre>
+ * ZKClient zkClient = new ZkClient(new CuratorZKClientBridge(curatorInstance, timeout));
+ * </pre></code>
+ * </p>
+ */
+public class CuratorZKClientBridge implements IZkConnection
+{
+ private final CuratorFramework curator;
+ private final AtomicReference<CuratorListener> listener = new AtomicReference<CuratorListener>(null);
+
+ /**
+ * @param curator Curator instance to bridge
+ */
+ public CuratorZKClientBridge(CuratorFramework curator)
+ {
+ this.curator = curator;
+ }
+
+ /**
+ * Return the client
+ *
+ * @return client
+ */
+ public CuratorFramework getCurator()
+ {
+ return curator;
+ }
+
+ @Override
+ public void connect(final Watcher watcher)
+ {
+ if ( watcher != null )
+ {
+ CuratorListener localListener = new CuratorListener()
+ {
+ @Override
+ public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getWatchedEvent() != null )
+ {
+ watcher.process(event.getWatchedEvent());
+ }
+ }
+ };
+ curator.getCuratorListenable().addListener(localListener);
+ listener.set(localListener);
+
+ try
+ {
+ BackgroundCallback callback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ WatchedEvent fakeEvent = new WatchedEvent(Watcher.Event.EventType.None, curator.getZookeeperClient().isConnected() ? Watcher.Event.KeeperState.SyncConnected : Watcher.Event.KeeperState.Disconnected, null);
+ watcher.process(fakeEvent);
+ }
+ };
+ curator.checkExists().inBackground(callback).forPath("/foo");
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws InterruptedException
+ {
+ // NOTE: the curator instance is NOT closed here
+
+ CuratorListener localListener = listener.getAndSet(null);
+ if ( localListener != null )
+ {
+ curator.getCuratorListenable().removeListener(localListener);
+ }
+ }
+
+ @Override
+ public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException
+ {
+ try
+ {
+ return curator.create().withMode(mode).forPath(path, data);
+ }
+ catch ( Exception e )
+ {
+ adjustException(e);
+ }
+ return null; // will never execute
+ }
+
+ @Override
+ public void delete(String path) throws InterruptedException, KeeperException
+ {
+ try
+ {
+ curator.delete().forPath(path);
+ }
+ catch ( Exception e )
+ {
+ adjustException(e);
+ }
+ }
+
+ @Override
+ public boolean exists(String path, boolean watch) throws KeeperException, InterruptedException
+ {
+ try
+ {
+ return watch ? (curator.checkExists().watched().forPath(path) != null) : (curator.checkExists().forPath(path) != null);
+ }
+ catch ( Exception e )
+ {
+ adjustException(e);
+ }
+ return false; // will never execute
+ }
+
+ @Override
+ public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException
+ {
+ try
+ {
+ return watch ? curator.getChildren().watched().forPath(path) : curator.getChildren().forPath(path);
+ }
+ catch ( Exception e )
+ {
+ adjustException(e);
+ }
+ return null; // will never execute
+ }
+
+ @Override
+ public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException
+ {
+ try
+ {
+ if ( stat != null )
+ {
+ return watch ? curator.getData().storingStatIn(stat).watched().forPath(path) : curator.getData().storingStatIn(stat).forPath(path);
+ }
+ else
+ {
+ return watch ? curator.getData().watched().forPath(path) : curator.getData().forPath(path);
+ }
+ }
+ catch ( Exception e )
+ {
+ adjustException(e);
+ }
+ return null; // will never execute
+ }
+
+ @Override
+ public void writeData(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException
+ {
+ try
+ {
+ curator.setData().withVersion(expectedVersion).forPath(path, data);
+ }
+ catch ( Exception e )
+ {
+ adjustException(e);
+ }
+ }
+
+ @Override
+ public ZooKeeper.States getZookeeperState()
+ {
+ try
+ {
+ return curator.getZookeeperClient().getZooKeeper().getState();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public long getCreateTime(String path) throws KeeperException, InterruptedException
+ {
+ try
+ {
+ Stat stat = curator.checkExists().forPath(path);
+ return (stat != null) ? stat.getCtime() : 0;
+ }
+ catch ( Exception e )
+ {
+ adjustException(e);
+ }
+ return 0;
+ }
+
+ @Override
+ public String getServers()
+ {
+ return curator.getZookeeperClient().getCurrentConnectionString();
+ }
+
+ private void adjustException(Exception e) throws KeeperException, InterruptedException
+ {
+ if ( e instanceof KeeperException )
+ {
+ throw (KeeperException)e;
+ }
+
+ if ( e instanceof InterruptedException )
+ {
+ throw (InterruptedException)e;
+ }
+
+ throw new RuntimeException(e);
+ }
+}
View
385 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractBaseZkClientTest.java
@@ -0,0 +1,385 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+import org.I0Itec.zkclient.testutil.ZkTestSystem;
+import org.apache.curator.test.TestingServer;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+public abstract class AbstractBaseZkClientTest {
+
+ protected static final Logger LOG = Logger.getLogger(AbstractBaseZkClientTest.class);
+ protected TestingServer _zkServer;
+ protected ZkClient _client;
+
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("------------ BEFORE -------------");
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("------------ AFTER -------------");
+ }
+
+ @Test(expected = RuntimeException.class, timeout = 15000)
+ public void testUnableToConnect() throws Exception {
+ LOG.info("--- testUnableToConnect");
+ // we are using port 4711 to avoid conflicts with the zk server that is
+ // started by the Spring context
+ ZkTestSystem.createZkClient("localhost:4712");
+ }
+
+ @Test
+ public void testWriteAndRead() throws Exception {
+ LOG.info("--- testWriteAndRead");
+ String data = "something";
+ String path = "/a";
+ _client.createPersistent(path, data);
+ String data2 = _client.readData(path);
+ Assert.assertEquals(data, data2);
+ _client.delete(path);
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ LOG.info("--- testDelete");
+ String path = "/a";
+ assertFalse(_client.delete(path));
+ _client.createPersistent(path, null);
+ assertTrue(_client.delete(path));
+ assertFalse(_client.delete(path));
+ }
+
+ @Test
+ public void testDeleteRecursive() throws Exception {
+ LOG.info("--- testDeleteRecursive");
+
+ // should be able to call this on a not existing directory
+ _client.deleteRecursive("/doesNotExist");
+ }
+
+ @Test
+ public void testWaitUntilExists() {
+ LOG.info("--- testWaitUntilExists");
+ // create /gaga node asynchronously
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(100);
+ _client.createPersistent("/gaga");
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }.start();
+
+ // wait until this was created
+ assertTrue(_client.waitUntilExists("/gaga", TimeUnit.SECONDS, 5));
+ assertTrue(_client.exists("/gaga"));
+
+ // waiting for /neverCreated should timeout
+ assertFalse(_client.waitUntilExists("/neverCreated", TimeUnit.MILLISECONDS, 100));
+ }
+
+ @Test
+ public void testDataChanges1() throws Exception {
+ LOG.info("--- testDataChanges1");
+ String path = "/a";
+ final Holder<String> holder = new Holder<String>();
+
+ IZkDataListener listener = new IZkDataListener() {
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ holder.set((String) data);
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ holder.set(null);
+ }
+ };
+ _client.subscribeDataChanges(path, listener);
+ _client.createPersistent(path, "aaa");
+
+ // wait some time to make sure the event was triggered
+ String contentFromHolder = TestUtil.waitUntil("b", new Callable<String>() {
+
+ @Override
+ public String call() throws Exception {
+ return holder.get();
+ }
+ }, TimeUnit.SECONDS, 5);
+
+ assertEquals("aaa", contentFromHolder);
+ }
+
+ @Test
+ public void testDataChanges2() throws Exception {
+ LOG.info("--- testDataChanges2");
+ String path = "/a";
+ final AtomicInteger countChanged = new AtomicInteger(0);
+ final AtomicInteger countDeleted = new AtomicInteger(0);
+
+ IZkDataListener listener = new IZkDataListener() {
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ countChanged.incrementAndGet();
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ countDeleted.incrementAndGet();
+ }
+ };
+ _client.subscribeDataChanges(path, listener);
+
+ // create node
+ _client.createPersistent(path, "aaa");
+
+ // wait some time to make sure the event was triggered
+ TestUtil.waitUntil(1, new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ return countChanged.get();
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertEquals(1, countChanged.get());
+ assertEquals(0, countDeleted.get());
+
+ countChanged.set(0);
+ countDeleted.set(0);
+ // delete node, this should trigger a delete event
+ _client.delete(path);
+ // wait some time to make sure the event was triggered
+ TestUtil.waitUntil(1, new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ return countDeleted.get();
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertEquals(0, countChanged.get());
+ assertEquals(1, countDeleted.get());
+
+ // test if watch was reinstalled after the file got deleted
+ countChanged.set(0);
+ _client.createPersistent(path, "aaa");
+
+ // wait some time to make sure the event was triggered
+ TestUtil.waitUntil(1, new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ return countChanged.get();
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertEquals(1, countChanged.get());
+
+ // test if changing the contents notifies the listener
+ _client.writeData(path, "bbb");
+
+ // wait some time to make sure the event was triggered
+ TestUtil.waitUntil(2, new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ return countChanged.get();
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertEquals(2, countChanged.get());
+ }
+
+ @Test(timeout = 15000)
+ public void testHandleChildChanges() throws Exception {
+ LOG.info("--- testHandleChildChanges");
+ String path = "/a";
+ final AtomicInteger count = new AtomicInteger(0);
+ final Holder<List<String>> children = new Holder<List<String>>();
+
+ IZkChildListener listener = new IZkChildListener() {
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ count.incrementAndGet();
+ children.set(currentChilds);
+ }
+ };
+ _client.subscribeChildChanges(path, listener);
+
+ // ----
+ // Create the root node should throw the first child change event
+ // ----
+ _client.createPersistent(path);
+
+ // wait some time to make sure the event was triggered
+ TestUtil.waitUntil(1, new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ return count.get();
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertEquals(1, count.get());
+ assertEquals(0, children.get().size());
+
+ // ----
+ // Creating a child node should throw another event
+ // ----
+ count.set(0);
+ _client.createPersistent(path + "/child1");
+
+ // wait some time to make sure the event was triggered
+ TestUtil.waitUntil(1, new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ return count.get();
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertEquals(1, count.get());
+ assertEquals(1, children.get().size());
+ assertEquals("child1", children.get().get(0));
+
+ // ----
+ // Creating another child and deleting the node should also throw an event
+ // ----
+ count.set(0);
+ _client.createPersistent(path + "/child2");
+ _client.deleteRecursive(path);
+
+ // wait some time to make sure the event was triggered
+ Boolean eventReceived = TestUtil.waitUntil(true, new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ return count.get() > 0 && children.get() == null;
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertTrue(eventReceived);
+ assertNull(children.get());
+
+ // ----
+ // Creating root again should throw an event
+ // ----
+ count.set(0);
+ _client.createPersistent(path);
+
+ // wait some time to make sure the event was triggered
+ eventReceived = TestUtil.waitUntil(true, new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ return count.get() > 0 && children.get() != null;
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertTrue(eventReceived);
+ assertEquals(0, children.get().size());
+
+ // ----
+ // Creating child now should throw an event
+ // ----
+ count.set(0);
+ _client.createPersistent(path + "/child");
+
+ // wait some time to make sure the event was triggered
+ eventReceived = TestUtil.waitUntil(true, new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ return count.get() > 0;
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertTrue(eventReceived);
+ assertEquals(1, children.get().size());
+ assertEquals("child", children.get().get(0));
+
+ // ----
+ // Deleting root node should throw an event
+ // ----
+ count.set(0);
+ _client.deleteRecursive(path);
+
+ // wait some time to make sure the event was triggered
+ eventReceived = TestUtil.waitUntil(true, new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ return count.get() > 0 && children.get() == null;
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertTrue(eventReceived);
+ assertNull(children.get());
+ }
+
+ @Test
+ public void testGetCreationTime() throws Exception {
+ long start = System.currentTimeMillis();
+ Thread.sleep(100);
+ String path = "/a";
+ _client.createPersistent(path);
+ Thread.sleep(100);
+ long end = System.currentTimeMillis();
+ long creationTime = _client.getCreationTime(path);
+ assertTrue(start < creationTime && end > creationTime);
+ }
+
+ @Test
+ public void testNumberOfListeners() {
+ IZkChildListener zkChildListener = mock(IZkChildListener.class);
+ _client.subscribeChildChanges("/", zkChildListener);
+ assertEquals(1, _client.numberOfListeners());
+
+ IZkDataListener zkDataListener = mock(IZkDataListener.class);
+ _client.subscribeDataChanges("/a", zkDataListener);
+ assertEquals(2, _client.numberOfListeners());
+
+ _client.subscribeDataChanges("/b", zkDataListener);
+ assertEquals(3, _client.numberOfListeners());
+
+ IZkStateListener zkStateListener = mock(IZkStateListener.class);
+ _client.subscribeStateChanges(zkStateListener);
+ assertEquals(4, _client.numberOfListeners());
+
+ _client.unsubscribeChildChanges("/", zkChildListener);
+ assertEquals(3, _client.numberOfListeners());
+
+ _client.unsubscribeDataChanges("/b", zkDataListener);
+ assertEquals(2, _client.numberOfListeners());
+
+ _client.unsubscribeDataChanges("/a", zkDataListener);
+ assertEquals(1, _client.numberOfListeners());
+
+ _client.unsubscribeStateChanges(zkStateListener);
+ assertEquals(0, _client.numberOfListeners());
+ }
+}
View
56 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractConnectionTest.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.testutil.ZkPathUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class AbstractConnectionTest {
+
+ private final IZkConnection _connection;
+
+ public AbstractConnectionTest(IZkConnection connection) {
+ _connection = connection;
+ }
+
+ @Test
+ public void testGetChildren_OnEmptyFileSystem() throws KeeperException, InterruptedException {
+ InMemoryConnection connection = new InMemoryConnection();
+ List<String> children = connection.getChildren("/", false);
+ assertEquals(0, children.size());
+ }
+
+ @Test
+ @Ignore("I don't understand this test -JZ")
+ public void testSequentials() throws KeeperException, InterruptedException {
+ String sequentialPath = _connection.create("/a", new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL);
+ int firstSequential = Integer.parseInt(sequentialPath.substring(2));
+ assertEquals("/a" + ZkPathUtil.leadingZeros(firstSequential++, 10), sequentialPath);
+ assertEquals("/a" + ZkPathUtil.leadingZeros(firstSequential++, 10), _connection.create("/a", new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL));
+ assertEquals("/a" + ZkPathUtil.leadingZeros(firstSequential++, 10), _connection.create("/a", new byte[0], CreateMode.PERSISTENT_SEQUENTIAL));
+ assertEquals("/b" + ZkPathUtil.leadingZeros(firstSequential++, 10), _connection.create("/b", new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL));
+ assertEquals("/b" + ZkPathUtil.leadingZeros(firstSequential++, 10), _connection.create("/b", new byte[0], CreateMode.PERSISTENT_SEQUENTIAL));
+ assertEquals("/a" + ZkPathUtil.leadingZeros(firstSequential++, 10), _connection.create("/a", new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL));
+ }
+
+}
View
160 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ContentWatcherTest.java
@@ -0,0 +1,160 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.test.TestingServer;
+import org.I0Itec.zkclient.testutil.ZkTestSystem;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ContentWatcherTest {
+
+ private static final Logger LOG = Logger.getLogger(ContentWatcherTest.class);
+
+ private static final String FILE_NAME = "/ContentWatcherTest";
+ private TestingServer _zkServer;
+ private ZkClient _zkClient;
+
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("------------ BEFORE -------------");
+ _zkServer = new TestingServer(4711);
+ _zkClient = ZkTestSystem.createZkClient(_zkServer.getConnectString());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (_zkClient != null) {
+ _zkClient.close();
+ }
+ if (_zkServer != null) {
+ _zkServer.close();
+ }
+ LOG.info("------------ AFTER -------------");
+ }
+
+ @Test
+ public void testGetContent() throws Exception {
+ LOG.info("--- testGetContent");
+ _zkClient.createPersistent(FILE_NAME, "a");
+ final ContentWatcher<String> watcher = new ContentWatcher<String>(_zkClient, FILE_NAME);
+ watcher.start();
+ assertEquals("a", watcher.getContent());
+
+ // update the content
+ _zkClient.writeData(FILE_NAME, "b");
+
+ String contentFromWatcher = TestUtil.waitUntil("b", new Callable<String>() {
+
+ @Override
+ public String call() throws Exception {
+ return watcher.getContent();
+ }
+ }, TimeUnit.SECONDS, 5);
+
+ assertEquals("b", contentFromWatcher);
+ watcher.stop();
+ }
+
+ @Test
+ public void testGetContentWaitTillCreated() throws InterruptedException {
+ LOG.info("--- testGetContentWaitTillCreated");
+ final Holder<String> contentHolder = new Holder<String>();
+
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ ContentWatcher<String> watcher = new ContentWatcher<String>(_zkClient, FILE_NAME);
+ try {
+ watcher.start();
+ contentHolder.set(watcher.getContent());
+ watcher.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ thread.start();
+
+ // create content after 200ms
+ Thread.sleep(200);
+ _zkClient.createPersistent(FILE_NAME, "aaa");
+
+ // we give the thread some time to pick up the change
+ thread.join(1000);
+ assertEquals("aaa", contentHolder.get());
+ }
+
+ @Test
+ public void testHandlingNullContent() throws InterruptedException {
+ LOG.info("--- testHandlingNullContent");
+ _zkClient.createPersistent(FILE_NAME, null);
+ ContentWatcher<String> watcher = new ContentWatcher<String>(_zkClient, FILE_NAME);
+ watcher.start();
+ assertEquals(null, watcher.getContent());
+ watcher.stop();
+ }
+
+ @Test(timeout = 20000)
+ public void testHandlingOfConnectionLoss() throws Exception {
+ LOG.info("--- testHandlingOfConnectionLoss");
+ final Gateway gateway = new Gateway(4712, 4711);
+ gateway.start();
+ final ZkClient zkClient = ZkTestSystem.createZkClient("localhost:4712");
+
+ // disconnect
+ gateway.stop();
+
+ // reconnect after 250ms and create file with content
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(250);
+ gateway.start();
+ zkClient.createPersistent(FILE_NAME, "aaa");
+ zkClient.writeData(FILE_NAME, "b");
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }.start();
+
+ final ContentWatcher<String> watcher = new ContentWatcher<String>(zkClient, FILE_NAME);
+ watcher.start();
+
+ TestUtil.waitUntil("b", new Callable<String>() {
+
+ @Override
+ public String call() throws Exception {
+ return watcher.getContent();
+ }
+ }, TimeUnit.SECONDS, 5);
+ assertEquals("b", watcher.getContent());
+
+ watcher.stop();
+ zkClient.close();
+ gateway.stop();
+ }
+}
View
37 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DeferredGatewayStarter.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+public class DeferredGatewayStarter extends Thread {
+
+ private final Gateway _zkServer;
+ private int _delay;
+
+ public DeferredGatewayStarter(Gateway gateway, int delay) {
+ _zkServer = gateway;
+ _delay = delay;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(_delay);
+ _zkServer.start();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+}
View
131 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DistributedQueueTest.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+import org.apache.curator.test.TestingServer;
+import org.I0Itec.zkclient.testutil.ZkTestSystem;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class DistributedQueueTest {
+
+ private TestingServer _zkServer;
+ private ZkClient _zkClient;
+
+ @Before
+ public void setUp() throws Exception {
+ _zkServer = new TestingServer(4711);
+ _zkClient = ZkTestSystem.createZkClient(_zkServer.getConnectString());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (_zkClient != null) {
+ _zkClient.close();
+ }
+ if (_zkServer != null) {
+ _zkServer.close();
+ }
+ }
+
+ @Test(timeout = 15000)
+ public void testDistributedQueue() {
+ _zkClient.createPersistent("/queue");
+
+ DistributedQueue<Long> distributedQueue = new DistributedQueue<Long>(_zkClient, "/queue");
+ distributedQueue.offer(17L);
+ distributedQueue.offer(18L);
+ distributedQueue.offer(19L);
+
+ assertEquals(Long.valueOf(17L), distributedQueue.poll());
+ assertEquals(Long.valueOf(18L), distributedQueue.poll());
+ assertEquals(Long.valueOf(19L), distributedQueue.poll());
+ assertNull(distributedQueue.poll());
+ }
+
+ @Test(timeout = 15000)
+ public void testPeek() {
+ _zkClient.createPersistent("/queue");
+
+ DistributedQueue<Long> distributedQueue = new DistributedQueue<Long>(_zkClient, "/queue");
+ distributedQueue.offer(17L);
+ distributedQueue.offer(18L);
+
+ assertEquals(Long.valueOf(17L), distributedQueue.peek());
+ assertEquals(Long.valueOf(17L), distributedQueue.peek());
+ assertEquals(Long.valueOf(17L), distributedQueue.poll());
+ assertEquals(Long.valueOf(18L), distributedQueue.peek());
+ assertEquals(Long.valueOf(18L), distributedQueue.poll());
+ assertNull(distributedQueue.peek());
+ }
+
+ @Test(timeout = 30000)
+ public void testMultipleReadingThreads() throws InterruptedException {
+ _zkClient.createPersistent("/queue");
+
+ final DistributedQueue<Long> distributedQueue = new DistributedQueue<Long>(_zkClient, "/queue");
+
+ // insert 100 elements
+ for (int i = 0; i < 100; i++) {
+ distributedQueue.offer(new Long(i));
+ }
+
+ // 3 reading threads
+ final Set<Long> readElements = Collections.synchronizedSet(new HashSet<Long>());
+ List<Thread> threads = new ArrayList<Thread>();
+ final List<Exception> exceptions = new Vector<Exception>();
+
+ for (int i = 0; i < 3; i++) {
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ Long value = distributedQueue.poll();
+ if (value == null) {
+ return;
+ }
+ readElements.add(value);
+ }
+ } catch (Exception e) {
+ exceptions.add(e);
+ e.printStackTrace();
+ }
+ }
+ };
+ threads.add(thread);
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ assertEquals(0, exceptions.size());
+ assertEquals(100, readElements.size());
+ }
+}
View
25 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/InMemoryConnectionTest.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+
+public class InMemoryConnectionTest extends AbstractConnectionTest {
+
+ public InMemoryConnectionTest() {
+ super(new InMemoryConnection());
+ }
+
+}
View
49 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/MemoryZkClientTest.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+import org.apache.zookeeper.CreateMode;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MemoryZkClientTest extends AbstractBaseZkClientTest {
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ _client = new ZkClient(new InMemoryConnection());
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ _client.close();
+ }
+
+ @Test
+ public void testGetChildren() throws Exception {
+ String path1 = "/a";
+ String path2 = "/a/a";
+ String path3 = "/a/a/a";
+
+ _client.create(path1, null, CreateMode.PERSISTENT);
+ _client.create(path2, null, CreateMode.PERSISTENT);
+ _client.create(path3, null, CreateMode.PERSISTENT);
+ Assert.assertEquals(1, _client.getChildren(path1).size());
+ Assert.assertEquals(1, _client.getChildren(path2).size());
+ Assert.assertEquals(0, _client.getChildren(path3).size());
+ }
+}
View
292 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ServerZkClientTest.java
@@ -0,0 +1,292 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+import org.apache.curator.test.TestingServer;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.testutil.ZkTestSystem;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class ServerZkClientTest extends AbstractBaseZkClientTest {
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ _zkServer = new TestingServer(4711);
+ _client = ZkTestSystem.createZkClient("localhost:4711");
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ _client.close();
+ _zkServer.close();
+ }
+
+ @Test(timeout = 15000)
+ public void testRetryUntilConnected() throws Exception {
+ LOG.info("--- testRetryUntilConnected");
+ Gateway gateway = new Gateway(4712, 4711);
+ gateway.start();
+ final IZkConnection zkConnection = ZkTestSystem.createZkConnection("localhost:4712");
+ final ZkClient zkClient = new ZkClient(zkConnection, 5000);
+
+ gateway.stop();
+
+ // start server in 250ms
+ new DeferredGatewayStarter(gateway, 250).start();
+
+ // this should work as soon as the connection is reestablished, if it
+ // fails it throws a ConnectionLossException
+ zkClient.retryUntilConnected(new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ zkConnection.exists("/a", false);
+ return null;
+ }
+ });
+
+ zkClient.close();
+ gateway.stop();
+ }
+
+ @Test(timeout = 15000)
+ public void testWaitUntilConnected() throws Exception {
+ LOG.info("--- testWaitUntilConnected");
+ ZkClient _client = ZkTestSystem.createZkClient("localhost:4711");
+
+ _zkServer.close();
+
+ // the _client state should change to KeeperState.Disconnected
+ assertTrue(_client.waitForKeeperState(KeeperState.Disconnected, 1, TimeUnit.SECONDS));
+
+ // connection should not be possible and timeout after 100ms
+ assertFalse(_client.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+ }
+
+/*
+ JLZ - can't emulate
+
+ @Test(timeout = 15000)
+ public void testRetryUntilConnected_SessionExpiredException() {
+ LOG.info("--- testRetryUntilConnected_SessionExpiredException");
+
+ // Use a tick time of 100ms, because the minimum session timeout is 2 x tick-time.
+ // ZkServer zkServer = TestUtil.startZkServer("ZkClientTest-testSessionExpiredException", 4711, 100);
+ Gateway gateway = new Gateway(4712, 4711);
+ gateway.start();
+
+ // Use a session timeout of 200ms
+ final ZkClient zkClient = ZkTestSystem.createZkClient("localhost:4712", 200, 5000);
+
+ gateway.stop();
+
+ // Start server in 600ms, the session should have expired by then
+ new DeferredGatewayStarter(gateway, 600).start();
+
+ // This should work as soon as a new session has been created (and the connection is reestablished), if it fails
+ // it throws a SessionExpiredException
+ zkClient.retryUntilConnected(new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ zkClient.exists("/a");
+ return null;
+ }
+ });
+
+ zkClient.close();
+ // zkServer.shutdown();
+ gateway.stop();
+ }
+*/
+
+/*
+ JLZ - can't emulate
+
+ @Test(timeout = 15000)
+ public void testChildListenerAfterSessionExpiredException() throws Exception {
+ LOG.info("--- testChildListenerAfterSessionExpiredException");
+
+ int sessionTimeout = 200;
+ ZkClient connectedClient = _zkServer.getZkClient();
+ connectedClient.createPersistent("/root");
+
+ Gateway gateway = new Gateway(4712, 4711);
+ gateway.start();
+
+ final ZkClient disconnectedZkClient = new ZkClient("localhost:4712", sessionTimeout, 5000);
+ final Holder<List<String>> children = new Holder<List<String>>();
+ disconnectedZkClient.subscribeChildChanges("/root", new IZkChildListener() {
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ children.set(currentChilds);
+ }
+ });
+
+ gateway.stop();
+
+ // The connected client now created a new child node
+ connectedClient.createPersistent("/root/node");
+
+ // Wait for 3 x sessionTimeout, the session should have expired by then and start the gateway again
+ Thread.sleep(sessionTimeout * 3);
+ gateway.start();
+
+ Boolean hasOneChild = TestUtil.waitUntil(true, new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ return children.get() != null && children.get().size() == 1;
+ }
+ }, TimeUnit.SECONDS, 5);
+
+ assertTrue(hasOneChild);
+
+ disconnectedZkClient.close();
+ gateway.stop();
+ }
+*/
+
+ @Test(timeout = 10000)
+ public void testZkClientConnectedToGatewayClosesQuickly() throws Exception {
+ LOG.info("--- testZkClientConnectedToGatewayClosesQuickly");
+ final Gateway gateway = new Gateway(4712, 4711);
+ gateway.start();
+
+ ZkClient zkClient = ZkTestSystem.createZkClient("localhost:4712");
+ zkClient.close();
+
+ gateway.stop();
+ }
+
+ @Test
+ public void testCountChildren() throws InterruptedException {
+ assertEquals(0, _client.countChildren("/a"));
+ _client.createPersistent("/a");
+ assertEquals(0, _client.countChildren("/a"));
+ _client.createPersistent("/a/b");
+ assertEquals(1, _client.countChildren("/a"));
+
+ // test concurrent access
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ while (!isInterrupted()) {
+ _client.createPersistent("/test");
+ _client.delete("/test");
+ }
+ } catch (ZkInterruptedException e) {
+ // ignore and finish
+ }
+ }
+ };
+
+ thread.start();
+ for (int i = 0; i < 1000; i++) {
+ assertEquals(0, _client.countChildren("/test"));
+ }
+ thread.interrupt();
+ thread.join();
+ }
+
+ @Test
+ public void testReadDataWithStat() {
+ _client.createPersistent("/a", "data");
+ Stat stat = new Stat();
+ _client.readData("/a", stat);
+ assertEquals(0, stat.getVersion());
+ assertTrue(stat.getDataLength() > 0);
+ }
+
+ @Test
+ public void testWriteDataWithExpectedVersion() {
+ _client.createPersistent("/a", "data");
+ _client.writeData("/a", "data2", 0);
+
+ try {
+ _client.writeData("/a", "data3", 0);
+ fail("expected exception");
+ } catch (ZkBadVersionException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCreateWithParentDirs() {
+ String path = "/a/b";
+ try {
+ _client.createPersistent(path, false);
+ fail("should throw exception");
+ } catch (ZkNoNodeException e) {
+ assertFalse(_client.exists(path));
+ }
+
+ _client.createPersistent(path, true);
+ assertTrue(_client.exists(path));
+ }
+
+ @Test
+ public void testUpdateSerialized() throws InterruptedException {
+ _client.createPersistent("/a", 0);
+
+ int numberOfThreads = 2;
+ final int numberOfIncrementsPerThread = 100;
+
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < numberOfThreads; i++) {
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ for (int j = 0; j < numberOfIncrementsPerThread; j++) {
+ _client.updateDataSerialized("/a", new DataUpdater<Integer>() {
+
+ @Override
+ public Integer update(Integer integer) {
+ return integer + 1;
+ }
+ });
+ }
+ }
+ };
+ thread.start();
+ threads.add(thread);
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ Integer finalValue = _client.readData("/a");
+ assertEquals(numberOfIncrementsPerThread * numberOfThreads, finalValue.intValue());
+ }
+}
View
95 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/TestUtil.java
@@ -0,0 +1,95 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.commons.io.FileUtils;
+import org.mockito.exceptions.base.MockitoAssertionError;
+
+public class TestUtil {
+
+ /**
+ * This waits until the provided {@link Callable} returns an object that is equals to the given expected value or
+ * the timeout has been reached. In both cases this method will return the return value of the latest
+ * {@link Callable} execution.
+ *
+ * @param expectedValue
+ * The expected value of the callable.
+ * @param callable
+ * The callable.
+ * @param <T>
+ * The return type of the callable.
+ * @param timeUnit
+ * The timeout timeunit.
+ * @param timeout
+ * The timeout.
+ * @return the return value of the latest {@link Callable} execution.
+ * @throws Exception
+ * @throws InterruptedException
+ */
+ public static <T> T waitUntil(T expectedValue, Callable<T> callable, TimeUnit timeUnit, long timeout) throws Exception {
+ long startTime = System.currentTimeMillis();
+ do {
+ T actual = callable.call();
+ if (expectedValue.equals(actual)) {
+ return actual;
+ }
+ if (System.currentTimeMillis() > startTime + timeUnit.toMillis(timeout)) {
+ return actual;
+ }
+ Thread.sleep(50);
+ } while (true);
+ }
+
+ /**
+ * This waits until a mockito verification passed (which is provided in the runnable). This waits until the
+ * virification passed or the timeout has been reached. If the timeout has been reached this method will rethrow the
+ * {@link MockitoAssertionError} that comes from the mockito verification code.
+ *
+ * @param runnable
+ * The runnable containing the mockito verification.
+ * @param timeUnit
+ * The timeout timeunit.
+ * @param timeout
+ * The timeout.
+ * @throws InterruptedException
+ */
+ public static void waitUntilVerified(Runnable runnable, TimeUnit timeUnit, int timeout) throws InterruptedException {
+ long startTime = System.currentTimeMillis();
+ do {
+ MockitoAssertionError exception = null;
+ try {
+ runnable.run();
+ } catch (MockitoAssertionError e) {
+ exception = e;
+ }
+ if (exception == null) {
+ return;
+ }
+ if (System.currentTimeMillis() > startTime + timeUnit.toMillis(timeout)) {
+ throw exception;
+ }
+ Thread.sleep(50);
+ } while (true);
+ }
+}
View
54 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkClientSerializationTest.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Random;
+
+import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.I0Itec.zkclient.serialize.SerializableSerializer;
+import org.I0Itec.zkclient.testutil.ZkTestSystem;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ZkClientSerializationTest {
+
+ @Rule
+ public ZkTestSystem _zk = ZkTestSystem.getInstance();
+
+ @Test
+ public void testBytes() throws Exception {
+ ZkClient zkClient = ZkTestSystem.createZkClient(_zk.getZkServerAddress());
+ zkClient.setZkSerializer(new BytesPushThroughSerializer());
+ byte[] bytes = new byte[100];
+ new Random().nextBytes(bytes);
+ zkClient.createPersistent("/a", bytes);
+ byte[] readBytes = zkClient.readData("/a");
+ assertArrayEquals(bytes, readBytes);
+ }
+
+ @Test
+ public void testSerializables() throws Exception {
+ ZkClient zkClient = ZkTestSystem.createZkClient(_zk.getZkServerAddress());
+ zkClient.setZkSerializer(new SerializableSerializer());
+ String data = "hello world";
+ zkClient.createPersistent("/a", data);
+ String readData = zkClient.readData("/a");
+ assertEquals(data, readData);
+ }
+}
View
36 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkConnectionTest.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient;
+
+import org.I0Itec.zkclient.testutil.ZkTestSystem;
+import org.junit.Rule;
+
+public class ZkConnectionTest extends AbstractConnectionTest {
+
+ @Rule
+ public ZkTestSystem _zk = ZkTestSystem.getInstance();
+
+ public ZkConnectionTest() {
+ super(establishConnection());
+ }
+
+ private static IZkConnection establishConnection() {
+ IZkConnection zkConnection = ZkTestSystem.createZkConnection("localhost:" + ZkTestSystem.getInstance().getZkServer().getPort());
+ new ZkClient(zkConnection);// connect
+ return zkConnection;
+ }
+
+}
View
80 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkPathUtil.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2012 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient.testutil;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.ZkClient;
+
+// JLZ - copied from https://raw.github.com/sgroschupf/zkclient/master/src/main/java/org/I0Itec/zkclient/util/ZkPathUtil.java
+public class ZkPathUtil {
+
+ public static String leadingZeros(long number, int numberOfLeadingZeros) {
+ return String.format("%0" + numberOfLeadingZeros + "d", number);
+ }
+
+ public static String toString(ZkClient zkClient) {
+ return toString(zkClient, "/", PathFilter.ALL);
+ }
+
+ public static String toString(ZkClient zkClient, String startPath, PathFilter pathFilter) {
+ final int level = 1;
+ final StringBuilder builder = new StringBuilder("+ (" + startPath + ")");
+ builder.append("\n");
+ addChildrenToStringBuilder(zkClient, pathFilter, level, builder, startPath);
+ return builder.toString();
+ }
+
+ private static void addChildrenToStringBuilder(ZkClient zkClient, PathFilter pathFilter, final int level, final StringBuilder builder, final String startPath) {
+ final List<String> children = zkClient.getChildren(startPath);
+ for (final String node : children) {
+ String nestedPath;
+ if (startPath.endsWith("/")) {
+ nestedPath = startPath + node;
+ } else {
+ nestedPath = startPath + "/" + node;
+ }
+ if (pathFilter.showChilds(nestedPath)) {
+ builder.append(getSpaces(level - 1) + "'-" + "+" + node + "\n");
+ addChildrenToStringBuilder(zkClient, pathFilter, level + 1, builder, nestedPath);
+ } else {
+ builder.append(getSpaces(level - 1) + "'-" + "-" + node + " (contents hidden)\n");
+ }
+ }
+ }
+
+ private static String getSpaces(final int level) {
+ String s = "";
+ for (int i = 0; i < level; i++) {
+ s += " ";
+ }
+ return s;
+ }
+
+ public static interface PathFilter {
+
+ public static PathFilter ALL = new PathFilter() {
+
+ @Override
+ public boolean showChilds(String path) {
+ return true;
+ }
+ };
+
+ boolean showChilds(String path);
+ }
+
+}
View
147 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkTestSystem.java
@@ -0,0 +1,147 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient.testutil;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import com.netflix.curator.x.zkclientbridge.CuratorZKClientBridge;
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.log4j.Logger;
+import org.junit.rules.ExternalResource;
+import java.io.IOException;
+import java.util.List;
+
+public class ZkTestSystem extends ExternalResource {
+
+ protected static final Logger LOG = Logger.getLogger(ZkTestSystem.class);
+
+ private static int PORT = 10002;
+ private static ZkTestSystem _instance;
+ private TestingServer _zkServer;
+ private ZkClient _zkClient;
+
+ private ZkTestSystem() {
+ LOG.info("~~~~~~~~~~~~~~~ starting zk system ~~~~~~~~~~~~~~~");
+ try {
+ _zkServer = new TestingServer(PORT);
+ _zkClient = ZkTestSystem.createZkClient(_zkServer.getConnectString());
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException(e);
+ }
+ LOG.info("~~~~~~~~~~~~~~~ zk system started ~~~~~~~~~~~~~~~");
+ }
+
+ @Override
+ // executed before every test method
+ protected void before() throws Throwable {
+ cleanupZk();
+ }
+
+ @Override
+ // executed after every test method
+ protected void after() {
+ cleanupZk();
+ }
+
+ private void cleanupZk() {
+ LOG.info("cleanup zk namespace");
+ List<String> children = getZkClient().getChildren("/");
+ for (String child : children) {
+ if (!child.equals("zookeeper")) {
+ getZkClient().deleteRecursive("/" + child);
+ }
+ }
+ LOG.info("unsubscribing " + getZkClient().numberOfListeners() + " listeners");
+ getZkClient().unsubscribeAll();
+ }
+
+ public static ZkTestSystem getInstance() {
+ if (_instance == null) {
+ _instance = new ZkTestSystem();
+ _instance.cleanupZk();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ LOG.info("shutting zk down");
+ try {
+ getInstance().getZkClient().close();
+ getInstance().getZkServer().close();
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+ return _instance;
+ }
+
+ public TestingServer getZkServer() {
+ return _zkServer;
+ }
+
+ public String getZkServerAddress() {
+ return "localhost:" + getServerPort();
+ }
+
+ public ZkClient getZkClient() {
+ return _zkClient;
+ }
+
+ public int getServerPort() {
+ return PORT;
+ }
+
+ public static IZkConnection createZkConnection(String connectString) {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ return new CuratorZKClientBridge(client);
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static ZkClient createZkClient(String connectString) {
+ try
+ {
+ Timing timing = new Timing();
+ return new ZkClient(createZkConnection(connectString), timing.connection());
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public ZkClient createZkClient() {
+ return createZkClient("localhost:" + PORT);
+ }
+
+ public void showStructure() {
+ getZkClient().showFolders(System.out);
+ }
+
+}
View
72 curator-x-apache-zkclient-bridge/src/test/java/org/I0Itec/zkclient/util/ZkPathUtilTest.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.I0Itec.zkclient.util;
+
+import org.apache.curator.test.TestingServer;
+import junit.framework.TestCase;
+
+import org.I0Itec.zkclient.TestUtil;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.I0Itec.zkclient.testutil.ZkPathUtil;
+import org.I0Itec.zkclient.testutil.ZkTestSystem;
+
+public class ZkPathUtilTest extends TestCase {
+
+ protected TestingServer _zkServer;
+ protected ZkClient _client;
+
+ public void testToString() throws Exception {
+ _zkServer = new TestingServer(4711);
+ _client = ZkTestSystem.createZkClient("localhost:4711");
+ final String file1 = "/files/file1";
+ final String file2 = "/files/file2";
+ final String file3 = "/files/file2/file3";
+ _client.createPersistent(file1, true);
+ _client.createPersistent(file2, true);
+ _client.createPersistent(file3, true);
+
+ String stringRepresentation = ZkPathUtil.toString(_client);
+ System.out.println(stringRepresentation);
+ System.out.println("-------------------------");
+ assertTrue(stringRepresentation.contains("file1"));
+ assertTrue(stringRepresentation.contains("file2"));
+ assertTrue(stringRepresentation.contains("file3"));
+
+ // path filtering
+ stringRepresentation = ZkPathUtil.toString(_client, "/", new ZkPathUtil.PathFilter() {
+ @Override
+ public boolean showChilds(String path) {
+ return !file2.equals(path);
+ }
+ });
+ assertTrue(stringRepresentation.contains("file1"));
+ assertTrue(stringRepresentation.contains("file2"));
+ assertFalse(stringRepresentation.contains("file3"));
+
+ // start path
+ stringRepresentation = ZkPathUtil.toString(_client, file2, ZkPathUtil.PathFilter.ALL);
+ assertFalse(stringRepresentation.contains("file1"));
+ assertTrue(stringRepresentation.contains("file2"));
+ assertTrue(stringRepresentation.contains("file3"));
+
+ _zkServer.close();
+ }
+
+ public void testLeadingZeros() throws Exception {
+ assertEquals("0000000001", ZkPathUtil.leadingZeros(1, 10));
+ }
+}
View
2  settings.gradle
@@ -1,2 +1,2 @@
rootProject.name='curator'
-include 'curator-x-zkclient-bridge'
+include 'curator-x-zkclient-bridge', 'curator-x-apache-zkclient-bridge'
Please sign in to comment.
Something went wrong with that request. Please try again.