Skip to content

Commit

Permalink
Refactor lock implementation to use thread IDs in locks.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jan 3, 2015
1 parent 7d1d401 commit f132f9a
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 15 deletions.
Expand Up @@ -22,14 +22,15 @@
import net.kuujo.copycat.internal.AbstractManagedResource; import net.kuujo.copycat.internal.AbstractManagedResource;
import net.kuujo.copycat.internal.cluster.coordinator.DefaultClusterCoordinator; import net.kuujo.copycat.internal.cluster.coordinator.DefaultClusterCoordinator;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;


/** /**
* Asynchronous lock. * Asynchronous lock.
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public interface AsyncLock extends AsyncLockProxy, DiscreteResource<AsyncLock> { public interface AsyncLock extends DiscreteResource<AsyncLock> {


/** /**
* Creates a new asynchronous lock. * Creates a new asynchronous lock.
Expand Down Expand Up @@ -65,4 +66,18 @@ static AsyncLock create(String name, String uri, ClusterConfig cluster, AsyncLoc
} }
} }


/**
* Acquires the lock.
*
* @return A completable future to be completed once the lock has been acquired.
*/
CompletableFuture<Boolean> lock();

/**
* Releases the lock.
*
* @return A completable future to be completed once the lock has been released.
*/
CompletableFuture<Void> unlock();

} }
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package net.kuujo.copycat.collections; package net.kuujo.copycat.collections.internal.lock;


import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


Expand All @@ -27,15 +27,19 @@ public interface AsyncLockProxy {
/** /**
* Acquires the lock. * Acquires the lock.
* *
* @param member The member locking the lock.
* @param thread The thread locking the lock.
* @return A completable future to be completed once the lock has been acquired. * @return A completable future to be completed once the lock has been acquired.
*/ */
CompletableFuture<Boolean> lock(); CompletableFuture<Boolean> lock(String member, long thread);


/** /**
* Releases the lock. * Releases the lock.
* *
* @param member The member unlocking the lock.
* @param thread The thread unlocking the lock.
* @return A completable future to be completed once the lock has been released. * @return A completable future to be completed once the lock has been released.
*/ */
CompletableFuture<Void> unlock(); CompletableFuture<Void> unlock(String member, long thread);


} }
Expand Up @@ -17,7 +17,6 @@
import net.kuujo.copycat.ResourceContext; import net.kuujo.copycat.ResourceContext;
import net.kuujo.copycat.StateMachine; import net.kuujo.copycat.StateMachine;
import net.kuujo.copycat.collections.AsyncLock; import net.kuujo.copycat.collections.AsyncLock;
import net.kuujo.copycat.collections.AsyncLockProxy;
import net.kuujo.copycat.internal.AbstractDiscreteResource; import net.kuujo.copycat.internal.AbstractDiscreteResource;
import net.kuujo.copycat.internal.DefaultStateMachine; import net.kuujo.copycat.internal.DefaultStateMachine;
import net.kuujo.copycat.internal.util.concurrent.Futures; import net.kuujo.copycat.internal.util.concurrent.Futures;
Expand Down Expand Up @@ -56,12 +55,12 @@ protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supp


@Override @Override
public CompletableFuture<Boolean> lock() { public CompletableFuture<Boolean> lock() {
return checkOpen(proxy::lock); return checkOpen(() -> proxy.lock(cluster().member().uri(), Thread.currentThread().getId()));
} }


@Override @Override
public CompletableFuture<Void> unlock() { public CompletableFuture<Void> unlock() {
return checkOpen(proxy::unlock); return checkOpen(() -> proxy.unlock(cluster().member().uri(), Thread.currentThread().getId()));
} }


@Override @Override
Expand Down
Expand Up @@ -38,14 +38,14 @@ public interface LockState {
* @param thread The thread that is unlocking the lock. * @param thread The thread that is unlocking the lock.
* @return Indicates whether the lock was successfully locked. * @return Indicates whether the lock was successfully locked.
*/ */
boolean lock(String member, String thread); boolean lock(String member, long thread);


/** /**
* Unlocks the lock. * Unlocks the lock.
* *
* @param member The member that is unlocking the lock. * @param member The member that is unlocking the lock.
* @param thread The thread that is unlocking the lock. * @param thread The thread that is unlocking the lock.
*/ */
void unlock(String member, String thread); void unlock(String member, long thread);


} }
Expand Up @@ -52,14 +52,14 @@ private void handleMembershipEvent(MembershipEvent event) {
} }


@Override @Override
public boolean lock(String member, String thread) { public boolean lock(String member, long thread) {
return false; return false;
} }


@Override @Override
public void unlock(String member, String thread) { public void unlock(String member, long thread) {
String currentMember = context.get("member"); String currentMember = context.get("member");
String currentThread = context.get("thread"); Long currentThread = context.get("thread");
if ((currentMember != null && !currentMember.equals(member)) || (currentThread != null && !currentThread.equals(thread))) { if ((currentMember != null && !currentMember.equals(member)) || (currentThread != null && !currentThread.equals(thread))) {
throw new IllegalStateException("Lock is owned by another thread"); throw new IllegalStateException("Lock is owned by another thread");
} }
Expand Down
Expand Up @@ -31,9 +31,9 @@ public void init(StateContext<LockState> context) {
} }


@Override @Override
public boolean lock(String member, String thread) { public boolean lock(String member, long thread) {
String currentMember = context.get("member"); String currentMember = context.get("member");
String currentThread = context.get("thread"); Long currentThread = context.get("thread");
if ((currentMember != null && !currentMember.equals(member)) || (currentThread != null && !currentThread.equals(thread))) { if ((currentMember != null && !currentMember.equals(member)) || (currentThread != null && !currentThread.equals(thread))) {
throw new IllegalStateException("Lock is owned by another thread"); throw new IllegalStateException("Lock is owned by another thread");
} }
Expand All @@ -44,7 +44,7 @@ public boolean lock(String member, String thread) {
} }


@Override @Override
public void unlock(String member, String thread) { public void unlock(String member, long thread) {
// Do nothing. The lock is already unlocked. // Do nothing. The lock is already unlocked.
} }


Expand Down

0 comments on commit f132f9a

Please sign in to comment.