Skip to content

Commit

Permalink
Refactored: made InMemoryDatabaseTable use GroupLock
Browse files Browse the repository at this point in the history
  • Loading branch information
luontola committed Nov 19, 2008
1 parent a923f7c commit c45b605
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 63 deletions.
Expand Up @@ -31,11 +31,8 @@

package net.orfjackal.dimdwarf.db.inmemory;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;
import java.util.concurrent.locks.*;

/**
* @author Esko Luontola
Expand All @@ -44,7 +41,7 @@
public class GroupLock<T> {

private final Set<T> lockedKeys = new HashSet<T>();
private final ReentrantLock lock = new ReentrantLock();
private final Lock lock = new ReentrantLock();

public LockHandle tryLock(T... keys) throws IllegalStateException {
return tryLock(Arrays.asList(keys));
Expand Down Expand Up @@ -93,7 +90,7 @@ private class MyLockHandle implements LockHandle {
private Collection<T> keys;

public MyLockHandle(Collection<T> keys) {
this.keys = keys;
this.keys = Collections.unmodifiableCollection(new ArrayList<T>(keys));
}

public void unlock() {
Expand Down
Expand Up @@ -31,14 +31,9 @@

package net.orfjackal.dimdwarf.db.inmemory;

import net.orfjackal.dimdwarf.db.Blob;
import net.orfjackal.dimdwarf.db.OptimisticLockException;
import net.orfjackal.dimdwarf.db.*;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.*;

/**
* This class is thread-safe.
Expand All @@ -49,7 +44,7 @@
class InMemoryDatabaseTable implements PersistedDatabaseTable {

private final RevisionMap<Blob, Blob> revisions;
private final ConcurrentMap<Blob, MyCommitHandle> lockedForCommit = new ConcurrentHashMap<Blob, MyCommitHandle>();
private final GroupLock<Blob> keysLockedForCommit = new GroupLock<Blob>();

public InMemoryDatabaseTable(RevisionCounter revisionCounter) {
revisions = new RevisionMap<Blob, Blob>(revisionCounter);
Expand Down Expand Up @@ -79,46 +74,12 @@ public CommitHandle prepare(Map<Blob, Blob> updates, long revision) {
return new MyCommitHandle(updates, revision);
}

// TODO: refactor this locking logic out of this class or simplify it - maybe create GroupLock which locks a set of keys simultanously

private synchronized void checkForConflicts(Set<Blob> keys, long visibleRevision) {
for (Blob key : keys) {
long lastWrite = revisions.getLatestRevisionForKey(key);
if (lastWrite > visibleRevision) {
throw new OptimisticLockException("Key " + key + " already modified in revision " + lastWrite);
}
}
}

private synchronized void lock(MyCommitHandle h, Set<Blob> keys) {
for (Blob key : keys) {
MyCommitHandle alreadyLocked = lockedForCommit.putIfAbsent(key, h);
if (alreadyLocked != null) {
throw new OptimisticLockException("Key " + key + " already locked by " + alreadyLocked);
}
}
}

private synchronized void putAll(MyCommitHandle h, Map<Blob, Blob> updates) {
for (Map.Entry<Blob, Blob> update : updates.entrySet()) {
assert lockedForCommit.get(update.getKey()).equals(h);
revisions.put(update.getKey(), update.getValue());
}
}

private synchronized void unlock(MyCommitHandle h, Set<Blob> keys) {
for (Blob key : keys) {
if (lockedForCommit.containsKey(key)) {
boolean wasLockedByMe = lockedForCommit.remove(key, h);
assert wasLockedByMe : "key = " + key;
}
}
}

private class MyCommitHandle implements CommitHandle {

private final Map<Blob, Blob> updates;
private final long visibleRevision;
private LockHandle lockHandle;

public MyCommitHandle(Map<Blob, Blob> updates, long visibleRevision) {
this.updates = new HashMap<Blob, Blob>(updates);
Expand All @@ -127,17 +88,46 @@ public MyCommitHandle(Map<Blob, Blob> updates, long visibleRevision) {
}

private void prepare() {
checkForConflicts(updates.keySet(), visibleRevision);
lock(this, updates.keySet());
lockHandle = keysLockedForCommit.tryLock(updates.keySet());
try {
checkForConflicts();
} catch (OptimisticLockException e) {
lockHandle.unlock();
throw e;
}
}

private void checkForConflicts() throws OptimisticLockException {
for (Blob key : updates.keySet()) {
checkForModifiedInOtherTransaction(key);
}
}

private void checkForModifiedInOtherTransaction(Blob key) throws OptimisticLockException {
long lastWrite = revisions.getLatestRevisionForKey(key);
if (lastWrite > visibleRevision) {
throw new OptimisticLockException("Key " + key + " already modified in revision " + lastWrite);
}
}

public void commit() {
putAll(this, updates);
unlock(this, updates.keySet());
commitUpdates();
lockHandle.unlock();
}

private void commitUpdates() {
for (Map.Entry<Blob, Blob> update : updates.entrySet()) {
commitUpdate(update.getKey(), update.getValue());
}
}

private void commitUpdate(Blob key, Blob value) {
assert keysLockedForCommit.isLocked(key);
revisions.put(key, value);
}

public void rollback() {
unlock(this, updates.keySet());
lockHandle.unlock();
}
}
}
Expand Up @@ -31,16 +31,11 @@

package net.orfjackal.dimdwarf.db.inmemory;

import jdave.Block;
import jdave.Group;
import jdave.Specification;
import jdave.*;
import jdave.junit4.JDaveRunner;
import net.orfjackal.dimdwarf.db.Blob;
import net.orfjackal.dimdwarf.db.*;
import static net.orfjackal.dimdwarf.db.Blob.EMPTY_BLOB;
import net.orfjackal.dimdwarf.db.DatabaseTable;
import net.orfjackal.dimdwarf.tx.TransactionCoordinator;
import net.orfjackal.dimdwarf.tx.TransactionException;
import net.orfjackal.dimdwarf.tx.TransactionImpl;
import net.orfjackal.dimdwarf.tx.*;
import org.junit.runner.RunWith;
import org.slf4j.Logger;

Expand Down Expand Up @@ -309,6 +304,15 @@ public void onlyTheFirstToPrepareAndCommitWillSucceed() {
tx1PreparesAndCommitsBeforeTx2();
specify(readInNewTransaction(key), should.equal(value1));
}

/**
* Checks that InMemoryDatabaseTable releases its commit locks if there is a modification conflict.
*/
public void theKeyMayBeUpdatedInALaterTransaction() {
tx1PreparesAndCommitsBeforeTx2();
updateInNewTransaction(key, value2);
specify(readInNewTransaction(key), should.equal(value2));
}
}

public class IfTwoTransactionsDeleteAnEntryWithTheSameKey {
Expand Down

0 comments on commit c45b605

Please sign in to comment.