From 3029856c894a72ed2c5762e0c6acd2c0a8cd3937 Mon Sep 17 00:00:00 2001 From: randgalt Date: Sun, 27 Sep 2015 16:59:48 -0500 Subject: [PATCH 1/3] First pass implementation of group membership --- .../framework/CuratorFrameworkFactory.java | 31 ++-- .../framework/recipes/nodes/GroupMember.java | 140 ++++++++++++++++++ .../site/confluence/group-member.confluence | 42 ++++++ .../src/site/confluence/index.confluence | 3 +- .../recipes/nodes/TestGroupMember.java | 79 ++++++++++ 5 files changed, 281 insertions(+), 14 deletions(-) create mode 100644 curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java create mode 100644 curator-recipes/src/site/confluence/group-member.confluence create mode 100644 curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index dcb2ee653f..41ff9cd03b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -100,6 +100,24 @@ public static CuratorFramework newClient(String connectString, int sessionTimeou build(); } + /** + * Return the local address as bytes that can be used as a node payload + * + * @return local address bytes + */ + public static byte[] getLocalAddress() + { + try + { + return InetAddress.getLocalHost().getHostAddress().getBytes(); + } + catch ( UnknownHostException ignore ) + { + // ignore + } + return new byte[0]; + } + public static class Builder { private EnsembleProvider ensembleProvider; @@ -465,19 +483,6 @@ private Builder() } } - private static byte[] getLocalAddress() - { - try - { - return InetAddress.getLocalHost().getHostAddress().getBytes(); - } - catch ( UnknownHostException ignore ) - { - // ignore - } - return new byte[0]; - } - private CuratorFrameworkFactory() { } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java new file mode 100644 index 0000000000..5aa8ca212b --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.nodes; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.ZKPaths; +import java.io.Closeable; +import java.util.Arrays; +import java.util.Map; + +/** + * Group membership management. Adds this instance into a group and + * keeps a cache of members in the group + */ +public class GroupMember implements Closeable +{ + private final PersistentEphemeralNode pen; + private final PathChildrenCache cache; + private final String thisId; + private final byte[] payload; + + /** + * @param client client + * @param membershipPath the path to use for membership + * @param thisId ID of this group member. MUST be unique for the group + */ + public GroupMember(CuratorFramework client, String membershipPath, String thisId) + { + this(client, membershipPath, thisId, CuratorFrameworkFactory.getLocalAddress()); + } + + /** + * @param client client + * @param membershipPath the path to use for membership + * @param thisId ID of this group member. MUST be unique for the group + * @param payload the payload to write in our member node + */ + public GroupMember(CuratorFramework client, String membershipPath, String thisId, byte[] payload) + { + this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be null"); + this.payload = Arrays.copyOf(payload, payload.length); + + cache = newPathChildrenCache(client, membershipPath); + pen = newPersistentEphemeralNode(client, membershipPath, thisId, payload); + } + + /** + * Start the group membership. Register thisId as a member and begin + * caching all members + */ + public void start() + { + pen.start(); + try + { + cache.start(); + } + catch ( Exception e ) + { + Throwables.propagate(e); + } + } + + /** + * Have thisId leave the group and stop caching membership + */ + @Override + public void close() + { + CloseableUtils.closeQuietly(cache); + CloseableUtils.closeQuietly(pen); + } + + /** + * Return the current view of membership. The keys are the IDs + * of the members. The values are each member's payload + * + * @return membership + */ + public Map getCurrentMembers() + { + ImmutableMap.Builder builder = ImmutableMap.builder(); + boolean thisIdAdded = false; + for ( ChildData data : cache.getCurrentData() ) + { + String id = idFromPath(data.getPath()); + thisIdAdded = thisIdAdded || id.equals(thisId); + builder.put(id, data.getData()); + } + if ( !thisIdAdded ) + { + builder.put(thisId, payload); // this instance is always a member + } + return builder.build(); + } + + /** + * Given a full ZNode path, return the member ID + * + * @param path full ZNode path + * @return id + */ + public String idFromPath(String path) + { + return ZKPaths.getNodeFromPath(path); + } + + protected PersistentEphemeralNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload) + { + return new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath, thisId), payload); + } + + protected PathChildrenCache newPathChildrenCache(CuratorFramework client, String membershipPath) + { + return new PathChildrenCache(client, membershipPath, true); + } +} diff --git a/curator-recipes/src/site/confluence/group-member.confluence b/curator-recipes/src/site/confluence/group-member.confluence new file mode 100644 index 0000000000..dcb27b3f10 --- /dev/null +++ b/curator-recipes/src/site/confluence/group-member.confluence @@ -0,0 +1,42 @@ +h1. Group Member + +h2. Description +Group membership management. Adds this instance into a group and keeps a cache of members in the group. + +h2. Participating Classes +* GroupMember + +h2. Usage +h3. Creating a GroupMember +{code} +public GroupMember(CuratorFramework client, + String membershipPath, + String thisId, + byte[] payload) +Parameters: +client - client instance +membershipPath - the path to use for membership +thisId - ID of this group member. MUST be unique for the group +payload - the payload to write in our member node +{code} + +h3. General Usage +GroupMember must be started: +{code} +group.start(); +{code} + +When you are through with the GroupMember instance, you should call close: +{code} +group.close(); +{code} + +NOTE: this will remove the instance from the group + +You can get a current view of the members by calling: +{code} +group.getCurrentMembers(); +{code} + +h2. Error Handling +GroupMember instances internally handle all error states recreating the node as necessary. diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence index 4f3a0324f1..71de8dfe8d 100644 --- a/curator-recipes/src/site/confluence/index.confluence +++ b/curator-recipes/src/site/confluence/index.confluence @@ -29,7 +29,8 @@ regarding "Curator Recipes Own Their ZNode/Paths". |[[Tree Cache|tree-cache.html]] \- A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.| ||Nodes|| -|[[Persistent Ephemeral Node|persistent-ephemeral-node.html]] \- An ephemeral node that attempts to stay present in ZooKeeper, even through connection and session interruptions..| +|[[Persistent Ephemeral Node|persistent-ephemeral-node.html]] \- An ephemeral node that attempts to stay present in ZooKeeper, even through connection and session interruptions.| +|[Group Member|group-member-node.html]] \- Group membership management. Adds this instance into a group and keeps a cache of members in the group.| ||Queues|| |[[Distributed Queue|distributed-queue.html]] \- An implementation of the Distributed Queue ZK recipe. Items put into the queue are guaranteed to be ordered (by means of ZK's PERSISTENT_SEQUENTIAL node). If a single consumer takes items out of the queue, they will be ordered FIFO. If ordering is important, use a LeaderSelector to nominate a single consumer.| diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java new file mode 100644 index 0000000000..3d4b9512a2 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.nodes; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.Map; + +public class TestGroupMember extends BaseClassForTests +{ + // NOTE - don't need many tests as this class is just a wrapper around two existing recipes + + @Test + public void testBasic() throws Exception + { + Timing timing = new Timing(); + GroupMember groupMember1 = null; + GroupMember groupMember2 = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + + groupMember1 = new GroupMember(client, "/member", "1"); + Assert.assertTrue(groupMember1.getCurrentMembers().containsKey("1")); + groupMember1.start(); + + groupMember2 = new GroupMember(client, "/member", "2"); + groupMember2.start(); + + timing.sleepABit(); + + Map currentMembers1 = groupMember1.getCurrentMembers(); + Map currentMembers2 = groupMember2.getCurrentMembers(); + Assert.assertEquals(currentMembers1.size(), 2); + Assert.assertEquals(currentMembers2.size(), 2); + Assert.assertEquals(currentMembers1, currentMembers2); + Assert.assertTrue(currentMembers1.containsKey("1")); + Assert.assertTrue(currentMembers1.containsKey("2")); + + groupMember2.close(); + + timing.sleepABit(); + + currentMembers1 = groupMember1.getCurrentMembers(); + Assert.assertEquals(currentMembers1.size(), 1); + Assert.assertTrue(currentMembers1.containsKey("1")); + Assert.assertFalse(currentMembers1.containsKey("2")); + } + finally + { + CloseableUtils.closeQuietly(groupMember1); + CloseableUtils.closeQuietly(groupMember2); + CloseableUtils.closeQuietly(client); + } + } +} From a49d2bb4f83a9627ad546a149a8f77831d174ef1 Mon Sep 17 00:00:00 2001 From: randgalt Date: Sun, 27 Sep 2015 17:06:46 -0500 Subject: [PATCH 2/3] api for setting data --- .../framework/recipes/nodes/GroupMember.java | 22 +++++++++++++++---- .../nodes/PersistentEphemeralNode.java | 7 +++++- .../recipes/nodes/TestGroupMember.java | 7 ++++++ 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java index 5aa8ca212b..b914ba4e30 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java @@ -28,7 +28,6 @@ import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.ZKPaths; import java.io.Closeable; -import java.util.Arrays; import java.util.Map; /** @@ -40,7 +39,6 @@ public class GroupMember implements Closeable private final PersistentEphemeralNode pen; private final PathChildrenCache cache; private final String thisId; - private final byte[] payload; /** * @param client client @@ -61,7 +59,6 @@ public GroupMember(CuratorFramework client, String membershipPath, String thisId public GroupMember(CuratorFramework client, String membershipPath, String thisId, byte[] payload) { this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be null"); - this.payload = Arrays.copyOf(payload, payload.length); cache = newPathChildrenCache(client, membershipPath); pen = newPersistentEphemeralNode(client, membershipPath, thisId, payload); @@ -84,6 +81,23 @@ public void start() } } + /** + * Change the data stored in this instance's node + * + * @param data new data (cannot be null) + */ + public void setThisData(byte[] data) + { + try + { + pen.setData(data); + } + catch ( Exception e ) + { + Throwables.propagate(e); + } + } + /** * Have thisId leave the group and stop caching membership */ @@ -112,7 +126,7 @@ public Map getCurrentMembers() } if ( !thisIdAdded ) { - builder.put(thisId, payload); // this instance is always a member + builder.put(thisId, pen.getData()); // this instance is always a member } return builder.build(); } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java index 7e00e109ea..ff9cb12791 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java @@ -352,7 +352,12 @@ public void setData(byte[] data) throws Exception } } - private byte[] getData() { + /** + * Return the current value of our data + * + * @return our data + */ + public byte[] getData() { return this.data.get(); } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java index 3d4b9512a2..b67831da46 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java @@ -68,6 +68,13 @@ public void testBasic() throws Exception Assert.assertEquals(currentMembers1.size(), 1); Assert.assertTrue(currentMembers1.containsKey("1")); Assert.assertFalse(currentMembers1.containsKey("2")); + + groupMember1.setThisData("something".getBytes()); + + timing.sleepABit(); + currentMembers1 = groupMember1.getCurrentMembers(); + Assert.assertTrue(currentMembers1.containsKey("1")); + Assert.assertEquals(currentMembers1.get("1"), "something".getBytes()); } finally { From 275b1d5e9f13c8d4dd76961a2fc6f2c86536cf9b Mon Sep 17 00:00:00 2001 From: randgalt Date: Sun, 27 Sep 2015 17:22:15 -0500 Subject: [PATCH 3/3] updated doc --- curator-recipes/src/site/confluence/group-member.confluence | 2 ++ curator-recipes/src/site/confluence/index.confluence | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/curator-recipes/src/site/confluence/group-member.confluence b/curator-recipes/src/site/confluence/group-member.confluence index dcb27b3f10..a3706756f5 100644 --- a/curator-recipes/src/site/confluence/group-member.confluence +++ b/curator-recipes/src/site/confluence/group-member.confluence @@ -5,6 +5,8 @@ Group membership management. Adds this instance into a group and keeps a cache o h2. Participating Classes * GroupMember +* PersistentEphemeralNode +* PathChildrenCache h2. Usage h3. Creating a GroupMember diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence index 71de8dfe8d..01dfc7e934 100644 --- a/curator-recipes/src/site/confluence/index.confluence +++ b/curator-recipes/src/site/confluence/index.confluence @@ -30,7 +30,7 @@ regarding "Curator Recipes Own Their ZNode/Paths". ||Nodes|| |[[Persistent Ephemeral Node|persistent-ephemeral-node.html]] \- An ephemeral node that attempts to stay present in ZooKeeper, even through connection and session interruptions.| -|[Group Member|group-member-node.html]] \- Group membership management. Adds this instance into a group and keeps a cache of members in the group.| +|[Group Member|group-member.html]] \- Group membership management. Adds this instance into a group and keeps a cache of members in the group.| ||Queues|| |[[Distributed Queue|distributed-queue.html]] \- An implementation of the Distributed Queue ZK recipe. Items put into the queue are guaranteed to be ordered (by means of ZK's PERSISTENT_SEQUENTIAL node). If a single consumer takes items out of the queue, they will be ordered FIFO. If ordering is important, use a LeaderSelector to nominate a single consumer.|