Skip to content

Commit

Permalink
fixes leak of mongo client on shutdown, and makes monitor threads shu…
Browse files Browse the repository at this point in the history
…tdown more graceful.
  • Loading branch information
john-morales committed Jul 20, 2013
1 parent 865f8d6 commit aa6542c
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 108 deletions.
2 changes: 1 addition & 1 deletion build.properties
Expand Up @@ -16,4 +16,4 @@

javac.source=1.5

lib.version=0.1.5
lib.version=0.1.6
28 changes: 28 additions & 0 deletions src/main/com/deftlabs/lock/mongo/DistributedLockSvcOptions.java
Expand Up @@ -101,6 +101,30 @@ public DistributedLockSvcOptions( final String pMongoUri,

public String getHostAddress() { return _hostAddress; }

/**
* Milliseconds between heartbeat checks.
*/
public long getHeartbeatFrequency() { return _heartbeatFrequency; }
public void setHeartbeatFrequency(final long pHeartbeatFrequency) {
_heartbeatFrequency = pHeartbeatFrequency;
}

/**
* Milliseconds between lock timeout checks.
*/
public long getTimeoutFrequency() { return _timeoutFrequency; }
public void setTimeoutFrequency(final long pTimeoutFrequency) {
_timeoutFrequency = pTimeoutFrequency;
}

/**
* Milliseconds between lock unlocked checks.
*/
public long getLockUnlockedFrequency() { return _lockUnlockedFrequency; }
public void setLockUnlockedFrequency(final long pLockUnlockedFrequency) {
_lockUnlockedFrequency = pLockUnlockedFrequency;
}

/**
* The default collection name is: lockHistory. Override here.
*/
Expand All @@ -122,5 +146,9 @@ public DistributedLockSvcOptions( final String pMongoUri,
private boolean _enableHistory = true;
private boolean _historyIsCapped = true;
private long _historySize = 209715200;

private long _heartbeatFrequency = 5000;
private long _timeoutFrequency = 60000;
private long _lockUnlockedFrequency = 1000;
}

179 changes: 89 additions & 90 deletions src/main/com/deftlabs/lock/mongo/impl/Monitor.java
Expand Up @@ -26,6 +26,8 @@

// Java
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -41,136 +43,133 @@ final class Monitor {
* closed properly (based on the lock/unlock) contract. This can happen when processes
* die unexpectedly (e.g., out of memory) or when they are not stopped properly (e.g., kill -9).
*/
static class LockHeartbeat extends Thread {
@Override public void run() {
while (_running) {
try {
for (final String lockName : _locks.keySet()) {
final DistributedLock lock = _locks.get(lockName);
static class LockHeartbeat extends MonitorThread {

final ObjectId lockId = lock.getLockId();
LockHeartbeat(final Mongo pMongo,
final DistributedLockSvcOptions pSvcOptions,
final Map<String, DistributedLock> pLocks) {
super("Mongo-Distributed-Lock-LockHeartbeat-" + System.currentTimeMillis(),
pMongo, pSvcOptions, pLocks);
}

if (!lock.isLocked() || lockId == null) continue;
@Override
boolean monitor() throws InterruptedException {
for (final String lockName : _locks.keySet()) {
final DistributedLock lock = _locks.get(lockName);

LockDao.heartbeat(_mongo, lockName, lockId, lock.getOptions(), _svcOptions);
}
final ObjectId lockId = lock.getLockId();

Thread.sleep(HEARTBEAT_FREQUENCY);
if (!lock.isLocked() || lockId == null) continue;

} catch (final InterruptedException ie) { break;
} catch (final Throwable t) { LOG.log(Level.SEVERE, t.getMessage(), t); }
LockDao.heartbeat(_mongo, lockName, lockId, lock.getOptions(), _svcOptions);
}
}

LockHeartbeat( final Mongo pMongo,
final DistributedLockSvcOptions pSvcOptions,
final Map<String, DistributedLock> pLocks)
{
super("Mongo-Distributed-Lock-LockHeartbeat-" + System.currentTimeMillis());
_mongo = pMongo;
_svcOptions = pSvcOptions;
_locks = pLocks;
}

private static final long HEARTBEAT_FREQUENCY = 5000;

void stopRunning() {
_running = false;
interrupt();
return _shutdown.await(_svcOptions.getHeartbeatFrequency(), TimeUnit.MILLISECONDS);
}

private volatile boolean _running = true;
private final Mongo _mongo;
private final DistributedLockSvcOptions _svcOptions;
private final Map<String, DistributedLock> _locks;
}

/**
* The lock timeout thread impl (see LockHeartbeat docs for more info). One lock
* timeout thread runs in each process this lock lib is running. This thread is
* responsible for cleaning up expired locks (based on time since last heartbeat).
*/
static class LockTimeout extends Thread {
@Override public void run() {
while (_running) {
try {

LockDao.expireInactiveLocks(_mongo, _svcOptions);

Thread.sleep(CHECK_FREQUENCY);
} catch (final InterruptedException ie) { break;
} catch (final Throwable t) { LOG.log(Level.SEVERE, t.getMessage(), t); }
}
}
static class LockTimeout extends MonitorThread {

LockTimeout(final Mongo pMongo,
final DistributedLockSvcOptions pSvcOptions)
{
super("Mongo-Distributed-Lock-LockTimeout-" + System.currentTimeMillis());
_mongo = pMongo;
_svcOptions = pSvcOptions;
LockTimeout(final Mongo pMongo, final DistributedLockSvcOptions pSvcOptions) {
super("Mongo-Distributed-Lock-LockTimeout-" + System.currentTimeMillis(), pMongo, pSvcOptions);
}

private static final long CHECK_FREQUENCY = 60000;

void stopRunning() {
_running = false;
interrupt();
@Override
boolean monitor() throws InterruptedException {
LockDao.expireInactiveLocks(_mongo, _svcOptions);
return _shutdown.await(_svcOptions.getTimeoutFrequency(), TimeUnit.MILLISECONDS);
}

private volatile boolean _running = true;

private final Mongo _mongo;
private final DistributedLockSvcOptions _svcOptions;
}

/**
* The lock unlocked thread is responsible for waking up local
* threads when a lock state changes.
*/
static class LockUnlocked extends Thread {
@Override public void run() {
while (_running) {
try {
for (final String lockName : _locks.keySet()) {
final DistributedLock lock = _locks.get(lockName);
static class LockUnlocked extends MonitorThread {

if (lock.isLocked()) continue;
LockUnlocked(final Mongo pMongo,
final DistributedLockSvcOptions pSvcOptions,
final Map<String, DistributedLock> pLocks) {
super("Mongo-Distributed-Lock-LockUnlocked-" + System.currentTimeMillis(),
pMongo, pSvcOptions, pLocks);
}

// Check to see if this is locked.
if (LockDao.isLocked(_mongo, lockName, _svcOptions)) continue;
@Override
boolean monitor() throws InterruptedException {
for (final String lockName : _locks.keySet()) {
final DistributedLock lock = _locks.get(lockName);

// The lock is not locked, wakeup any blocking threads.
lock.wakeupBlocked();
}
if (lock.isLocked()) continue;

Thread.sleep(FREQUENCY);
} catch (final InterruptedException ie) { break;
} catch (final Throwable t) { LOG.log(Level.SEVERE, t.getMessage(), t); }
// Check to see if this is locked.
if (LockDao.isLocked(_mongo, lockName, _svcOptions)) continue;

// The lock is not locked, wakeup any blocking threads.
lock.wakeupBlocked();
}

return _shutdown.await(_svcOptions.getLockUnlockedFrequency(), TimeUnit.MILLISECONDS);
}
}


LockUnlocked( final Mongo pMongo,
final DistributedLockSvcOptions pSvcOptions,
final Map<String, DistributedLock> pLocks)
{
super("Mongo-Distributed-Lock-LockUnlocked-" + System.currentTimeMillis());
private static abstract class MonitorThread extends Thread {

MonitorThread(final String pName,
final Mongo pMongo,
final DistributedLockSvcOptions pSvcOptions) {
this(pName, pMongo, pSvcOptions, null);
}

MonitorThread(final String pName,
final Mongo pMongo,
final DistributedLockSvcOptions pSvcOptions,
final Map<String, DistributedLock> pLocks) {
super(pName);
_mongo = pMongo;
_svcOptions = pSvcOptions;
_locks = pLocks;
_shutdown = new CountDownLatch(1);
_exited = new CountDownLatch(1);
setDaemon(true);
}

@Override public void run() {
boolean shutdown = false;
try {
while (!shutdown) {
try { shutdown = monitor();
} catch (final InterruptedException ie) { break;
} catch (final Throwable t) { LOG.log(Level.SEVERE, t.getMessage(), t); }
}
} finally {
_exited.countDown();
}
}

private static final long FREQUENCY = 1000;
/**
* Performs check and awaits shutdown signal for configured amount of milliseconds
* @return true if shutdown() was called, false otherwise.
*/
abstract boolean monitor() throws InterruptedException;

void stopRunning() {
_running = false;
interrupt();
void shutdown() throws InterruptedException {
_shutdown.countDown();
if (!_exited.await(10000, TimeUnit.MILLISECONDS)) {
this.interrupt();
}
}

private volatile boolean _running = true;
private final Mongo _mongo;
private final DistributedLockSvcOptions _svcOptions;
private final Map<String, DistributedLock> _locks;
final Mongo _mongo;
final DistributedLockSvcOptions _svcOptions;
final Map<String, DistributedLock> _locks;
final CountDownLatch _shutdown;
final CountDownLatch _exited;
}

private static final Logger LOG = Logger.getLogger("com.deftlabs.lock.mongo.Monitor");
Expand Down
32 changes: 15 additions & 17 deletions src/main/com/deftlabs/lock/mongo/impl/SvcImpl.java
Expand Up @@ -48,11 +48,11 @@ public SvcImpl(final DistributedLockSvcOptions pOptions) {
*/
@Override
public DistributedLock create( final String pLockName,
final DistributedLockOptions pLockOptions)
{
try {
_lock.lock();
final DistributedLockOptions pLockOptions) {
if ( !isRunning() ) throw new IllegalStateException("cannot create lock when service not running");

_lock.lock();
try {
if (_locks.containsKey(pLockName)) return _locks.get(pLockName);

final LockImpl lock = new LockImpl(_mongo, pLockName, pLockOptions, _options);
Expand All @@ -77,9 +77,8 @@ public DistributedLock create(final String pLockName) {

@Override
public void destroy(final DistributedLock pLock) {
_lock.lock();
try {
_lock.lock();

if (!_locks.containsKey(pLock.getName()))
{ throw new DistributedLockException("Lock has already been destroyed: " + pLock.getName()); }

Expand All @@ -97,10 +96,10 @@ public void destroy(final DistributedLock pLock) {
*/
@Override
public void startup() {
_running.set(true);
try {
_lock.lock();
if (!_running.compareAndSet(false, true)) throw new IllegalStateException("startup called but already running");

_lock.lock();
try {
_mongo = new Mongo(new MongoURI(_options.getMongoUri()));

// Init the db/collection.
Expand All @@ -127,24 +126,23 @@ public void startup() {
*/
@Override
public void shutdown() {
if (!_running.compareAndSet(true, false)) throw new IllegalStateException("shutdown called but not running");

if (!_running.get()) throw new IllegalStateException("shutdown called but not running");
_running.set(false);

_lock.lock();
try {
_lock.lock();

// Interrupt the locks.
for (final String lockName : _locks.keySet()) {
final DistributedLock lock = _locks.get(lockName);
if (lock == null) continue;
((LockImpl)lock).destroy();
}

_lockTimeout.stopRunning();
_lockHeartbeat.stopRunning();
_lockUnlocked.stopRunning();
_lockTimeout.shutdown();
_lockHeartbeat.shutdown();
_lockUnlocked.shutdown();

_locks.clear();
_mongo.close();
} catch (final Throwable t) { throw new DistributedLockException(t);
} finally { _lock.unlock(); }
}
Expand Down

0 comments on commit aa6542c

Please sign in to comment.