Skip to content

Commit

Permalink
Fixes #2484 Internal references to local Locks taken using SharedData…
Browse files Browse the repository at this point in the history
….getLock() are not cleaned up

Refactored local locks implementation to avoid keeping references to lock with no waiters.

Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
  • Loading branch information
tsegismont committed Jun 14, 2018
1 parent 6aad769 commit 65de1c9
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 138 deletions.
106 changes: 0 additions & 106 deletions src/main/java/io/vertx/core/shareddata/impl/AsynchronousLock.java

This file was deleted.

134 changes: 134 additions & 0 deletions src/main/java/io/vertx/core/shareddata/impl/LocalAsyncLocks.java
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.core.shareddata.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.VertxException;
import io.vertx.core.shareddata.Lock;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;

import static java.util.stream.Collectors.*;

/**
* @author Thomas Segismont
*/
public class LocalAsyncLocks {

// Immutable
private class AsyncLock implements Lock {

final String name;
final List<LockWaiter> waiters;

AsyncLock(String name, LockWaiter waiter) {
this.name = name;
waiters = Collections.singletonList(waiter);
}

AsyncLock(String name, List<LockWaiter> waiters) {
this.name = name;
this.waiters = waiters;
}

boolean firstWaiter() {
return waiters.size() == 1;
}

AsyncLock addWaiter(LockWaiter waiter) {
return new AsyncLock(name, Stream.concat(waiters.stream(), Stream.of(waiter)).collect(toList()));
}

AsyncLock forNextWaiter() {
if (waiters.size() > 1) {
List<LockWaiter> lockWaiters = waiters.stream().skip(1).filter(LockWaiter::notTimedOut).collect(toList());
if (!lockWaiters.isEmpty()) {
return new AsyncLock(name, lockWaiters);
}
}
return null;
}

void acquire() {
LockWaiter waiter = waiters.get(0);
if (!waiter.acquire(this)) {
release();
}
}

@Override
public void release() {
AsyncLock asyncLock = localLocks.compute(name, (name, lock) -> lock == null ? null : lock.forNextWaiter());
if (asyncLock != null) {
asyncLock.acquire();
}
}
}

private static class LockWaiter {
final Context context;
final Handler<AsyncResult<Lock>> handler;
final Long timerId;
volatile boolean timedOut;
volatile boolean acquired;

LockWaiter(Context context, Handler<AsyncResult<Lock>> handler, long timeout) {
this.context = context;
this.handler = handler;
timerId = timeout != Long.MAX_VALUE ? context.owner().setTimer(timeout, tid -> timeout()) : null;
}

void timeout() {
if (!acquired) {
timedOut = true;
context.runOnContext(v -> handler.handle(Future.failedFuture(new VertxException("Timed out waiting to get lock"))));
}
}

boolean acquire(AsyncLock lock) {
if (!timedOut) {
if (timerId != null) {
context.owner().cancelTimer(timerId);
}
acquired = true;
context.runOnContext(v -> handler.handle(Future.succeededFuture(lock)));
}
return acquired;
}

boolean notTimedOut() {
return !timedOut;
}
}

private final ConcurrentMap<String, AsyncLock> localLocks = new ConcurrentHashMap<>();

public void acquire(Context context, String name, long timeout, Handler<AsyncResult<Lock>> handler) {
LockWaiter lockWaiter = new LockWaiter(context, handler, timeout);
AsyncLock asyncLock = localLocks.compute(name, (lockName, lock) -> {
if (lock == null) {
return new AsyncLock(lockName, lockWaiter);
}
return lock.addWaiter(lockWaiter);
});
if (asyncLock.firstWaiter()) {
asyncLock.acquire();
}
}
}
Expand Up @@ -41,14 +41,15 @@ public class SharedDataImpl implements SharedData {

private final VertxInternal vertx;
private final ClusterManager clusterManager;
private final LocalAsyncLocks localAsyncLocks;
private final ConcurrentMap<String, LocalAsyncMapImpl<?, ?>> localAsyncMaps = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AsynchronousLock> localLocks = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Counter> localCounters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, LocalMap<?, ?>> localMaps = new ConcurrentHashMap<>();

public SharedDataImpl(VertxInternal vertx, ClusterManager clusterManager) {
this.vertx = vertx;
this.clusterManager = clusterManager;
localAsyncLocks = clusterManager == null ? new LocalAsyncLocks() : null;
}

@Override
Expand Down Expand Up @@ -132,8 +133,7 @@ private <K, V> void getLocalAsyncMap(String name, Handler<AsyncResult<AsyncMap<K
}

private void getLocalLock(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) {
AsynchronousLock lock = localLocks.computeIfAbsent(name, n -> new AsynchronousLock(vertx));
lock.acquire(timeout, resultHandler);
localAsyncLocks.acquire(vertx.getOrCreateContext(), name, timeout, resultHandler);
}

private void getLocalCounter(String name, Handler<AsyncResult<Counter>> resultHandler) {
Expand Down
33 changes: 4 additions & 29 deletions src/test/java/io/vertx/test/fakecluster/FakeClusterManager.java
Expand Up @@ -23,7 +23,7 @@
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.shareddata.impl.AsynchronousCounter;
import io.vertx.core.shareddata.impl.AsynchronousLock;
import io.vertx.core.shareddata.impl.LocalAsyncLocks;
import io.vertx.core.shareddata.impl.LocalAsyncMapImpl;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
Expand All @@ -50,7 +50,7 @@ public class FakeClusterManager implements ClusterManager {
private static ConcurrentMap<String, LocalAsyncMapImpl> asyncMaps = new ConcurrentHashMap<>();
private static ConcurrentMap<String, ConcurrentMap> asyncMultiMaps = new ConcurrentHashMap<>();
private static ConcurrentMap<String, Map> syncMaps = new ConcurrentHashMap<>();
private static ConcurrentMap<String, AsynchronousLock> locks = new ConcurrentHashMap<>();
private static LocalAsyncLocks localAsyncLocks = new LocalAsyncLocks();
private static ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

private String nodeID;
Expand Down Expand Up @@ -149,13 +149,7 @@ public <K, V> Map<K, V> getSyncMap(String name) {

@Override
public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) {
AsynchronousLock lock = new AsynchronousLock(vertx);
AsynchronousLock prev = locks.putIfAbsent(name, lock);
if (prev != null) {
lock = prev;
}
FakeLock flock = new FakeLock(lock);
flock.acquire(timeout, resultHandler);
localAsyncLocks.acquire(vertx.getOrCreateContext(), name, timeout, resultHandler);
}

@Override
Expand Down Expand Up @@ -225,30 +219,11 @@ public static void reset() {
nodes.clear();
asyncMaps.clear();
asyncMultiMaps.clear();
locks.clear();
localAsyncLocks = new LocalAsyncLocks();
counters.clear();
syncMaps.clear();
}

private class FakeLock implements Lock {

private final AsynchronousLock delegate;

public FakeLock(AsynchronousLock delegate) {
this.delegate = delegate;
}

public void acquire(long timeout, Handler<AsyncResult<Lock>> resultHandler) {
Context context = vertx.getOrCreateContext();
delegate.doAcquire(context, timeout, resultHandler);
}

@Override
public void release() {
delegate.release();
}
}

private class FakeAsyncMultiMap<K, V> implements AsyncMultiMap<K, V> {

private final ConcurrentMap<K, ChoosableSet<V>> map;
Expand Down

0 comments on commit 65de1c9

Please sign in to comment.