Skip to content

Commit

Permalink
Simplified LocalAsyncLocks
Browse files Browse the repository at this point in the history
- Removed WaitersList abstraction
- Do not skip timed out waiters when recomputing the list

Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
  • Loading branch information
tsegismont committed Jun 20, 2018
1 parent 597a2ca commit d0994a1
Showing 1 changed file with 17 additions and 44 deletions.
61 changes: 17 additions & 44 deletions src/main/java/io/vertx/core/shareddata/impl/LocalAsyncLocks.java
Expand Up @@ -19,55 +19,17 @@
import io.vertx.core.shareddata.Lock; import io.vertx.core.shareddata.Lock;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;


import static java.util.stream.Collectors.*;

/** /**
* @author Thomas Segismont * @author Thomas Segismont
*/ */
public class LocalAsyncLocks { public class LocalAsyncLocks {


// Immutable
private static class WaitersList {

final List<LockWaiter> waiters;

WaitersList(List<LockWaiter> waiters) {
this.waiters = waiters;
}

int size() {
return waiters.size();
}

LockWaiter first() {
return waiters.get(0);
}

WaitersList add(LockWaiter waiter) {
List<LockWaiter> list = new ArrayList<>(waiters.size() + 1);
list.addAll(waiters);
list.add(waiter);
return new WaitersList(list);
}

WaitersList removeStale() {
if (waiters.size() > 1) {
List<LockWaiter> lockWaiters = this.waiters.stream().skip(1).filter(LockWaiter::isWaiting).collect(toList());
if (!lockWaiters.isEmpty()) {
return new WaitersList(lockWaiters);
}
}
return null;
}
}

private enum Status {WAITING, ACQUIRED, TIMED_OUT} private enum Status {WAITING, ACQUIRED, TIMED_OUT}


private class LockWaiter { private class LockWaiter {
Expand Down Expand Up @@ -124,22 +86,33 @@ public void release() {
} }
} }


private final ConcurrentMap<String, WaitersList> waitersMap = new ConcurrentHashMap<>(); // Value should never be modified
private final ConcurrentMap<String, List<LockWaiter>> waitersMap = new ConcurrentHashMap<>();


public void acquire(Context context, String name, long timeout, Handler<AsyncResult<Lock>> handler) { public void acquire(Context context, String name, long timeout, Handler<AsyncResult<Lock>> handler) {
LockWaiter lockWaiter = new LockWaiter(context, name, timeout, handler); LockWaiter lockWaiter = new LockWaiter(context, name, timeout, handler);
WaitersList waiters = waitersMap.compute(name, (s, list) -> { List<LockWaiter> waiters = waitersMap.compute(name, (s, list) -> {
return list == null ? new WaitersList(Collections.singletonList(lockWaiter)) : list.add(lockWaiter); List<LockWaiter> result;
if (list != null) {
result = new ArrayList<>(list.size() + 1);
result.addAll(list);
} else {
result = new ArrayList<>(1);
}
result.add(lockWaiter);
return result;
}); });
if (waiters.size() == 1) { if (waiters.size() == 1) {
waiters.first().acquireLock(); waiters.get(0).acquireLock();
} }
} }


private void nextWaiter(String lockName) { private void nextWaiter(String lockName) {
WaitersList waiters = waitersMap.compute(lockName, (s, list) -> list == null ? null : list.removeStale()); List<LockWaiter> waiters = waitersMap.compute(lockName, (s, list) -> {
return list == null || list.size() == 1 ? null : new ArrayList<>(list.subList(1, list.size()));
});
if (waiters != null) { if (waiters != null) {
waiters.first().acquireLock(); waiters.get(0).acquireLock();
} }
} }
} }

0 comments on commit d0994a1

Please sign in to comment.