From ddadeeab9d8e1f42dac13ad65494c84e37f7f3d6 Mon Sep 17 00:00:00 2001 From: faucct Date: Sun, 3 Oct 2021 22:11:49 +0300 Subject: [PATCH] CURATOR-607: InterProcessReadWriteLock should expose exposing getLockPath (#394) Co-authored-by: Nikita Sokolov --- .../locks/InterProcessReadWriteLock.java | 182 +++++++++--------- .../locks/TestInterProcessReadWriteLock.java | 105 ++++++++++ 2 files changed, 200 insertions(+), 87 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java index 57af212765..a1ea94dbf7 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java @@ -55,8 +55,8 @@ */ public class InterProcessReadWriteLock { - private final InterProcessMutex readMutex; - private final InterProcessMutex writeMutex; + private final ReadLock readMutex; + private final WriteLock writeMutex; // must be the same length. LockInternals depends on it private static final String READ_LOCK_NAME = "__READ__"; @@ -82,33 +82,100 @@ private static class InternalInterProcessMutex extends InterProcessMutex { super(client, path, lockName, maxLeases, driver); this.lockName = lockName; - this.lockData = lockData; + this.lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length); } @Override - public Collection getParticipantNodes() throws Exception + final public Collection getParticipantNodes() throws Exception { - Collection nodes = super.getParticipantNodes(); - Iterable filtered = Iterables.filter - ( - nodes, - new Predicate() - { - @Override - public boolean apply(String node) - { - return node.contains(lockName); - } + return ImmutableList.copyOf(Iterables.filter(super.getParticipantNodes(), new Predicate() { + @Override + public boolean apply(String node) { + return node.contains(lockName); } - ); - return ImmutableList.copyOf(filtered); + })); } @Override - protected byte[] getLockNodeBytes() + final protected byte[] getLockNodeBytes() { return lockData; } + + @Override + protected String getLockPath() + { + return super.getLockPath(); + } + } + + public static class WriteLock extends InternalInterProcessMutex + { + public WriteLock(CuratorFramework client, String basePath, byte[] lockData) + { + super(client, basePath, WRITE_LOCK_NAME, lockData, 1, new SortingLockInternalsDriver() { + @Override + public PredicateResults getsTheLock( + CuratorFramework client, + List children, + String sequenceNodeName, + int maxLeases + ) throws Exception { + return super.getsTheLock(client, children, sequenceNodeName, maxLeases); + } + }); + } + + @Override + protected String getLockPath() + { + return super.getLockPath(); + } + } + + public static class ReadLock extends InternalInterProcessMutex { + public ReadLock(CuratorFramework client, String basePath, byte[] lockData, WriteLock writeLock) + { + super(client, basePath, READ_LOCK_NAME, lockData, Integer.MAX_VALUE, new SortingLockInternalsDriver() { + @Override + public PredicateResults getsTheLock( + CuratorFramework client, + List children, + String sequenceNodeName, + int maxLeases + ) throws Exception { + if (writeLock.isOwnedByCurrentThread()) { + return new PredicateResults(null, true); + } + + int index = 0; + int firstWriteIndex = Integer.MAX_VALUE; + int ourIndex = -1; + for (String node : children) { + if (node.contains(WRITE_LOCK_NAME)) { + firstWriteIndex = Math.min(index, firstWriteIndex); + } else if (node.startsWith(sequenceNodeName)) { + ourIndex = index; + break; + } + + ++index; + } + + validateOurIndex(sequenceNodeName, ourIndex); + + boolean getsTheLock = (ourIndex < firstWriteIndex); + String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex); + return new PredicateResults(pathToWatch, getsTheLock); + } + }); + } + + @Override + protected String getLockPath() + { + return super.getLockPath(); + } } /** @@ -127,41 +194,14 @@ public InterProcessReadWriteLock(CuratorFramework client, String basePath) */ public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) { - lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length); - - writeMutex = new InternalInterProcessMutex - ( - client, - basePath, - WRITE_LOCK_NAME, - lockData, - 1, - new SortingLockInternalsDriver() - { - @Override - public PredicateResults getsTheLock(CuratorFramework client, List children, String sequenceNodeName, int maxLeases) throws Exception - { - return super.getsTheLock(client, children, sequenceNodeName, maxLeases); - } - } - ); - - readMutex = new InternalInterProcessMutex - ( - client, - basePath, - READ_LOCK_NAME, - lockData, - Integer.MAX_VALUE, - new SortingLockInternalsDriver() - { - @Override - public PredicateResults getsTheLock(CuratorFramework client, List children, String sequenceNodeName, int maxLeases) throws Exception - { - return readLockPredicate(children, sequenceNodeName); - } - } - ); + this.writeMutex = new WriteLock(client, basePath, lockData); + this.readMutex = new ReadLock(client, basePath, lockData, writeMutex); + } + + protected InterProcessReadWriteLock(WriteLock writeLock, ReadLock readLock) + { + this.writeMutex = writeLock; + this.readMutex = readLock; } /** @@ -169,7 +209,7 @@ public PredicateResults getsTheLock(CuratorFramework client, List childr * * @return read lock */ - public InterProcessMutex readLock() + public ReadLock readLock() { return readMutex; } @@ -179,40 +219,8 @@ public InterProcessMutex readLock() * * @return write lock */ - public InterProcessMutex writeLock() + public WriteLock writeLock() { return writeMutex; } - - private PredicateResults readLockPredicate(List children, String sequenceNodeName) throws Exception - { - if ( writeMutex.isOwnedByCurrentThread() ) - { - return new PredicateResults(null, true); - } - - int index = 0; - int firstWriteIndex = Integer.MAX_VALUE; - int ourIndex = -1; - for ( String node : children ) - { - if ( node.contains(WRITE_LOCK_NAME) ) - { - firstWriteIndex = Math.min(index, firstWriteIndex); - } - else if ( node.startsWith(sequenceNodeName) ) - { - ourIndex = index; - break; - } - - ++index; - } - - StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex); - - boolean getsTheLock = (ourIndex < firstWriteIndex); - String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex); - return new PredicateResults(pathToWatch, getsTheLock); - } } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java index a601241b18..b54ae50af5 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java @@ -24,12 +24,15 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.KillSession; +import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.Test; import java.util.Collection; @@ -362,4 +365,106 @@ private void doLocking(InterProcessLock lock, AtomicInteger concurrentCount, Ato } } } + + public static class LockPathInterProcessReadWriteLock extends InterProcessReadWriteLock + { + private final WriteLock writeLock; + private final ReadLock readLock; + + public LockPathInterProcessReadWriteLock(CuratorFramework client, String basePath) + { + this(client, basePath, null); + } + + public LockPathInterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) + { + this(client, basePath, lockData, new WriteLock(client, basePath, lockData)); + } + + private LockPathInterProcessReadWriteLock( + CuratorFramework client, + String basePath, + byte[] lockData, + WriteLock writeLock + ) + { + this(writeLock, new ReadLock(client, basePath, lockData, writeLock)); + } + + private LockPathInterProcessReadWriteLock(WriteLock writeLock, ReadLock readLock) + { + super(writeLock, readLock); + this.writeLock = writeLock; + this.readLock = readLock; + } + + @Override + public WriteLock writeLock() + { + return writeLock; + } + + @Override + public ReadLock readLock() + { + return readLock; + } + + public static class WriteLock extends InterProcessReadWriteLock.WriteLock + { + private WriteLock(CuratorFramework client, String basePath, byte[] lockData) + { + super(client, basePath, lockData); + } + + @Override + public String getLockPath() + { + return super.getLockPath(); + } + } + + public static class ReadLock extends InterProcessReadWriteLock.ReadLock + { + private ReadLock(CuratorFramework client, String basePath, byte[] lockData, WriteLock writeLock) + { + super(client, basePath, lockData, writeLock); + } + + @Override + public String getLockPath() + { + return super.getLockPath(); + } + } + } + + @Test + public void testLockPath() throws Exception + { + CuratorFramework client1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + CuratorFramework client2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client1.start(); + client2.start(); + LockPathInterProcessReadWriteLock lock1 = new LockPathInterProcessReadWriteLock(client1, "/lock"); + LockPathInterProcessReadWriteLock lock2 = new LockPathInterProcessReadWriteLock(client2, "/lock"); + lock1.writeLock().acquire(); + KillSession.kill(client1.getZookeeperClient().getZooKeeper()); + lock2.readLock().acquire(); + try { + client1.getData().forPath(lock1.writeLock().getLockPath()); + fail("expected not to find node"); + } catch (KeeperException.NoNodeException ignored) { + } + lock2.readLock().release(); + lock1.writeLock().release(); + } + finally + { + TestCleanState.closeAndTestClean(client2); + TestCleanState.closeAndTestClean(client1); + } + } }