Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,10 @@ public interface DistributedLock {
* false if the user is not the lock owner or the lock doesn't have a owner
*/
boolean isCurrentOwner();

/**
* Call this method to close the lock's zookeeper connection
* The lock has to be unlocked or expired before this method can be called
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
* Helix nonblocking lock implementation based on Zookeeper
*/
public class ZKDistributedNonblockingLock implements DistributedLock {

private static final Logger LOG = Logger.getLogger(ZKDistributedNonblockingLock.class);

private final String _lockPath;
Expand All @@ -50,25 +49,25 @@ public class ZKDistributedNonblockingLock implements DistributedLock {
* Initialize the lock with user provided information, e.g.,cluster, scope, etc.
* @param scope the scope to lock
* @param zkAddress the zk address the cluster connects to
* @param timeout the timeout period of the lcok
* @param timeout the timeout period of the lock
* @param lockMsg the reason for having this lock
* @param userId a universal unique userId for lock owner identity
*/
public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long timeout, String lockMsg,
String userId) {
public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long timeout,
String lockMsg, String userId) {
this(scope.getPath(), zkAddress, timeout, lockMsg, userId);
}

/**
* Initialize the lock with user provided information, e.g., lock path under zookeeper, etc.
* @param lockPath the path of the lock under Zookeeper
* @param zkAddress the zk address of the cluster
* @param timeout the timeout period of the lcok
* @param timeout the timeout period of the lock
* @param lockMsg the reason for having this lock
* @param userId a universal unique userId for lock owner identity
*/
private ZKDistributedNonblockingLock(String lockPath, String zkAddress, Long timeout, String lockMsg,
String userId) {
private ZKDistributedNonblockingLock(String lockPath, String zkAddress, Long timeout,
String lockMsg, String userId) {
_lockPath = lockPath;
if (timeout < 0) {
throw new IllegalArgumentException("The expiration time cannot be negative.");
Expand All @@ -81,7 +80,6 @@ private ZKDistributedNonblockingLock(String lockPath, String zkAddress, Long tim

@Override
public boolean tryLock() {

// Set lock information fields
long deadline;
// Prevent value overflow
Expand Down Expand Up @@ -115,6 +113,14 @@ public boolean isCurrentOwner() {
.getTimeout());
}

@Override
public void close() {
if (isCurrentOwner()) {
throw new HelixException("Please unlock the lock before closing it.");
}
_baseDataAccessor.close();
}

/**
* Class that specifies how a lock node should be updated with another lock node
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@

package org.apache.helix.lock.helix;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;

import org.apache.helix.HelixException;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.lock.LockInfo;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.zookeeper.CreateMode;
import org.testng.Assert;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -48,7 +51,6 @@ public class TestZKHelixNonblockingLock extends ZkTestBase {

@BeforeClass
public void beforeClass() throws Exception {

System.out.println("START " + _clusterName + " at " + new Date(System.currentTimeMillis()));

TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 10, 5, 3,
Expand All @@ -61,8 +63,9 @@ public void beforeClass() throws Exception {

_participantScope = new HelixLockScope(HelixLockScope.LockScopeProperty.CLUSTER, pathKeys);
_lockPath = _participantScope.getPath();
_lock = new ZKDistributedNonblockingLock(_participantScope, ZK_ADDR, Long.MAX_VALUE, _lockMessage,
_userId);
_lock =
new ZKDistributedNonblockingLock(_participantScope, ZK_ADDR, Long.MAX_VALUE, _lockMessage,
_userId);
}

@BeforeMethod
Expand All @@ -71,9 +74,14 @@ public void beforeMethod() {
Assert.assertFalse(_gZkClient.exists(_lockPath));
}

@AfterSuite
public void afterSuite() throws IOException {
_lock.close();
super.afterSuite();
}

@Test
public void testAcquireLock() {

// Acquire lock
_lock.tryLock();
Assert.assertTrue(_gZkClient.exists(_lockPath));
Expand All @@ -93,7 +101,6 @@ public void testAcquireLock() {

@Test
public void testAcquireLockWhenExistingLockNotExpired() {

// Fake condition when the lock owner is not current user
String fakeUserID = UUID.randomUUID().toString();
ZNRecord fakeRecord = new ZNRecord(fakeUserID);
Expand All @@ -115,7 +122,6 @@ public void testAcquireLockWhenExistingLockNotExpired() {

@Test
public void testAcquireLockWhenExistingLockExpired() {

// Fake condition when the current lock already expired
String fakeUserID = UUID.randomUUID().toString();
ZNRecord fakeRecord = new ZNRecord(fakeUserID);
Expand Down Expand Up @@ -159,5 +165,33 @@ public Boolean call() throws Exception {
return _lock.tryLock();
}
}

@Test
public void testCloseLockedLock() {
_lock.tryLock();
Assert.assertTrue(_lock.isCurrentOwner());
try {
_lock.close();
Assert.fail("Should throw exception here.");
} catch (HelixException e) {
Assert.assertEquals(e.getMessage(), "Please unlock the lock before closing it.");
}
Assert.assertTrue(_lock.isCurrentOwner());
}

@Test
public void testCloseUnlockedLock() {
Assert.assertFalse(_lock.isCurrentOwner());
try {
_lock.close();
_lock.getCurrentLockInfo();
Assert.fail("Should throw exception here");
} catch (IllegalStateException e) {
Assert.assertEquals(e.getMessage(), "ZkClient already closed!");
}
_lock =
new ZKDistributedNonblockingLock(_participantScope, ZK_ADDR, Long.MAX_VALUE, _lockMessage,
_userId);
}
}