From 840afd3158e825ccf038ab4cc272fbc357b8c7a1 Mon Sep 17 00:00:00 2001 From: Shivram Khandeparker Date: Tue, 5 Apr 2016 22:02:34 +0530 Subject: [PATCH 1/2] Added support for listenable events in GroupMember --- .../framework/recipes/nodes/GroupData.java | 94 +++++++++++++++++++ .../framework/recipes/nodes/GroupMember.java | 93 +++++++++++++++++- .../recipes/nodes/GroupMemberEvent.java | 75 +++++++++++++++ .../recipes/nodes/GroupMemberListener.java | 35 +++++++ 4 files changed, 295 insertions(+), 2 deletions(-) create mode 100644 curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupData.java create mode 100644 curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMemberEvent.java create mode 100644 curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMemberListener.java diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupData.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupData.java new file mode 100644 index 0000000000..cc840d5310 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupData.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.nodes; + +import java.util.Arrays; + +public class GroupData +{ + private final String id; + private final byte[] data; + + public GroupData(String id, byte[] data) + { + this.id = id; + this.data = data; + } + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + GroupData groupData = (GroupData) o; + + if (!id.equals(groupData.id)) + { + return false; + } + return Arrays.equals(data, groupData.data); + } + + @Override + public int hashCode() + { + int result = id.hashCode(); + result = 31 * result + Arrays.hashCode(data); + return result; + } + + /** + *

Returns the data associated with this group member

+ * + *

NOTE: the byte array returned is the raw reference of this instance's field. If you change + * the values in the array any other callers to this method will see the change.

+ * + * @return node data or null + */ + public byte[] getData() + { + return data; + } + + /** + * Returns the id of the group member + * + * @return id or null + */ + public String getId() + { + return id; + } + + @Override + public String toString() + { + return "GroupData{" + + "id='" + id + '\'' + + ", data=" + Arrays.toString(data) + + '}'; + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java index 8cd1f65afb..967b6335b8 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java @@ -18,16 +18,23 @@ */ package org.apache.curator.framework.recipes.nodes; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.util.Map; @@ -35,11 +42,47 @@ * Group membership management. Adds this instance into a group and * keeps a cache of members in the group */ -public class GroupMember implements Closeable -{ +public class GroupMember implements Closeable { + private final Logger log = LoggerFactory.getLogger(getClass()); + private final CuratorFramework client; private final PersistentEphemeralNode pen; private final PathChildrenCache cache; private final String thisId; + private ListenerContainer listeners = new ListenerContainer(); + private final PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() + { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + ChildData data = event.getData(); + GroupData groupData = new GroupData(idFromPath(data.getPath()), data.getData()); + + switch( event.getType() ) + { + case CHILD_ADDED: + { + callListeners(new GroupMemberEvent(GroupMemberEvent.Type.MEMBER_JOINED, groupData)); + break; + } + + case CHILD_REMOVED: + { + callListeners(new GroupMemberEvent(GroupMemberEvent.Type.MEMBER_LEFT, groupData)); + break; + } + + case CHILD_UPDATED: + { + callListeners(new GroupMemberEvent(GroupMemberEvent.Type.MEMBER_UPDATED, groupData)); + break; + } + + default: + { + break; + } + } + } + }; /** * @param client client @@ -59,6 +102,7 @@ public GroupMember(CuratorFramework client, String membershipPath, String thisId */ public GroupMember(CuratorFramework client, String membershipPath, String thisId, byte[] payload) { + this.client = client; this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be null"); cache = newPathChildrenCache(client, membershipPath); @@ -75,6 +119,7 @@ public void start() try { cache.start(); + cache.getListenable().addListener(childrenCacheListener); } catch ( Exception e ) { @@ -101,6 +146,50 @@ public void setThisData(byte[] data) } } + /** + * Return the group member listenable + * + * @return listenable + */ + public ListenerContainer getListenable() + { + return listeners; + } + + /** + * Default behavior is just to log the exception + * + * @param e the exception + */ + protected void handleException(Throwable e) + { + log.error("", e); + } + + void callListeners(final GroupMemberEvent event) + { + listeners.forEach + ( + new Function() + { + @Override + public Void apply(GroupMemberListener listener) + { + try + { + listener.groupEvent(client, event); + } + catch ( Exception e ) + { + ThreadUtils.checkInterrupted(e); + handleException(e); + } + return null; + } + } + ); + } + /** * Have thisId leave the group and stop caching membership */ diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMemberEvent.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMemberEvent.java new file mode 100644 index 0000000000..905747e02f --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMemberEvent.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.nodes; + +/** + * POJO that abstracts a change to a group membership + */ +public class GroupMemberEvent +{ + private final Type type; + private final GroupData data; + + /** + * Type of change + */ + public enum Type + { + /** + * A new member joined group + */ + MEMBER_JOINED, + + /** + * An existing member left the group + */ + MEMBER_LEFT, + + /** + * The data for a member was updated + */ + MEMBER_UPDATED + } + + /** + * @param type event type + * @param data event data or null + */ + public GroupMemberEvent(Type type, GroupData data) + { + this.type = type; + this.data = data; + } + + /** + * @return change type + */ + public Type getType() + { + return type; + } + + /** + * @return the node's data + */ + public GroupData getData() + { + return data; + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMemberListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMemberListener.java new file mode 100644 index 0000000000..cd1c83cbff --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMemberListener.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.nodes; + +import org.apache.curator.framework.CuratorFramework; + +/** + * Listener for GroupMember changes + */ +public interface GroupMemberListener { + /** + * Called when a change has occurred + * + * @param client the client + * @param event describes the change + * @throws Exception errors + */ + public void groupEvent(CuratorFramework client, GroupMemberEvent event) throws Exception; +} From debad0ab1cbbfd754f80d51ce6b24bca0b0e4034 Mon Sep 17 00:00:00 2001 From: Shivram Khandeparker Date: Tue, 5 Apr 2016 22:19:41 +0530 Subject: [PATCH 2/2] Added test case for GroupMemberListener --- .../nodes/TestGroupMemberListener.java | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMemberListener.java diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMemberListener.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMemberListener.java new file mode 100644 index 0000000000..7540df5e02 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMemberListener.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.nodes; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.concurrent.CountDownLatch; + +public class TestGroupMemberListener extends BaseClassForTests +{ + @Test + public void testBasic() throws Exception + { + Timing timing = new Timing(); + GroupMember groupMember1 = null; + GroupMember groupMember2 = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + + groupMember1 = new GroupMember(client, "/member", "1"); + groupMember1.start(); + + final CountDownLatch memberAddedLatch = new CountDownLatch(1); + final CountDownLatch memberLeftLatch = new CountDownLatch(1); + final CountDownLatch memberUpdatedLatch = new CountDownLatch(1); + GroupMemberListener listener = new GroupMemberListener() + { + @Override + public void groupEvent(CuratorFramework client, GroupMemberEvent event) throws Exception + { + if (event.getType() == GroupMemberEvent.Type.MEMBER_JOINED) + { + memberAddedLatch.countDown(); + } + else if (event.getType() == GroupMemberEvent.Type.MEMBER_LEFT) + { + memberLeftLatch.countDown(); + } + else if (event.getType() == GroupMemberEvent.Type.MEMBER_UPDATED) + { + memberUpdatedLatch.countDown(); + } + } + }; + groupMember1.getListenable().addListener(listener); + + groupMember2 = new GroupMember(client, "/member", "2"); + groupMember2.start(); + timing.sleepABit(); + Assert.assertTrue(timing.awaitLatch(memberAddedLatch)); + groupMember2.setThisData("new data".getBytes()); + timing.sleepABit(); + Assert.assertTrue(timing.awaitLatch(memberUpdatedLatch)); + CloseableUtils.closeQuietly(groupMember2); + timing.sleepABit(); + Assert.assertTrue(timing.awaitLatch(memberLeftLatch)); + } + finally + { + CloseableUtils.closeQuietly(groupMember1); + CloseableUtils.closeQuietly(client); + } + } +}