Skip to content

Commit

Permalink
IGNITE-59 Refactoring CacheLock.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Evdokimov committed Jan 25, 2015
1 parent 6accb8c commit dabe040
Show file tree
Hide file tree
Showing 18 changed files with 338 additions and 165 deletions.
Expand Up @@ -37,6 +37,12 @@ class CacheLockImpl<K> implements Lock {
/** */
private final Collection<? extends K> keys;

/** */
private int cntr;

/** */
private volatile Thread lockedThread;

/**
* @param delegate Delegate.
* @param keys Keys.
Expand All @@ -50,12 +56,25 @@ class CacheLockImpl<K> implements Lock {
@Override public void lock() {
try {
delegate.lockAll(keys, 0);

incrementLockCounter();
}
catch (IgniteCheckedException e) {
throw new CacheException(e.getMessage(), e);
}
}

/**
*
*/
private void incrementLockCounter() {
assert (lockedThread == null && cntr == 0) || (lockedThread == Thread.currentThread() && cntr > 0);

lockedThread = Thread.currentThread();

cntr++;
}

/** {@inheritDoc} */
@Override public void lockInterruptibly() throws InterruptedException {
tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
Expand All @@ -64,7 +83,12 @@ class CacheLockImpl<K> implements Lock {
/** {@inheritDoc} */
@Override public boolean tryLock() {
try {
return delegate.lockAll(keys, -1);
boolean res = delegate.lockAll(keys, -1);

if (res)
incrementLockCounter();

return res;
}
catch (IgniteCheckedException e) {
throw new CacheException(e.getMessage(), e);
Expand All @@ -76,27 +100,31 @@ class CacheLockImpl<K> implements Lock {
if (Thread.interrupted())
throw new InterruptedException();

try {
if (time <= 0)
return delegate.lockAll(keys, -1);
if (time <= 0)
return tryLock();

IgniteFuture<Boolean> fut = null;
try {
IgniteFuture<Boolean> fut = delegate.lockAllAsync(keys, unit.toMillis(time));

try {
fut = delegate.lockAllAsync(keys, unit.toMillis(time));
boolean res = fut.get();

return fut.get();
if (res)
incrementLockCounter();

return res;
}
catch (IgniteInterruptedException e) {
if (fut != null) {
if (!fut.cancel()) {
if (fut.isDone()) {
Boolean res = fut.get();
if (!fut.cancel()) {
if (fut.isDone()) {
Boolean res = fut.get();

Thread.currentThread().interrupt();

Thread.currentThread().interrupt();
if (res)
incrementLockCounter();

return res;
}
return res;
}
}

Expand All @@ -111,9 +139,43 @@ class CacheLockImpl<K> implements Lock {
}
}

/**
*
*/
private boolean isKeysLocked() {
for (K key : keys) {
if (delegate.isLocked(key))
return true;
}

return false;
}

/** {@inheritDoc} */
@Override public void unlock() {
try {
Thread lockedThread = this.lockedThread;

if (lockedThread != Thread.currentThread()) {
if (lockedThread == null) {
if (isKeysLocked()) {
throw new IllegalStateException("Failed to unlock keys, looks like lock has been obtain on " +
"another instance of Lock, that was returned by IgniteCache.lock(key). You have to call " +
"lock() and unlock() methods on the same instance of Lock [keys=" + keys + ']');
}
} else {
throw new IllegalStateException("Failed to unlock cache keys, keys are locked by another thread " +
"any threads [keys=" + keys + ", lockOwnerThread=" + lockedThread.getName() + ']');
}
}

assert cntr > 0;

cntr--;

if (cntr == 0)
this.lockedThread = null;

delegate.unlockAll(keys);
}
catch (IgniteCheckedException e) {
Expand Down
Expand Up @@ -179,12 +179,12 @@ public GridCacheContext<K, V> context() {

/** {@inheritDoc} */
@Override public Lock lock(K key) throws CacheException {
return lockAll(Collections.<K>singleton(key));
return lockAll(Collections.singleton(key));
}

/** {@inheritDoc} */
@Override public Lock lockAll(final Collection<? extends K> keys) {
return new CacheLockImpl<K>(delegate, keys);
return new CacheLockImpl<>(delegate, keys);
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -3323,13 +3323,15 @@ public void testLockUnlock() throws Exception {

assert !cache.isLocalLocked(key, false);

cache.lock(key).lock();
Lock lock = cache.lock(key);

lock.lock();

lockCnt.await();

assert cache.isLocalLocked(key, false);

cache.lock(key).unlock();
lock.unlock();

unlockCnt.await();

Expand Down Expand Up @@ -3707,12 +3709,14 @@ public void testLockUnlockAll() throws Exception {
assert !cache.isLocalLocked("key1", false);
assert !cache.isLocalLocked("key2", false);

cache.lockAll(ImmutableSet.of("key1", "key2")).lock();
Lock lock1_2 = cache.lockAll(ImmutableSet.of("key1", "key2"));

lock1_2.lock();

assert cache.isLocalLocked("key1", false);
assert cache.isLocalLocked("key2", false);

cache.lockAll(ImmutableSet.of("key1", "key2")).unlock();
lock1_2.unlock();

for (int i = 0; i < 100; i++)
if (cache.isLocalLocked("key1", false) || cache.isLocalLocked("key2", false))
Expand All @@ -3723,14 +3727,12 @@ public void testLockUnlockAll() throws Exception {
assert !cache.isLocalLocked("key1", false);
assert !cache.isLocalLocked("key2", false);

Lock lock = cache.lockAll(ImmutableSet.of("key1", "key2"));

lock.lock();
lock1_2.lock();

assert cache.isLocalLocked("key1", false);
assert cache.isLocalLocked("key2", false);

lock.unlock();
lock1_2.unlock();

for (int i = 0; i < 100; i++)
if (cache.isLocalLocked("key1", false) || cache.isLocalLocked("key2", false))
Expand Down

0 comments on commit dabe040

Please sign in to comment.