From 2b097fad87350ba65c3f02613461812336b2ddb0 Mon Sep 17 00:00:00 2001 From: Jordan Zimmerman Date: Mon, 3 Sep 2012 13:22:48 -0700 Subject: [PATCH] First pass at a ZKClient bridge --- build.gradle | 13 + .../zkclientbridge/CuratorZKClientBridge.java | 206 ++++++++++ .../zkclient/AbstractBaseZkClientTest.java | 388 ++++++++++++++++++ .../zkclient/AbstractConnectionTest.java | 54 +++ .../I0Itec/zkclient/ContentWatcherTest.java | 155 +++++++ .../zkclient/DeferredGatewayStarter.java | 37 ++ .../I0Itec/zkclient/DistributedQueueTest.java | 127 ++++++ .../zkclient/InMemoryConnectionTest.java | 25 ++ .../I0Itec/zkclient/MemoryZkClientTest.java | 49 +++ .../I0Itec/zkclient/ServerZkClientTest.java | 289 +++++++++++++ .../java/org/I0Itec/zkclient/TestUtil.java | 104 +++++ .../zkclient/ZkClientSerializationTest.java | 52 +++ .../org/I0Itec/zkclient/ZkConnectionTest.java | 36 ++ .../I0Itec/zkclient/testutil/ZkPathUtil.java | 80 ++++ .../zkclient/testutil/ZkTestSystem.java | 117 ++++++ .../I0Itec/zkclient/util/ZkPathUtilTest.java | 70 ++++ settings.gradle | 2 +- 17 files changed, 1803 insertions(+), 1 deletion(-) create mode 100644 curator-x-zkclient-bridge/src/main/java/com/netflix/curator/x/zkclientbridge/CuratorZKClientBridge.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractBaseZkClientTest.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractConnectionTest.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ContentWatcherTest.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DeferredGatewayStarter.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DistributedQueueTest.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/InMemoryConnectionTest.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/MemoryZkClientTest.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ServerZkClientTest.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/TestUtil.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkClientSerializationTest.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkConnectionTest.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkPathUtil.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkTestSystem.java create mode 100644 curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/util/ZkPathUtilTest.java diff --git a/build.gradle b/build.gradle index 7ba4971b0..3cb92aa92 100644 --- a/build.gradle +++ b/build.gradle @@ -94,6 +94,19 @@ project(':curator-recipes') } } +project(':curator-x-zkclient-bridge') +{ + dependencies + { + compile project(':curator-client') + compile project(':curator-framework') + compile 'com.github.sgroschupf:zkclient:0.1' + testCompile 'commons-io:commons-io:1.4' + testCompile 'org.mockito:mockito-core:1.8.0' + testCompile 'junit:junit:4.7' + } +} + project(':curator-x-discovery') { dependencies diff --git a/curator-x-zkclient-bridge/src/main/java/com/netflix/curator/x/zkclientbridge/CuratorZKClientBridge.java b/curator-x-zkclient-bridge/src/main/java/com/netflix/curator/x/zkclientbridge/CuratorZKClientBridge.java new file mode 100644 index 000000000..884442c8f --- /dev/null +++ b/curator-x-zkclient-bridge/src/main/java/com/netflix/curator/x/zkclientbridge/CuratorZKClientBridge.java @@ -0,0 +1,206 @@ +/* + * Copyright 2012 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.curator.x.zkclientbridge; + +import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.api.CuratorEvent; +import com.netflix.curator.framework.api.CuratorListener; +import org.I0Itec.zkclient.IZkConnection; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import java.util.List; + +public class CuratorZKClientBridge implements IZkConnection +{ + private final CuratorFramework curator; + + public CuratorZKClientBridge(CuratorFramework curator) + { + this.curator = curator; + } + + @Override + public void connect(final Watcher watcher) + { + if ( watcher != null ) + { + curator.getCuratorListenable().addListener + ( + new CuratorListener() + { + @Override + public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception + { + if ( event.getWatchedEvent() != null ) + { + watcher.process(event.getWatchedEvent()); + } + } + } + ); + + curator.sync("/", null); // force an event so that ZKClient can see the connect + } + } + + @Override + public void close() throws InterruptedException + { + curator.close(); + } + + @Override + public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException + { + try + { + return curator.create().withMode(mode).forPath(path, data); + } + catch ( Exception e ) + { + adjustException(e); + } + return null; // will never execute + } + + @Override + public void delete(String path) throws InterruptedException, KeeperException + { + try + { + curator.delete().forPath(path); + } + catch ( Exception e ) + { + adjustException(e); + } + } + + @Override + public boolean exists(String path, boolean watch) throws KeeperException, InterruptedException + { + try + { + return watch ? (curator.checkExists().watched().forPath(path) != null) : (curator.checkExists().forPath(path) != null); + } + catch ( Exception e ) + { + adjustException(e); + } + return false; // will never execute + } + + @Override + public List getChildren(String path, boolean watch) throws KeeperException, InterruptedException + { + try + { + return watch ? curator.getChildren().forPath(path) : curator.getChildren().watched().forPath(path); + } + catch ( Exception e ) + { + adjustException(e); + } + return null; // will never execute + } + + @Override + public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException + { + try + { + if ( stat != null ) + { + return watch ? curator.getData().storingStatIn(stat).watched().forPath(path) : curator.getData().storingStatIn(stat).forPath(path); + } + else + { + return watch ? curator.getData().watched().forPath(path) : curator.getData().forPath(path); + } + } + catch ( Exception e ) + { + adjustException(e); + } + return null; // will never execute + } + + @Override + public void writeData(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException + { + try + { + curator.setData().withVersion(expectedVersion).forPath(path, data); + } + catch ( Exception e ) + { + adjustException(e); + } + } + + @Override + public ZooKeeper.States getZookeeperState() + { + try + { + return curator.getZookeeperClient().getZooKeeper().getState(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + + @Override + public long getCreateTime(String path) throws KeeperException, InterruptedException + { + try + { + Stat stat = curator.checkExists().forPath(path); + return (stat != null) ? stat.getCtime() : 0; + } + catch ( Exception e ) + { + adjustException(e); + } + return 0; + } + + @Override + public String getServers() + { + throw new UnsupportedOperationException(); + } + + private void adjustException(Exception e) throws KeeperException, InterruptedException + { + if ( e instanceof KeeperException ) + { + throw (KeeperException)e; + } + + if ( e instanceof InterruptedException ) + { + throw (InterruptedException)e; + } + + throw new RuntimeException(e); + } +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractBaseZkClientTest.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractBaseZkClientTest.java new file mode 100644 index 000000000..93abef77d --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractBaseZkClientTest.java @@ -0,0 +1,388 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.I0Itec.zkclient.exception.ZkTimeoutException; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public abstract class AbstractBaseZkClientTest { + + protected static final Logger LOG = Logger.getLogger(AbstractBaseZkClientTest.class); + protected ZkServer _zkServer; + protected ZkClient _client; + + @Before + public void setUp() throws Exception { + LOG.info("------------ BEFORE -------------"); + + } + + @After + public void tearDown() throws Exception { + LOG.info("------------ AFTER -------------"); + } + + @Test(expected = ZkTimeoutException.class, timeout = 5000) + public void testUnableToConnect() throws Exception { + LOG.info("--- testUnableToConnect"); + // we are using port 4711 to avoid conflicts with the zk server that is + // started by the Spring context + new ZkClient("localhost:4712", 1000); + } + + @Test + public void testWriteAndRead() throws Exception { + LOG.info("--- testWriteAndRead"); + String data = "something"; + String path = "/a"; + _client.createPersistent(path, data); + String data2 = _client.readData(path); + Assert.assertEquals(data, data2); + _client.delete(path); + } + + @Test + public void testDelete() throws Exception { + LOG.info("--- testDelete"); + String path = "/a"; + assertFalse(_client.delete(path)); + _client.createPersistent(path, null); + assertTrue(_client.delete(path)); + assertFalse(_client.delete(path)); + } + + @Test + public void testDeleteRecursive() throws Exception { + LOG.info("--- testDeleteRecursive"); + + // should be able to call this on a not existing directory + _client.deleteRecursive("/doesNotExist"); + } + + @Test + public void testWaitUntilExists() { + LOG.info("--- testWaitUntilExists"); + // create /gaga node asynchronously + new Thread() { + @Override + public void run() { + try { + Thread.sleep(100); + _client.createPersistent("/gaga"); + } catch (Exception e) { + // ignore + } + } + }.start(); + + // wait until this was created + assertTrue(_client.waitUntilExists("/gaga", TimeUnit.SECONDS, 5)); + assertTrue(_client.exists("/gaga")); + + // waiting for /neverCreated should timeout + assertFalse(_client.waitUntilExists("/neverCreated", TimeUnit.MILLISECONDS, 100)); + } + + @Test + public void testDataChanges1() throws Exception { + LOG.info("--- testDataChanges1"); + String path = "/a"; + final Holder holder = new Holder(); + + IZkDataListener listener = new IZkDataListener() { + + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + holder.set((String) data); + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + holder.set(null); + } + }; + _client.subscribeDataChanges(path, listener); + _client.createPersistent(path, "aaa"); + + // wait some time to make sure the event was triggered + String contentFromHolder = TestUtil.waitUntil("b", new Callable() { + + @Override + public String call() throws Exception { + return holder.get(); + } + }, TimeUnit.SECONDS, 5); + + assertEquals("aaa", contentFromHolder); + } + + @Test + public void testDataChanges2() throws Exception { + LOG.info("--- testDataChanges2"); + String path = "/a"; + final AtomicInteger countChanged = new AtomicInteger(0); + final AtomicInteger countDeleted = new AtomicInteger(0); + + IZkDataListener listener = new IZkDataListener() { + + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + countChanged.incrementAndGet(); + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + countDeleted.incrementAndGet(); + } + }; + _client.subscribeDataChanges(path, listener); + + // create node + _client.createPersistent(path, "aaa"); + + // wait some time to make sure the event was triggered + TestUtil.waitUntil(1, new Callable() { + + @Override + public Integer call() throws Exception { + return countChanged.get(); + } + }, TimeUnit.SECONDS, 5); + assertEquals(1, countChanged.get()); + assertEquals(0, countDeleted.get()); + + countChanged.set(0); + countDeleted.set(0); + // delete node, this should trigger a delete event + _client.delete(path); + // wait some time to make sure the event was triggered + TestUtil.waitUntil(1, new Callable() { + + @Override + public Integer call() throws Exception { + return countDeleted.get(); + } + }, TimeUnit.SECONDS, 5); + assertEquals(0, countChanged.get()); + assertEquals(1, countDeleted.get()); + + // test if watch was reinstalled after the file got deleted + countChanged.set(0); + _client.createPersistent(path, "aaa"); + + // wait some time to make sure the event was triggered + TestUtil.waitUntil(1, new Callable() { + + @Override + public Integer call() throws Exception { + return countChanged.get(); + } + }, TimeUnit.SECONDS, 5); + assertEquals(1, countChanged.get()); + + // test if changing the contents notifies the listener + _client.writeData(path, "bbb"); + + // wait some time to make sure the event was triggered + TestUtil.waitUntil(2, new Callable() { + + @Override + public Integer call() throws Exception { + return countChanged.get(); + } + }, TimeUnit.SECONDS, 5); + assertEquals(2, countChanged.get()); + } + + @Test(timeout = 15000) + public void testHandleChildChanges() throws Exception { + LOG.info("--- testHandleChildChanges"); + String path = "/a"; + final AtomicInteger count = new AtomicInteger(0); + final Holder> children = new Holder>(); + + IZkChildListener listener = new IZkChildListener() { + + @Override + public void handleChildChange(String parentPath, List currentChilds) throws Exception { + count.incrementAndGet(); + children.set(currentChilds); + } + }; + _client.subscribeChildChanges(path, listener); + + // ---- + // Create the root node should throw the first child change event + // ---- + _client.createPersistent(path); + + // wait some time to make sure the event was triggered + TestUtil.waitUntil(1, new Callable() { + + @Override + public Integer call() throws Exception { + return count.get(); + } + }, TimeUnit.SECONDS, 5); + assertEquals(1, count.get()); + assertEquals(0, children.get().size()); + + // ---- + // Creating a child node should throw another event + // ---- + count.set(0); + _client.createPersistent(path + "/child1"); + + // wait some time to make sure the event was triggered + TestUtil.waitUntil(1, new Callable() { + + @Override + public Integer call() throws Exception { + return count.get(); + } + }, TimeUnit.SECONDS, 5); + assertEquals(1, count.get()); + assertEquals(1, children.get().size()); + assertEquals("child1", children.get().get(0)); + + // ---- + // Creating another child and deleting the node should also throw an event + // ---- + count.set(0); + _client.createPersistent(path + "/child2"); + _client.deleteRecursive(path); + + // wait some time to make sure the event was triggered + Boolean eventReceived = TestUtil.waitUntil(true, new Callable() { + + @Override + public Boolean call() throws Exception { + return count.get() > 0 && children.get() == null; + } + }, TimeUnit.SECONDS, 5); + assertTrue(eventReceived); + assertNull(children.get()); + + // ---- + // Creating root again should throw an event + // ---- + count.set(0); + _client.createPersistent(path); + + // wait some time to make sure the event was triggered + eventReceived = TestUtil.waitUntil(true, new Callable() { + + @Override + public Boolean call() throws Exception { + return count.get() > 0 && children.get() != null; + } + }, TimeUnit.SECONDS, 5); + assertTrue(eventReceived); + assertEquals(0, children.get().size()); + + // ---- + // Creating child now should throw an event + // ---- + count.set(0); + _client.createPersistent(path + "/child"); + + // wait some time to make sure the event was triggered + eventReceived = TestUtil.waitUntil(true, new Callable() { + + @Override + public Boolean call() throws Exception { + return count.get() > 0; + } + }, TimeUnit.SECONDS, 5); + assertTrue(eventReceived); + assertEquals(1, children.get().size()); + assertEquals("child", children.get().get(0)); + + // ---- + // Deleting root node should throw an event + // ---- + count.set(0); + _client.deleteRecursive(path); + + // wait some time to make sure the event was triggered + eventReceived = TestUtil.waitUntil(true, new Callable() { + + @Override + public Boolean call() throws Exception { + return count.get() > 0 && children.get() == null; + } + }, TimeUnit.SECONDS, 5); + assertTrue(eventReceived); + assertNull(children.get()); + } + + @Test + public void testGetCreationTime() throws Exception { + long start = System.currentTimeMillis(); + Thread.sleep(100); + String path = "/a"; + _client.createPersistent(path); + Thread.sleep(100); + long end = System.currentTimeMillis(); + long creationTime = _client.getCreationTime(path); + assertTrue(start < creationTime && end > creationTime); + } + + @Test + public void testNumberOfListeners() { + IZkChildListener zkChildListener = mock(IZkChildListener.class); + _client.subscribeChildChanges("/", zkChildListener); + assertEquals(1, _client.numberOfListeners()); + + IZkDataListener zkDataListener = mock(IZkDataListener.class); + _client.subscribeDataChanges("/a", zkDataListener); + assertEquals(2, _client.numberOfListeners()); + + _client.subscribeDataChanges("/b", zkDataListener); + assertEquals(3, _client.numberOfListeners()); + + IZkStateListener zkStateListener = mock(IZkStateListener.class); + _client.subscribeStateChanges(zkStateListener); + assertEquals(4, _client.numberOfListeners()); + + _client.unsubscribeChildChanges("/", zkChildListener); + assertEquals(3, _client.numberOfListeners()); + + _client.unsubscribeDataChanges("/b", zkDataListener); + assertEquals(2, _client.numberOfListeners()); + + _client.unsubscribeDataChanges("/a", zkDataListener); + assertEquals(1, _client.numberOfListeners()); + + _client.unsubscribeStateChanges(zkStateListener); + assertEquals(0, _client.numberOfListeners()); + } +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractConnectionTest.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractConnectionTest.java new file mode 100644 index 000000000..26b846c06 --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/AbstractConnectionTest.java @@ -0,0 +1,54 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + +import java.util.List; + +import org.I0Itec.zkclient.testutil.ZkPathUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public abstract class AbstractConnectionTest { + + private final IZkConnection _connection; + + public AbstractConnectionTest(IZkConnection connection) { + _connection = connection; + } + + @Test + public void testGetChildren_OnEmptyFileSystem() throws KeeperException, InterruptedException { + InMemoryConnection connection = new InMemoryConnection(); + List children = connection.getChildren("/", false); + assertEquals(0, children.size()); + } + + @Test + public void testSequentials() throws KeeperException, InterruptedException { + String sequentialPath = _connection.create("/a", new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL); + int firstSequential = Integer.parseInt(sequentialPath.substring(2)); + assertEquals("/a" + ZkPathUtil.leadingZeros(firstSequential++, 10), sequentialPath); + assertEquals("/a" + ZkPathUtil.leadingZeros(firstSequential++, 10), _connection.create("/a", new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL)); + assertEquals("/a" + ZkPathUtil.leadingZeros(firstSequential++, 10), _connection.create("/a", new byte[0], CreateMode.PERSISTENT_SEQUENTIAL)); + assertEquals("/b" + ZkPathUtil.leadingZeros(firstSequential++, 10), _connection.create("/b", new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL)); + assertEquals("/b" + ZkPathUtil.leadingZeros(firstSequential++, 10), _connection.create("/b", new byte[0], CreateMode.PERSISTENT_SEQUENTIAL)); + assertEquals("/a" + ZkPathUtil.leadingZeros(firstSequential++, 10), _connection.create("/a", new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL)); + } + +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ContentWatcherTest.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ContentWatcherTest.java new file mode 100644 index 000000000..b078f3754 --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ContentWatcherTest.java @@ -0,0 +1,155 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ContentWatcherTest { + + private static final Logger LOG = Logger.getLogger(ContentWatcherTest.class); + + private static final String FILE_NAME = "/ContentWatcherTest"; + private ZkServer _zkServer; + private ZkClient _zkClient; + + @Before + public void setUp() throws Exception { + LOG.info("------------ BEFORE -------------"); + _zkServer = TestUtil.startZkServer("ContentWatcherTest", 4711); + _zkClient = _zkServer.getZkClient(); + } + + @After + public void tearDown() throws Exception { + if (_zkServer != null) { + _zkServer.shutdown(); + } + LOG.info("------------ AFTER -------------"); + } + + @Test + public void testGetContent() throws Exception { + LOG.info("--- testGetContent"); + _zkClient.createPersistent(FILE_NAME, "a"); + final ContentWatcher watcher = new ContentWatcher(_zkClient, FILE_NAME); + watcher.start(); + assertEquals("a", watcher.getContent()); + + // update the content + _zkClient.writeData(FILE_NAME, "b"); + + String contentFromWatcher = TestUtil.waitUntil("b", new Callable() { + + @Override + public String call() throws Exception { + return watcher.getContent(); + } + }, TimeUnit.SECONDS, 5); + + assertEquals("b", contentFromWatcher); + watcher.stop(); + } + + @Test + public void testGetContentWaitTillCreated() throws InterruptedException { + LOG.info("--- testGetContentWaitTillCreated"); + final Holder contentHolder = new Holder(); + + Thread thread = new Thread() { + @Override + public void run() { + ContentWatcher watcher = new ContentWatcher(_zkClient, FILE_NAME); + try { + watcher.start(); + contentHolder.set(watcher.getContent()); + watcher.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + thread.start(); + + // create content after 200ms + Thread.sleep(200); + _zkClient.createPersistent(FILE_NAME, "aaa"); + + // we give the thread some time to pick up the change + thread.join(1000); + assertEquals("aaa", contentHolder.get()); + } + + @Test + public void testHandlingNullContent() throws InterruptedException { + LOG.info("--- testHandlingNullContent"); + _zkClient.createPersistent(FILE_NAME, null); + ContentWatcher watcher = new ContentWatcher(_zkClient, FILE_NAME); + watcher.start(); + assertEquals(null, watcher.getContent()); + watcher.stop(); + } + + @Test(timeout = 20000) + public void testHandlingOfConnectionLoss() throws Exception { + LOG.info("--- testHandlingOfConnectionLoss"); + final Gateway gateway = new Gateway(4712, 4711); + gateway.start(); + final ZkClient zkClient = new ZkClient("localhost:4712", 5000); + + // disconnect + gateway.stop(); + + // reconnect after 250ms and create file with content + new Thread() { + @Override + public void run() { + try { + Thread.sleep(250); + gateway.start(); + zkClient.createPersistent(FILE_NAME, "aaa"); + zkClient.writeData(FILE_NAME, "b"); + } catch (Exception e) { + // ignore + } + } + }.start(); + + final ContentWatcher watcher = new ContentWatcher(zkClient, FILE_NAME); + watcher.start(); + + TestUtil.waitUntil("b", new Callable() { + + @Override + public String call() throws Exception { + return watcher.getContent(); + } + }, TimeUnit.SECONDS, 5); + assertEquals("b", watcher.getContent()); + + watcher.stop(); + zkClient.close(); + gateway.stop(); + } +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DeferredGatewayStarter.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DeferredGatewayStarter.java new file mode 100644 index 000000000..bf615dfa9 --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DeferredGatewayStarter.java @@ -0,0 +1,37 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + +public class DeferredGatewayStarter extends Thread { + + private final Gateway _zkServer; + private int _delay; + + public DeferredGatewayStarter(Gateway gateway, int delay) { + _zkServer = gateway; + _delay = delay; + } + + @Override + public void run() { + try { + Thread.sleep(_delay); + _zkServer.start(); + } catch (Exception e) { + // ignore + } + } +} \ No newline at end of file diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DistributedQueueTest.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DistributedQueueTest.java new file mode 100644 index 000000000..42311e9c1 --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/DistributedQueueTest.java @@ -0,0 +1,127 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Vector; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class DistributedQueueTest { + + private ZkServer _zkServer; + private ZkClient _zkClient; + + @Before + public void setUp() throws IOException { + _zkServer = TestUtil.startZkServer("ZkClientTest-testDistributedQueue", 4711); + _zkClient = _zkServer.getZkClient(); + } + + @After + public void tearDown() { + if (_zkServer != null) { + _zkServer.shutdown(); + } + } + + @Test(timeout = 15000) + public void testDistributedQueue() { + _zkClient.createPersistent("/queue"); + + DistributedQueue distributedQueue = new DistributedQueue(_zkClient, "/queue"); + distributedQueue.offer(17L); + distributedQueue.offer(18L); + distributedQueue.offer(19L); + + assertEquals(Long.valueOf(17L), distributedQueue.poll()); + assertEquals(Long.valueOf(18L), distributedQueue.poll()); + assertEquals(Long.valueOf(19L), distributedQueue.poll()); + assertNull(distributedQueue.poll()); + } + + @Test(timeout = 15000) + public void testPeek() { + _zkClient.createPersistent("/queue"); + + DistributedQueue distributedQueue = new DistributedQueue(_zkClient, "/queue"); + distributedQueue.offer(17L); + distributedQueue.offer(18L); + + assertEquals(Long.valueOf(17L), distributedQueue.peek()); + assertEquals(Long.valueOf(17L), distributedQueue.peek()); + assertEquals(Long.valueOf(17L), distributedQueue.poll()); + assertEquals(Long.valueOf(18L), distributedQueue.peek()); + assertEquals(Long.valueOf(18L), distributedQueue.poll()); + assertNull(distributedQueue.peek()); + } + + @Test(timeout = 30000) + public void testMultipleReadingThreads() throws InterruptedException { + _zkClient.createPersistent("/queue"); + + final DistributedQueue distributedQueue = new DistributedQueue(_zkClient, "/queue"); + + // insert 100 elements + for (int i = 0; i < 100; i++) { + distributedQueue.offer(new Long(i)); + } + + // 3 reading threads + final Set readElements = Collections.synchronizedSet(new HashSet()); + List threads = new ArrayList(); + final List exceptions = new Vector(); + + for (int i = 0; i < 3; i++) { + Thread thread = new Thread() { + @Override + public void run() { + try { + while (true) { + Long value = distributedQueue.poll(); + if (value == null) { + return; + } + readElements.add(value); + } + } catch (Exception e) { + exceptions.add(e); + e.printStackTrace(); + } + } + }; + threads.add(thread); + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertEquals(0, exceptions.size()); + assertEquals(100, readElements.size()); + } +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/InMemoryConnectionTest.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/InMemoryConnectionTest.java new file mode 100644 index 000000000..ec3cd65f6 --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/InMemoryConnectionTest.java @@ -0,0 +1,25 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + + +public class InMemoryConnectionTest extends AbstractConnectionTest { + + public InMemoryConnectionTest() { + super(new InMemoryConnection()); + } + +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/MemoryZkClientTest.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/MemoryZkClientTest.java new file mode 100644 index 000000000..c05a41b15 --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/MemoryZkClientTest.java @@ -0,0 +1,49 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + +import org.apache.zookeeper.CreateMode; +import org.junit.Assert; +import org.junit.Test; + +public class MemoryZkClientTest extends AbstractBaseZkClientTest { + + @Override + public void setUp() throws Exception { + super.setUp(); + _client = new ZkClient(new InMemoryConnection()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + _client.close(); + } + + @Test + public void testGetChildren() throws Exception { + String path1 = "/a"; + String path2 = "/a/a"; + String path3 = "/a/a/a"; + + _client.create(path1, null, CreateMode.PERSISTENT); + _client.create(path2, null, CreateMode.PERSISTENT); + _client.create(path3, null, CreateMode.PERSISTENT); + Assert.assertEquals(1, _client.getChildren(path1).size()); + Assert.assertEquals(1, _client.getChildren(path2).size()); + Assert.assertEquals(0, _client.getChildren(path3).size()); + } +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ServerZkClientTest.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ServerZkClientTest.java new file mode 100644 index 000000000..76189f74a --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ServerZkClientTest.java @@ -0,0 +1,289 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.I0Itec.zkclient.exception.ZkBadVersionException; +import org.I0Itec.zkclient.exception.ZkInterruptedException; +import org.I0Itec.zkclient.exception.ZkNoNodeException; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.data.Stat; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ServerZkClientTest extends AbstractBaseZkClientTest { + private AtomicInteger _counter = new AtomicInteger(); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + _zkServer = TestUtil.startZkServer("ZkClientTest_" + _counter.addAndGet(1), 4711); + _client = new ZkClient("localhost:4711", 5000); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + _client.close(); + _zkServer.shutdown(); + } + + @Test(timeout = 15000) + public void testRetryUntilConnected() throws Exception { + LOG.info("--- testRetryUntilConnected"); + Gateway gateway = new Gateway(4712, 4711); + gateway.start(); + final ZkConnection zkConnection = new ZkConnection("localhost:4712"); + final ZkClient zkClient = new ZkClient(zkConnection, 1000); + + gateway.stop(); + + // start server in 250ms + new DeferredGatewayStarter(gateway, 250).start(); + + // this should work as soon as the connection is reestablished, if it + // fails it throws a ConnectionLossException + zkClient.retryUntilConnected(new Callable() { + + @Override + public Object call() throws Exception { + zkConnection.exists("/a", false); + return null; + } + }); + + zkClient.close(); + gateway.stop(); + } + + @Test(timeout = 15000) + public void testWaitUntilConnected() throws Exception { + LOG.info("--- testWaitUntilConnected"); + ZkClient _client = new ZkClient("localhost:4711", 5000); + + _zkServer.shutdown(); + + // the _client state should change to KeeperState.Disconnected + assertTrue(_client.waitForKeeperState(KeeperState.Disconnected, 1, TimeUnit.SECONDS)); + + // connection should not be possible and timeout after 100ms + assertFalse(_client.waitUntilConnected(100, TimeUnit.MILLISECONDS)); + } + + @Test(timeout = 15000) + public void testRetryUntilConnected_SessionExpiredException() { + LOG.info("--- testRetryUntilConnected_SessionExpiredException"); + + // Use a tick time of 100ms, because the minimum session timeout is 2 x tick-time. + // ZkServer zkServer = TestUtil.startZkServer("ZkClientTest-testSessionExpiredException", 4711, 100); + Gateway gateway = new Gateway(4712, 4711); + gateway.start(); + + // Use a session timeout of 200ms + final ZkClient zkClient = new ZkClient("localhost:4712", 200, 5000); + + gateway.stop(); + + // Start server in 600ms, the session should have expired by then + new DeferredGatewayStarter(gateway, 600).start(); + + // This should work as soon as a new session has been created (and the connection is reestablished), if it fails + // it throws a SessionExpiredException + zkClient.retryUntilConnected(new Callable() { + + @Override + public Object call() throws Exception { + zkClient.exists("/a"); + return null; + } + }); + + zkClient.close(); + // zkServer.shutdown(); + gateway.stop(); + } + + @Test(timeout = 15000) + public void testChildListenerAfterSessionExpiredException() throws Exception { + LOG.info("--- testChildListenerAfterSessionExpiredException"); + + int sessionTimeout = 200; + ZkClient connectedClient = _zkServer.getZkClient(); + connectedClient.createPersistent("/root"); + + Gateway gateway = new Gateway(4712, 4711); + gateway.start(); + + final ZkClient disconnectedZkClient = new ZkClient("localhost:4712", sessionTimeout, 5000); + final Holder> children = new Holder>(); + disconnectedZkClient.subscribeChildChanges("/root", new IZkChildListener() { + + @Override + public void handleChildChange(String parentPath, List currentChilds) throws Exception { + children.set(currentChilds); + } + }); + + gateway.stop(); + + // The connected client now created a new child node + connectedClient.createPersistent("/root/node"); + + // Wait for 3 x sessionTimeout, the session should have expired by then and start the gateway again + Thread.sleep(sessionTimeout * 3); + gateway.start(); + + Boolean hasOneChild = TestUtil.waitUntil(true, new Callable() { + + @Override + public Boolean call() throws Exception { + return children.get() != null && children.get().size() == 1; + } + }, TimeUnit.SECONDS, 5); + + assertTrue(hasOneChild); + + disconnectedZkClient.close(); + gateway.stop(); + } + + @Test(timeout = 10000) + public void testZkClientConnectedToGatewayClosesQuickly() throws Exception { + LOG.info("--- testZkClientConnectedToGatewayClosesQuickly"); + final Gateway gateway = new Gateway(4712, 4711); + gateway.start(); + + ZkClient zkClient = new ZkClient("localhost:4712", 5000); + zkClient.close(); + + gateway.stop(); + } + + @Test + public void testCountChildren() throws InterruptedException { + assertEquals(0, _client.countChildren("/a")); + _client.createPersistent("/a"); + assertEquals(0, _client.countChildren("/a")); + _client.createPersistent("/a/b"); + assertEquals(1, _client.countChildren("/a")); + + // test concurrent access + Thread thread = new Thread() { + @Override + public void run() { + try { + while (!isInterrupted()) { + _client.createPersistent("/test"); + _client.delete("/test"); + } + } catch (ZkInterruptedException e) { + // ignore and finish + } + } + }; + + thread.start(); + for (int i = 0; i < 1000; i++) { + assertEquals(0, _client.countChildren("/test")); + } + thread.interrupt(); + thread.join(); + } + + @Test + public void testReadDataWithStat() { + _client.createPersistent("/a", "data"); + Stat stat = new Stat(); + _client.readData("/a", stat); + assertEquals(0, stat.getVersion()); + assertTrue(stat.getDataLength() > 0); + } + + @Test + public void testWriteDataWithExpectedVersion() { + _client.createPersistent("/a", "data"); + _client.writeData("/a", "data2", 0); + + try { + _client.writeData("/a", "data3", 0); + fail("expected exception"); + } catch (ZkBadVersionException e) { + // expected + } + } + + @Test + public void testCreateWithParentDirs() { + String path = "/a/b"; + try { + _client.createPersistent(path, false); + fail("should throw exception"); + } catch (ZkNoNodeException e) { + assertFalse(_client.exists(path)); + } + + _client.createPersistent(path, true); + assertTrue(_client.exists(path)); + } + + @Test + public void testUpdateSerialized() throws InterruptedException { + _client.createPersistent("/a", 0); + + int numberOfThreads = 2; + final int numberOfIncrementsPerThread = 100; + + List threads = new ArrayList(); + for (int i = 0; i < numberOfThreads; i++) { + Thread thread = new Thread() { + @Override + public void run() { + for (int j = 0; j < numberOfIncrementsPerThread; j++) { + _client.updateDataSerialized("/a", new DataUpdater() { + + @Override + public Integer update(Integer integer) { + return integer + 1; + } + }); + } + } + }; + thread.start(); + threads.add(thread); + } + + for (Thread thread : threads) { + thread.join(); + } + + Integer finalValue = _client.readData("/a"); + assertEquals(numberOfIncrementsPerThread * numberOfThreads, finalValue.intValue()); + } +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/TestUtil.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/TestUtil.java new file mode 100644 index 000000000..2b0919fbd --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/TestUtil.java @@ -0,0 +1,104 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.FileUtils; +import org.mockito.exceptions.base.MockitoAssertionError; + +public class TestUtil { + + /** + * This waits until the provided {@link Callable} returns an object that is equals to the given expected value or + * the timeout has been reached. In both cases this method will return the return value of the latest + * {@link Callable} execution. + * + * @param expectedValue + * The expected value of the callable. + * @param callable + * The callable. + * @param + * The return type of the callable. + * @param timeUnit + * The timeout timeunit. + * @param timeout + * The timeout. + * @return the return value of the latest {@link Callable} execution. + * @throws Exception + * @throws InterruptedException + */ + public static T waitUntil(T expectedValue, Callable callable, TimeUnit timeUnit, long timeout) throws Exception { + long startTime = System.currentTimeMillis(); + do { + T actual = callable.call(); + if (expectedValue.equals(actual)) { + return actual; + } + if (System.currentTimeMillis() > startTime + timeUnit.toMillis(timeout)) { + return actual; + } + Thread.sleep(50); + } while (true); + } + + /** + * This waits until a mockito verification passed (which is provided in the runnable). This waits until the + * virification passed or the timeout has been reached. If the timeout has been reached this method will rethrow the + * {@link MockitoAssertionError} that comes from the mockito verification code. + * + * @param runnable + * The runnable containing the mockito verification. + * @param timeUnit + * The timeout timeunit. + * @param timeout + * The timeout. + * @throws InterruptedException + */ + public static void waitUntilVerified(Runnable runnable, TimeUnit timeUnit, int timeout) throws InterruptedException { + long startTime = System.currentTimeMillis(); + do { + MockitoAssertionError exception = null; + try { + runnable.run(); + } catch (MockitoAssertionError e) { + exception = e; + } + if (exception == null) { + return; + } + if (System.currentTimeMillis() > startTime + timeUnit.toMillis(timeout)) { + throw exception; + } + Thread.sleep(50); + } while (true); + } + + public static ZkServer startZkServer(String testName, int port) throws IOException { + String dataPath = "./build/test/" + testName + "/data"; + String logPath = "./build/test/" + testName + "/log"; + FileUtils.deleteDirectory(new File(dataPath)); + FileUtils.deleteDirectory(new File(logPath)); + ZkServer zkServer = new ZkServer(dataPath, logPath, mock(IDefaultNameSpace.class), port, ZkServer.DEFAULT_TICK_TIME, 100); + zkServer.start(); + return zkServer; + } +} \ No newline at end of file diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkClientSerializationTest.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkClientSerializationTest.java new file mode 100644 index 000000000..e523a1b2e --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkClientSerializationTest.java @@ -0,0 +1,52 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.util.Random; + +import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; +import org.I0Itec.zkclient.serialize.SerializableSerializer; +import org.I0Itec.zkclient.testutil.ZkTestSystem; +import org.junit.Rule; +import org.junit.Test; + +public class ZkClientSerializationTest { + + @Rule + public ZkTestSystem _zk = ZkTestSystem.getInstance(); + + @Test + public void testBytes() throws Exception { + ZkClient zkClient = new ZkClient(_zk.getZkServerAddress(), 2000, 2000, new BytesPushThroughSerializer()); + byte[] bytes = new byte[100]; + new Random().nextBytes(bytes); + zkClient.createPersistent("/a", bytes); + byte[] readBytes = zkClient.readData("/a"); + assertArrayEquals(bytes, readBytes); + } + + @Test + public void testSerializables() throws Exception { + ZkClient zkClient = new ZkClient(_zk.getZkServerAddress(), 2000, 2000, new SerializableSerializer()); + String data = "hello world"; + zkClient.createPersistent("/a", data); + String readData = zkClient.readData("/a"); + assertEquals(data, readData); + } +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkConnectionTest.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkConnectionTest.java new file mode 100644 index 000000000..5ace6f435 --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/ZkConnectionTest.java @@ -0,0 +1,36 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient; + +import org.I0Itec.zkclient.testutil.ZkTestSystem; +import org.junit.Rule; + +public class ZkConnectionTest extends AbstractConnectionTest { + + @Rule + public ZkTestSystem _zk = ZkTestSystem.getInstance(); + + public ZkConnectionTest() { + super(establishConnection()); + } + + private static ZkConnection establishConnection() { + ZkConnection zkConnection = new ZkConnection("localhost:" + ZkTestSystem.getInstance().getZkServer().getPort()); + new ZkClient(zkConnection);// connect + return zkConnection; + } + +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkPathUtil.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkPathUtil.java new file mode 100644 index 000000000..5dfc23e4d --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkPathUtil.java @@ -0,0 +1,80 @@ +/* + * Copyright 2012 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient.testutil; + +import java.util.List; + +import org.I0Itec.zkclient.ZkClient; + +// JLZ - copied from https://raw.github.com/sgroschupf/zkclient/master/src/main/java/org/I0Itec/zkclient/util/ZkPathUtil.java +public class ZkPathUtil { + + public static String leadingZeros(long number, int numberOfLeadingZeros) { + return String.format("%0" + numberOfLeadingZeros + "d", number); + } + + public static String toString(ZkClient zkClient) { + return toString(zkClient, "/", PathFilter.ALL); + } + + public static String toString(ZkClient zkClient, String startPath, PathFilter pathFilter) { + final int level = 1; + final StringBuilder builder = new StringBuilder("+ (" + startPath + ")"); + builder.append("\n"); + addChildrenToStringBuilder(zkClient, pathFilter, level, builder, startPath); + return builder.toString(); + } + + private static void addChildrenToStringBuilder(ZkClient zkClient, PathFilter pathFilter, final int level, final StringBuilder builder, final String startPath) { + final List children = zkClient.getChildren(startPath); + for (final String node : children) { + String nestedPath; + if (startPath.endsWith("/")) { + nestedPath = startPath + node; + } else { + nestedPath = startPath + "/" + node; + } + if (pathFilter.showChilds(nestedPath)) { + builder.append(getSpaces(level - 1) + "'-" + "+" + node + "\n"); + addChildrenToStringBuilder(zkClient, pathFilter, level + 1, builder, nestedPath); + } else { + builder.append(getSpaces(level - 1) + "'-" + "-" + node + " (contents hidden)\n"); + } + } + } + + private static String getSpaces(final int level) { + String s = ""; + for (int i = 0; i < level; i++) { + s += " "; + } + return s; + } + + public static interface PathFilter { + + public static PathFilter ALL = new PathFilter() { + + @Override + public boolean showChilds(String path) { + return true; + } + }; + + boolean showChilds(String path); + } + +} \ No newline at end of file diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkTestSystem.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkTestSystem.java new file mode 100644 index 000000000..00508dfa0 --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/testutil/ZkTestSystem.java @@ -0,0 +1,117 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient.testutil; + +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.I0Itec.zkclient.IDefaultNameSpace; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkServer; +import org.apache.commons.io.FileUtils; +import org.apache.log4j.Logger; +import org.junit.rules.ExternalResource; + +public class ZkTestSystem extends ExternalResource { + + protected static final Logger LOG = Logger.getLogger(ZkTestSystem.class); + + private static int PORT = 10002; + private static ZkTestSystem _instance; + private ZkServer _zkServer; + + private ZkTestSystem() { + LOG.info("~~~~~~~~~~~~~~~ starting zk system ~~~~~~~~~~~~~~~"); + String baseDir = "build/zkdata"; + try { + FileUtils.deleteDirectory(new File(baseDir)); + } catch (IOException e) { + throw new RuntimeException(e); + } + String dataDir = baseDir + "/data"; + String logDir = baseDir + "/log"; + _zkServer = new ZkServer(dataDir, logDir, mock(IDefaultNameSpace.class), PORT); + _zkServer.start(); + LOG.info("~~~~~~~~~~~~~~~ zk system started ~~~~~~~~~~~~~~~"); + } + + @Override + // executed before every test method + protected void before() throws Throwable { + cleanupZk(); + } + + @Override + // executed after every test method + protected void after() { + cleanupZk(); + } + + private void cleanupZk() { + LOG.info("cleanup zk namespace"); + List children = getZkClient().getChildren("/"); + for (String child : children) { + if (!child.equals("zookeeper")) { + getZkClient().deleteRecursive("/" + child); + } + } + LOG.info("unsubscribing " + getZkClient().numberOfListeners() + " listeners"); + getZkClient().unsubscribeAll(); + } + + public static ZkTestSystem getInstance() { + if (_instance == null) { + _instance = new ZkTestSystem(); + _instance.cleanupZk(); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + LOG.info("shutting zk down"); + getInstance().getZkServer().shutdown(); + } + }); + } + return _instance; + } + + public ZkServer getZkServer() { + return _zkServer; + } + + public String getZkServerAddress() { + return "localhost:" + getServerPort(); + } + + public ZkClient getZkClient() { + return _zkServer.getZkClient(); + } + + public int getServerPort() { + return PORT; + } + + public ZkClient createZkClient() { + return new ZkClient("localhost:" + PORT); + } + + public void showStructure() { + getZkClient().showFolders(System.out); + } + +} diff --git a/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/util/ZkPathUtilTest.java b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/util/ZkPathUtilTest.java new file mode 100644 index 000000000..eb76e6976 --- /dev/null +++ b/curator-x-zkclient-bridge/src/test/java/org/I0Itec/zkclient/util/ZkPathUtilTest.java @@ -0,0 +1,70 @@ +/** + * Copyright 2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.I0Itec.zkclient.util; + +import junit.framework.TestCase; + +import org.I0Itec.zkclient.TestUtil; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkServer; +import org.I0Itec.zkclient.testutil.ZkPathUtil; + +public class ZkPathUtilTest extends TestCase { + + protected ZkServer _zkServer; + protected ZkClient _client; + + public void testToString() throws Exception { + _zkServer = TestUtil.startZkServer("ZkPathUtilTest", 4711); + _client = new ZkClient("localhost:4711", 5000); + final String file1 = "/files/file1"; + final String file2 = "/files/file2"; + final String file3 = "/files/file2/file3"; + _client.createPersistent(file1, true); + _client.createPersistent(file2, true); + _client.createPersistent(file3, true); + + String stringRepresentation = ZkPathUtil.toString(_client); + System.out.println(stringRepresentation); + System.out.println("-------------------------"); + assertTrue(stringRepresentation.contains("file1")); + assertTrue(stringRepresentation.contains("file2")); + assertTrue(stringRepresentation.contains("file3")); + + // path filtering + stringRepresentation = ZkPathUtil.toString(_client, "/", new ZkPathUtil.PathFilter() { + @Override + public boolean showChilds(String path) { + return !file2.equals(path); + } + }); + assertTrue(stringRepresentation.contains("file1")); + assertTrue(stringRepresentation.contains("file2")); + assertFalse(stringRepresentation.contains("file3")); + + // start path + stringRepresentation = ZkPathUtil.toString(_client, file2, ZkPathUtil.PathFilter.ALL); + assertFalse(stringRepresentation.contains("file1")); + assertTrue(stringRepresentation.contains("file2")); + assertTrue(stringRepresentation.contains("file3")); + + _zkServer.shutdown(); + } + + public void testLeadingZeros() throws Exception { + assertEquals("0000000001", ZkPathUtil.leadingZeros(1, 10)); + } +} diff --git a/settings.gradle b/settings.gradle index 441feda90..3c3c22d9b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -include 'curator-client','curator-framework','curator-recipes','curator-test','curator-x-discovery','curator-x-discovery-server' +include 'curator-client','curator-framework','curator-recipes','curator-test','curator-x-discovery','curator-x-discovery-server','curator-x-zkclient-bridge'