diff --git a/helix-lock/src/main/java/org/apache/helix/lock/DistributedLock.java b/helix-lock/src/main/java/org/apache/helix/lock/DistributedLock.java index 87247c208d..3adbb75c40 100644 --- a/helix-lock/src/main/java/org/apache/helix/lock/DistributedLock.java +++ b/helix-lock/src/main/java/org/apache/helix/lock/DistributedLock.java @@ -49,4 +49,10 @@ public interface DistributedLock { * false if the user is not the lock owner or the lock doesn't have a owner */ boolean isCurrentOwner(); + + /** + * Call this method to close the lock's zookeeper connection + * The lock has to be unlocked or expired before this method can be called + */ + void close(); } diff --git a/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java b/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java index d5ad3a29f9..4b322e273e 100644 --- a/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java +++ b/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java @@ -37,7 +37,6 @@ * Helix nonblocking lock implementation based on Zookeeper */ public class ZKDistributedNonblockingLock implements DistributedLock { - private static final Logger LOG = Logger.getLogger(ZKDistributedNonblockingLock.class); private final String _lockPath; @@ -50,12 +49,12 @@ public class ZKDistributedNonblockingLock implements DistributedLock { * Initialize the lock with user provided information, e.g.,cluster, scope, etc. * @param scope the scope to lock * @param zkAddress the zk address the cluster connects to - * @param timeout the timeout period of the lcok + * @param timeout the timeout period of the lock * @param lockMsg the reason for having this lock * @param userId a universal unique userId for lock owner identity */ - public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long timeout, String lockMsg, - String userId) { + public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long timeout, + String lockMsg, String userId) { this(scope.getPath(), zkAddress, timeout, lockMsg, userId); } @@ -63,12 +62,12 @@ public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long time * Initialize the lock with user provided information, e.g., lock path under zookeeper, etc. * @param lockPath the path of the lock under Zookeeper * @param zkAddress the zk address of the cluster - * @param timeout the timeout period of the lcok + * @param timeout the timeout period of the lock * @param lockMsg the reason for having this lock * @param userId a universal unique userId for lock owner identity */ - private ZKDistributedNonblockingLock(String lockPath, String zkAddress, Long timeout, String lockMsg, - String userId) { + private ZKDistributedNonblockingLock(String lockPath, String zkAddress, Long timeout, + String lockMsg, String userId) { _lockPath = lockPath; if (timeout < 0) { throw new IllegalArgumentException("The expiration time cannot be negative."); @@ -81,7 +80,6 @@ private ZKDistributedNonblockingLock(String lockPath, String zkAddress, Long tim @Override public boolean tryLock() { - // Set lock information fields long deadline; // Prevent value overflow @@ -115,6 +113,14 @@ public boolean isCurrentOwner() { .getTimeout()); } + @Override + public void close() { + if (isCurrentOwner()) { + throw new HelixException("Please unlock the lock before closing it."); + } + _baseDataAccessor.close(); + } + /** * Class that specifies how a lock node should be updated with another lock node */ diff --git a/helix-lock/src/test/java/org/apache/helix/lock/helix/TestZKHelixNonblockingLock.java b/helix-lock/src/test/java/org/apache/helix/lock/helix/TestZKHelixNonblockingLock.java index 758c72902e..ef80cf879e 100644 --- a/helix-lock/src/test/java/org/apache/helix/lock/helix/TestZKHelixNonblockingLock.java +++ b/helix-lock/src/test/java/org/apache/helix/lock/helix/TestZKHelixNonblockingLock.java @@ -19,6 +19,7 @@ package org.apache.helix.lock.helix; +import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -26,12 +27,14 @@ import java.util.UUID; import java.util.concurrent.Callable; +import org.apache.helix.HelixException; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.lock.LockInfo; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.zookeeper.CreateMode; import org.testng.Assert; +import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -48,7 +51,6 @@ public class TestZKHelixNonblockingLock extends ZkTestBase { @BeforeClass public void beforeClass() throws Exception { - System.out.println("START " + _clusterName + " at " + new Date(System.currentTimeMillis())); TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 10, 5, 3, @@ -61,8 +63,9 @@ public void beforeClass() throws Exception { _participantScope = new HelixLockScope(HelixLockScope.LockScopeProperty.CLUSTER, pathKeys); _lockPath = _participantScope.getPath(); - _lock = new ZKDistributedNonblockingLock(_participantScope, ZK_ADDR, Long.MAX_VALUE, _lockMessage, - _userId); + _lock = + new ZKDistributedNonblockingLock(_participantScope, ZK_ADDR, Long.MAX_VALUE, _lockMessage, + _userId); } @BeforeMethod @@ -71,9 +74,14 @@ public void beforeMethod() { Assert.assertFalse(_gZkClient.exists(_lockPath)); } + @AfterSuite + public void afterSuite() throws IOException { + _lock.close(); + super.afterSuite(); + } + @Test public void testAcquireLock() { - // Acquire lock _lock.tryLock(); Assert.assertTrue(_gZkClient.exists(_lockPath)); @@ -93,7 +101,6 @@ public void testAcquireLock() { @Test public void testAcquireLockWhenExistingLockNotExpired() { - // Fake condition when the lock owner is not current user String fakeUserID = UUID.randomUUID().toString(); ZNRecord fakeRecord = new ZNRecord(fakeUserID); @@ -115,7 +122,6 @@ public void testAcquireLockWhenExistingLockNotExpired() { @Test public void testAcquireLockWhenExistingLockExpired() { - // Fake condition when the current lock already expired String fakeUserID = UUID.randomUUID().toString(); ZNRecord fakeRecord = new ZNRecord(fakeUserID); @@ -159,5 +165,33 @@ public Boolean call() throws Exception { return _lock.tryLock(); } } + + @Test + public void testCloseLockedLock() { + _lock.tryLock(); + Assert.assertTrue(_lock.isCurrentOwner()); + try { + _lock.close(); + Assert.fail("Should throw exception here."); + } catch (HelixException e) { + Assert.assertEquals(e.getMessage(), "Please unlock the lock before closing it."); + } + Assert.assertTrue(_lock.isCurrentOwner()); + } + + @Test + public void testCloseUnlockedLock() { + Assert.assertFalse(_lock.isCurrentOwner()); + try { + _lock.close(); + _lock.getCurrentLockInfo(); + Assert.fail("Should throw exception here"); + } catch (IllegalStateException e) { + Assert.assertEquals(e.getMessage(), "ZkClient already closed!"); + } + _lock = + new ZKDistributedNonblockingLock(_participantScope, ZK_ADDR, Long.MAX_VALUE, _lockMessage, + _userId); + } }