Skip to content

Commit

Permalink
Assign monotonically increasing per-client lock IDs to lock requests …
Browse files Browse the repository at this point in the history
…to handle concurrency and support failing attempts to acquire a lock.
  • Loading branch information
kuujo committed Feb 14, 2016
1 parent ac84075 commit 0e60f2b
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 33 deletions.
Expand Up @@ -22,10 +22,10 @@
import io.atomix.resource.ResourceTypeInfo;

import java.time.Duration;
import java.util.Queue;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Provides a mechanism for synchronizing access to cluster-wide shared resources.
Expand Down Expand Up @@ -98,7 +98,9 @@ public static Config config() {
return new Config();
}

private final Queue<Consumer<Long>> queue = new ConcurrentLinkedQueue<>();
private final Map<Integer, CompletableFuture<Long>> futures = new ConcurrentHashMap<>();
private final AtomicInteger id = new AtomicInteger();
private int lock;

public DistributedLock(CopycatClient client, Resource.Options options) {
super(client, options);
Expand All @@ -108,17 +110,29 @@ public DistributedLock(CopycatClient client, Resource.Options options) {
public CompletableFuture<DistributedLock> open() {
return super.open().thenApply(result -> {
client.onEvent("lock", this::handleEvent);
client.onEvent("fail", this::handleFail);
return result;
});
}

/**
* Handles a received session event.
* Handles a received lock event.
*/
private void handleEvent(long version) {
Consumer<Long> consumer = queue.poll();
if (consumer != null) {
consumer.accept(version);
private void handleEvent(LockCommands.LockEvent event) {
CompletableFuture<Long> future = futures.get(event.id());
if (future != null) {
this.lock = event.id();
future.complete(event.version());
}
}

/**
* Handles a received failure event.
*/
private void handleFail(LockCommands.LockEvent event) {
CompletableFuture<Long> future = futures.get(event.id());
if (future != null) {
future.complete(null);
}
}

Expand Down Expand Up @@ -153,11 +167,12 @@ private void handleEvent(long version) {
*/
public CompletableFuture<Long> lock() {
CompletableFuture<Long> future = new CompletableFuture<>();
Consumer<Long> consumer = locked -> future.complete(null);
queue.add(consumer);
submit(new LockCommands.Lock(-1)).whenComplete((result, error) -> {
int id = this.id.incrementAndGet();
futures.put(id, future);
submit(new LockCommands.Lock(id, -1)).whenComplete((result, error) -> {
if (error != null) {
queue.remove(consumer);
futures.remove(id);
future.completeExceptionally(error);
}
});
return future;
Expand Down Expand Up @@ -206,11 +221,12 @@ public CompletableFuture<Long> lock() {
*/
public CompletableFuture<Long> tryLock() {
CompletableFuture<Long> future = new CompletableFuture<>();
Consumer<Long> consumer = future::complete;
queue.add(consumer);
submit(new LockCommands.Lock()).whenComplete((result, error) -> {
int id = this.id.incrementAndGet();
futures.put(id, future);
submit(new LockCommands.Lock(id, 0)).whenComplete((result, error) -> {
if (error != null) {
queue.remove(consumer);
futures.remove(id);
future.completeExceptionally(error);
}
});
return future;
Expand Down Expand Up @@ -267,11 +283,12 @@ public CompletableFuture<Long> tryLock() {
*/
public CompletableFuture<Long> tryLock(Duration timeout) {
CompletableFuture<Long> future = new CompletableFuture<>();
Consumer<Long> consumer = future::complete;
queue.add(consumer);
submit(new LockCommands.Lock(timeout.toMillis())).whenComplete((result, error) -> {
int id = this.id.incrementAndGet();
futures.put(id, future);
submit(new LockCommands.Lock(id, timeout.toMillis())).whenComplete((result, error) -> {
if (error != null) {
queue.remove(consumer);
futures.remove(id);
future.completeExceptionally(error);
}
});
return future;
Expand Down Expand Up @@ -304,7 +321,12 @@ public CompletableFuture<Long> tryLock(Duration timeout) {
* @return A completable future to be completed once the lock has been released.
*/
public CompletableFuture<Void> unlock() {
return submit(new LockCommands.Unlock());
int lock = this.lock;
this.lock = 0;
if (lock != 0) {
return submit(new LockCommands.Unlock(lock));
}
return CompletableFuture.completedFuture(null);
}

}
Expand Up @@ -63,15 +63,26 @@ public void readObject(BufferInput buffer, Serializer serializer) {
* Lock command.
*/
public static class Lock extends LockCommand<Void> {
private int id;
private long timeout;

public Lock() {
}

public Lock(long timeout) {
public Lock(int id, long timeout) {
this.id = id;
this.timeout = timeout;
}

/**
* Returns the lock ID.
*
* @return The lock ID.
*/
public int id() {
return id;
}

/**
* Returns the try lock timeout.
*
Expand All @@ -88,11 +99,12 @@ public CompactionMode compaction() {

@Override
public void writeObject(BufferOutput buffer, Serializer serializer) {
buffer.writeLong(timeout);
buffer.writeInt(id).writeLong(timeout);
}

@Override
public void readObject(BufferInput buffer, Serializer serializer) {
id = buffer.readInt();
timeout = buffer.readLong();
}
}
Expand All @@ -101,10 +113,88 @@ public void readObject(BufferInput buffer, Serializer serializer) {
* Unlock command.
*/
public static class Unlock extends LockCommand<Void> {
private int id;

public Unlock() {
}

public Unlock(int id) {
this.id = id;
}

/**
* Returns the lock ID.
*
* @return The lock ID.
*/
public int id() {
return id;
}

@Override
public CompactionMode compaction() {
return CompactionMode.SEQUENTIAL;
}

@Override
public void writeObject(BufferOutput buffer, Serializer serializer) {
buffer.writeInt(id);
}

@Override
public void readObject(BufferInput buffer, Serializer serializer) {
id = buffer.readInt();
}
}

/**
* Lock event.
*/
public static class LockEvent implements CatalystSerializable {
private int id;
private long version;

public LockEvent() {
}

public LockEvent(int id, long version) {
this.id = id;
this.version = version;
}

/**
* Returns the lock ID.
*
* @return The lock ID.
*/
public int id() {
return id;
}

/**
* Returns the lock version.
*
* @return The lock version.
*/
public long version() {
return version;
}

@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
buffer.writeInt(id).writeLong(version);
}

@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
id = buffer.readInt();
version = buffer.readLong();
}

@Override
public String toString() {
return String.format("%s[id=%d, version=%d]", getClass().getSimpleName(), id, version);
}
}

/**
Expand All @@ -115,6 +205,7 @@ public static class TypeResolver implements SerializableTypeResolver {
public void resolve(SerializerRegistry registry) {
registry.register(Lock.class, -141);
registry.register(Unlock.class, -142);
registry.register(LockEvent.class, -143);
}
}

Expand Down
Expand Up @@ -56,7 +56,7 @@ public void close(Session session) {
if (lock.session().state() == Session.State.EXPIRED || lock.session().state() == Session.State.CLOSED) {
lock = queue.poll();
} else {
lock.session().publish("lock", lock.index());
lock.session().publish("lock", new LockCommands.LockEvent(lock.operation().id(), lock.index()));
break;
}
}
Expand All @@ -69,20 +69,26 @@ public void close(Session session) {
public void lock(Commit<LockCommands.Lock> commit) {
if (lock == null) {
lock = commit;
commit.session().publish("lock", commit.index());
commit.session().publish("lock", new LockCommands.LockEvent(commit.operation().id(), commit.index()));
} else if (commit.operation().timeout() == 0) {
try {
commit.session().publish("lock", null);
commit.session().publish("fail", new LockCommands.LockEvent(commit.operation().id(), commit.index()));
} finally {
commit.close();
}
} else {
queue.add(commit);
if (commit.operation().timeout() > 0) {
timers.put(commit.index(), executor.schedule(Duration.ofMillis(commit.operation().timeout()), () -> {
timers.remove(commit.index());
queue.remove(commit);
commit.close();
try {
timers.remove(commit.index());
queue.remove(commit);
if (commit.session().state().active()) {
commit.session().publish("fail", new LockCommands.LockEvent(commit.operation().id(), commit.index()));
}
} finally {
commit.close();
}
}));
}
}
Expand All @@ -108,7 +114,7 @@ public void unlock(Commit<LockCommands.Unlock> commit) {
if (lock.session().state() == Session.State.EXPIRED || lock.session().state() == Session.State.CLOSED) {
lock = queue.poll();
} else {
lock.session().publish("lock", lock.index());
lock.session().publish("lock", new LockCommands.LockEvent(lock.operation().id(), lock.index()));
break;
}
}
Expand Down
Expand Up @@ -15,9 +15,10 @@
*/
package io.atomix.coordination;

import io.atomix.testing.AbstractCopycatTest;
import org.testng.annotations.Test;

import io.atomix.testing.AbstractCopycatTest;
import java.time.Duration;

/**
* Async lock test.
Expand Down Expand Up @@ -64,4 +65,23 @@ public void testReleaseOnClose() throws Throwable {
await(10000);
}

/**
* Tests attempting to acquire a lock with a timeout.
*/
public void testTryLockFail() throws Throwable {
createServers(3);

DistributedLock lock1 = createResource();
DistributedLock lock2 = createResource();

lock1.lock().thenRun(this::resume);
await(10000);

lock2.tryLock(Duration.ofSeconds(1)).thenAccept(result -> {
threadAssertNull(result);
resume();
});
await(10000);
}

}
2 changes: 1 addition & 1 deletion coordination/src/test/resources/logback.xml
Expand Up @@ -21,7 +21,7 @@
</encoder>
</appender>

<root level="INFO">
<root level="DEBUG">
<appender-ref ref="STDOUT" />
</root>
</configuration>

0 comments on commit 0e60f2b

Please sign in to comment.