Skip to content

Commit

Permalink
support overlapping ensures with no retry, refs #143
Browse files Browse the repository at this point in the history
  • Loading branch information
richhickey committed Jul 3, 2009
1 parent c4a5cd2 commit 9617434
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 20 deletions.
98 changes: 79 additions & 19 deletions src/jvm/clojure/lang/LockingTransaction.java
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

@SuppressWarnings({"SynchronizeOnNonFinalField"})
public class LockingTransaction{
Expand Down Expand Up @@ -104,13 +105,32 @@ void stop(int status){
final HashSet<Ref> sets = new HashSet<Ref>();
final TreeMap<Ref, ArrayList<CFn>> commutes = new TreeMap<Ref, ArrayList<CFn>>();

final HashSet<Ref> ensures = new HashSet<Ref>(); //all hold readLock


void tryWriteLock(Ref ref){
try
{
if(!ref.lock.writeLock().tryLock(LOCK_WAIT_MSECS, TimeUnit.MILLISECONDS))
throw retryex;
}
catch(InterruptedException e)
{
throw retryex;
}
}

//returns the most recent val
Object lock(Ref ref){
boolean unlocked = false;
//can't upgrade readLock, so release it
releaseIfEnsured(ref);

boolean unlocked = true;
try
{
ref.lock.writeLock().lock();
tryWriteLock(ref);
unlocked = false;

if(ref.tvals != null && ref.tvals.point > readPoint)
throw retryex;
Info refinfo = ref.tinfo;
Expand All @@ -122,9 +142,23 @@ Object lock(Ref ref){
{
ref.lock.writeLock().unlock();
unlocked = true;
//stop prior to blocking
stop(RETRY);
synchronized(refinfo)
return blockAndBail(refinfo);
}
}
ref.tinfo = info;
return ref.tvals == null ? null : ref.tvals.val;
}
finally
{
if(!unlocked)
ref.lock.writeLock().unlock();
}
}

private Object blockAndBail(Info refinfo){
//stop prior to blocking
stop(RETRY);
synchronized(refinfo)
{
if(refinfo.running())
{
Expand All @@ -133,20 +167,18 @@ Object lock(Ref ref){
refinfo.wait(LOCK_WAIT_MSECS);
}
catch(InterruptedException e)
{
}
}
}
throw retryex;
{
}
}
ref.tinfo = info;
return ref.tvals == null ? null : ref.tvals.val;
}
finally
throw retryex;
}

private void releaseIfEnsured(Ref ref){
if(ensures.contains(ref))
{
if(!unlocked)
ref.lock.writeLock().unlock();
ensures.remove(ref);
ref.lock.readLock().unlock();
}
}

Expand Down Expand Up @@ -240,8 +272,14 @@ Object run(Callable fn) throws Exception{
for(Map.Entry<Ref, ArrayList<CFn>> e : commutes.entrySet())
{
Ref ref = e.getKey();
ref.lock.writeLock().lock();
boolean wasEnsured = ensures.contains(ref);
//can't upgrade readLock, so release it
releaseIfEnsured(ref);
tryWriteLock(ref);
locked.add(ref);
if(wasEnsured && ref.tvals != null && ref.tvals.point > readPoint)
throw retryex;

Info refinfo = ref.tinfo;
if(refinfo != null && refinfo != info && refinfo.running())
{
Expand All @@ -260,7 +298,7 @@ Object run(Callable fn) throws Exception{
{
if(!commutes.containsKey(ref))
{
ref.lock.writeLock().lock();
tryWriteLock(ref);
locked.add(ref);
}
}
Expand Down Expand Up @@ -319,6 +357,11 @@ else if((ref.faults.get() > 0 && hcount < ref.maxHistory)
locked.get(k).lock.writeLock().unlock();
}
locked.clear();
for(Ref r : ensures)
{
r.lock.readLock().unlock();
}
ensures.clear();
stop(done ? COMMITTED : RETRY);
try
{
Expand Down Expand Up @@ -391,10 +434,27 @@ Object doSet(Ref ref, Object val){
return val;
}

void doTouch(Ref ref){
void doEnsure(Ref ref){
if(!info.running())
throw retryex;
lock(ref);
if(ensures.contains(ref))
return;
ref.lock.readLock().lock();

Info refinfo = ref.tinfo;

//writer exists
if(refinfo != null && refinfo.running())
{
ref.lock.readLock().unlock();

if(refinfo != info) //not us, ensure is doomed
{
blockAndBail(refinfo);
}
}
else
ensures.add(ref);
}

Object doCommute(Ref ref, IFn fn, ISeq args) throws Exception{
Expand Down
2 changes: 1 addition & 1 deletion src/jvm/clojure/lang/Ref.java
Expand Up @@ -175,7 +175,7 @@ public Object alter(IFn fn, ISeq args) throws Exception{
}

public void touch(){
LockingTransaction.getEx().doTouch(this);
LockingTransaction.getEx().doEnsure(this);
}

//*/
Expand Down

0 comments on commit 9617434

Please sign in to comment.