Implement atomicReadMVar, fixing #4001.
We add the invariant to the MVar blocked threads queue that
threads blocked on an atomic read are always at the front of
the queue.  This invariant is easy to maintain, since takers
are only ever added to the end of the queue.

Signed-off-by: Edward Z. Yang <ezyang@mit.edu>
ezyang committed Jul 9, 2013
1 parent ca9a431 commit 70e2063
Showing 15 changed files with 147 additions and 18 deletions.
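
The invariant described in the commit message can be pictured with a small pure Haskell model (illustrative only; none of these names exist in the RTS): readers are pushed onto the front of an MVar's blocked-thread queue, takers are only ever appended to the back, and a putMVar wakes every leading reader plus at most one taker.

    data Waiter t = BlockedReader t | BlockedTaker t
      deriving Show

    -- Readers go to the front, so they always sit ahead of every taker.
    enqueueReader :: t -> [Waiter t] -> [Waiter t]
    enqueueReader t q = BlockedReader t : q

    -- Takers are only ever appended, so they can never overtake a reader.
    enqueueTaker :: t -> [Waiter t] -> [Waiter t]
    enqueueTaker t q = q ++ [BlockedTaker t]

    -- On putMVar: wake every leading reader, hand the value to the first
    -- taker (if any), and leave everything behind that taker blocked.
    wakeOnPut :: [Waiter t] -> ([Waiter t], [Waiter t])
    wakeOnPut q =
      case break isTaker q of
        (readers, taker : rest) -> (readers ++ [taker], rest)
        (readers, [])           -> (readers, [])
      where
        isTaker (BlockedTaker _) = True
        isTaker _                = False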
9 changes: 9 additions & 0 deletions compiler/prelude/primops.txt.pp
@@ -1717,6 +1717,15 @@
out_of_line = True
has_side_effects = True
primop AtomicReadMVarOp "atomicReadMVar#" GenPrimOp
MVar# s a -> State# s -> (# State# s, a #)
{If {\tt MVar\#} is empty, block until it becomes full.
Then read its contents without modifying the MVar, without possibility
of intervention from other threads.}
with
out_of_line = True
has_side_effects = True
primop SameMVarOp "sameMVar#" GenPrimOp
MVar# s a -> MVar# s a -> Bool
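As a sketch of how the new primop would be consumed from Haskell (not part of this commit; the wrapper name and the import locations are assumptions of this example), a library-level binding could look like:

    {-# LANGUAGE MagicHash, UnboxedTuples #-}
    import GHC.Base (IO (..))
    import GHC.Exts (atomicReadMVar#)
    import GHC.MVar (MVar (..))

    -- Block until the MVar is full, then return its contents without
    -- emptying it and with no window for another thread to intervene.
    atomicReadMVar :: MVar a -> IO a
    atomicReadMVar (MVar m) = IO (\s -> atomicReadMVar# m s)

The examples further down assume this wrapper is in scope.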
25 changes: 13 additions & 12 deletions includes/rts/Constants.h
@@ -202,31 +202,32 @@
*/
#define NotBlocked 0
#define BlockedOnMVar 1
#define BlockedOnBlackHole 2
#define BlockedOnRead 3
#define BlockedOnWrite 4
#define BlockedOnDelay 5
#define BlockedOnSTM 6
#define BlockedOnMVarRead 2
#define BlockedOnBlackHole 3
#define BlockedOnRead 4
#define BlockedOnWrite 5
#define BlockedOnDelay 6
#define BlockedOnSTM 7

/* Win32 only: */
#define BlockedOnDoProc 7
#define BlockedOnDoProc 8

/* Only relevant for PAR: */
/* blocked on a remote closure represented by a Global Address: */
#define BlockedOnGA 8
#define BlockedOnGA 9
/* same as above but without sending a Fetch message */
#define BlockedOnGA_NoSend 9
#define BlockedOnGA_NoSend 10
/* Only relevant for THREADED_RTS: */
#define BlockedOnCCall 10
#define BlockedOnCCall_Interruptible 11
#define BlockedOnCCall 11
#define BlockedOnCCall_Interruptible 12
/* same as above but permit killing the worker thread */

/* Involved in a message sent to tso->msg_cap */
#define BlockedOnMsgThrowTo 12
#define BlockedOnMsgThrowTo 13

/* The thread is not on any run queues, but can be woken up
by tryWakeupThread() */
#define ThreadMigrating 13
#define ThreadMigrating 14

/*
* These constants are returned to the scheduler by a thread that has
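For orientation, the renumbered constants correspond to a Haskell-side decoder (in the spirit of the status decoding behind GHC.Conc.threadStatus) roughly as below; the data type and its constructors are hypothetical, not the ones in base:

    data BlockReason
      = BlockedOnMVar
      | BlockedOnMVarRead      -- new in this commit
      | BlockedOnBlackHole
      | BlockedOnRead
      | BlockedOnWrite
      | BlockedOnDelay
      | BlockedOnSTM
      deriving (Eq, Show)

    -- Decode the renumbered why_blocked values defined above.
    decodeWhyBlocked :: Int -> Maybe BlockReason
    decodeWhyBlocked 1 = Just BlockedOnMVar
    decodeWhyBlocked 2 = Just BlockedOnMVarRead
    decodeWhyBlocked 3 = Just BlockedOnBlackHole
    decodeWhyBlocked 4 = Just BlockedOnRead
    decodeWhyBlocked 5 = Just BlockedOnWrite
    decodeWhyBlocked 6 = Just BlockedOnDelay
    decodeWhyBlocked 7 = Just BlockedOnSTM
    decodeWhyBlocked _ = Nothing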
3 changes: 3 additions & 0 deletions includes/stg/MiscClosures.h
@@ -293,7 +293,9 @@ RTS_FUN_DECL(stg_block_noregs);
RTS_FUN_DECL(stg_block_blackhole);
RTS_FUN_DECL(stg_block_blackhole_finally);
RTS_FUN_DECL(stg_block_takemvar);
RTS_FUN_DECL(stg_block_atomicreadmvar);
RTS_RET(stg_block_takemvar);
RTS_RET(stg_block_atomicreadmvar);
RTS_FUN_DECL(stg_block_putmvar);
RTS_RET(stg_block_putmvar);
#ifdef mingw32_HOST_OS
@@ -376,6 +378,7 @@ RTS_FUN_DECL(stg_isEmptyMVarzh);
RTS_FUN_DECL(stg_newMVarzh);
RTS_FUN_DECL(stg_takeMVarzh);
RTS_FUN_DECL(stg_putMVarzh);
RTS_FUN_DECL(stg_atomicReadMVarzh);
RTS_FUN_DECL(stg_tryTakeMVarzh);
RTS_FUN_DECL(stg_tryPutMVarzh);

31 changes: 29 additions & 2 deletions rts/HeapStackCheck.cmm
@@ -487,11 +487,11 @@ stg_block_noregs
/* -----------------------------------------------------------------------------
* takeMVar/putMVar-specific blocks
*
* Stack layout for a thread blocked in takeMVar:
* Stack layout for a thread blocked in takeMVar/atomicReadMVar:
*
* ret. addr
* ptr to MVar (R1)
* stg_block_takemvar_info
stg_block_takemvar_info (or stg_block_atomicreadmvar_info)
*
* Stack layout for a thread blocked in putMVar:
*
@@ -531,6 +531,33 @@ stg_block_takemvar /* mvar passed in R1 */
BLOCK_BUT_FIRST(stg_block_takemvar_finally);
}

INFO_TABLE_RET ( stg_block_atomicreadmvar, RET_SMALL, W_ info_ptr, P_ mvar )
return ()
{
jump stg_atomicReadMVarzh(mvar);
}

// code fragment executed just before we return to the scheduler
stg_block_atomicreadmvar_finally
{
W_ r1, r3;
r1 = R1;
r3 = R3;
unlockClosure(R3, stg_MVAR_DIRTY_info);
R1 = r1;
R3 = r3;
jump StgReturn [R1];
}

stg_block_atomicreadmvar /* mvar passed in R1 */
{
Sp_adj(-2);
Sp(1) = R1;
Sp(0) = stg_block_atomicreadmvar_info;
R3 = R1; // mvar communicated to stg_block_atomicreadmvar_finally in R3
BLOCK_BUT_FIRST(stg_block_atomicreadmvar_finally);
}

INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, W_ info_ptr,
P_ mvar, P_ val )
return ()
2 changes: 2 additions & 0 deletions rts/Linker.c
@@ -1058,6 +1058,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_yield_to_interpreter) \
SymI_HasProto(stg_block_noregs) \
SymI_HasProto(stg_block_takemvar) \
SymI_HasProto(stg_block_atomicreadmvar) \
SymI_HasProto(stg_block_putmvar) \
MAIN_CAP_SYM \
SymI_HasProto(MallocFailHook) \
@@ -1314,6 +1315,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_bh_upd_frame_info) \
SymI_HasProto(suspendThread) \
SymI_HasProto(stg_takeMVarzh) \
SymI_HasProto(stg_atomicReadMVarzh) \
SymI_HasProto(stg_threadStatuszh) \
SymI_HasProto(stg_tryPutMVarzh) \
SymI_HasProto(stg_tryTakeMVarzh) \
79 changes: 76 additions & 3 deletions rts/PrimOps.cmm
@@ -1433,16 +1433,19 @@ loop:
goto loop;
}

// There are takeMVar(s) waiting: wake up the first one
// There are atomicReadMVar/takeMVar(s) waiting: wake up the first one

tso = StgMVarTSOQueue_tso(q);
StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}

ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
ASSERT(StgTSO_block_info(tso) == mvar);
// save why_blocked here, because waking up the thread destroys
// this information
W_ why_blocked;
why_blocked = TO_W_(StgTSO_why_blocked(tso));

// actually perform the takeMVar
W_ stack;
@@ -1458,6 +1461,15 @@

ccall tryWakeupThread(MyCapability() "ptr", tso);

// If it was an atomicReadMVar, then we can still do work,
// so loop back. (XXX: This could take a while)
if (why_blocked == BlockedOnMVarRead) {
q = StgMVarTSOQueue_link(q);
goto loop;
}

ASSERT(why_blocked == BlockedOnMVar);

unlockClosure(mvar, info);
return ();
}
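
The effect of the loop above is visible from Haskell: one putMVar satisfies every blocked reader at the front of the queue and then the first blocked taker. A minimal demo, assuming the atomicReadMVar wrapper sketched earlier is in scope (wake-up order and output interleaving depend on the scheduler):

    import Control.Concurrent
    import Control.Monad (forM_, replicateM_)

    wakeDemo :: IO ()
    wakeDemo = do
      m    <- newEmptyMVar
      done <- newEmptyMVar
      -- Three readers block first, so they end up at the front of the queue.
      forM_ [1 .. 3 :: Int] $ \i -> forkIO $ do
        v <- atomicReadMVar m
        putMVar done ("reader " ++ show i ++ " saw " ++ show v)
      -- One taker blocks behind them.
      _ <- forkIO $ do
        v <- takeMVar m
        putMVar done ("taker took " ++ show v)
      threadDelay 10000         -- crude: give all four threads time to block
      putMVar m (42 :: Int)     -- a single put wakes the readers and the taker
      replicateM_ 4 (takeMVar done >>= putStrLn)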
@@ -1512,8 +1524,11 @@ loop:
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}

ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
ASSERT(StgTSO_block_info(tso) == mvar);
// save why_blocked here, because waking up the thread destroys
// this information
W_ why_blocked;
why_blocked = TO_W_(StgTSO_why_blocked(tso));

// actually perform the takeMVar
W_ stack;
@@ -1529,10 +1544,68 @@

ccall tryWakeupThread(MyCapability() "ptr", tso);

// If it was an atomicReadMVar, then we can still do work,
// so loop back. (XXX: This could take a while)
if (why_blocked == BlockedOnMVarRead) {
q = StgMVarTSOQueue_link(q);
goto loop;
}

ASSERT(why_blocked == BlockedOnMVar);

unlockClosure(mvar, info);
return (1);
}

stg_atomicReadMVarzh ( P_ mvar, /* :: MVar a */ )
{
W_ val, info, tso, q;

#if defined(THREADED_RTS)
("ptr" info) = ccall lockClosure(mvar "ptr");
#else
info = GET_INFO(mvar);
#endif

if (info == stg_MVAR_CLEAN_info) {
ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
}

/* If the MVar is empty, put ourselves on the blocked readers
* list and wait until we're woken up.
*/
if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {

ALLOC_PRIM_WITH_CUSTOM_FAILURE
(SIZEOF_StgMVarTSOQueue,
unlockClosure(mvar, stg_MVAR_DIRTY_info);
GC_PRIM_P(stg_atomicReadMVarzh, mvar));

q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);

// readMVars are pushed to the front of the queue, so
// they get handled immediately
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
StgMVarTSOQueue_link(q) = StgMVar_head(mvar);
StgMVarTSOQueue_tso(q) = CurrentTSO;

StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
StgMVar_head(mvar) = q;

if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
StgMVar_tail(mvar) = q;
}

jump stg_block_atomicreadmvar(mvar);
}

val = StgMVar_value(mvar);

unlockClosure(mvar, stg_MVAR_DIRTY_info);
return (val);
}
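
On the fast path above the value is returned while StgMVar_value is left untouched, so unlike takeMVar the MVar is still full afterwards. A small check, again assuming the hypothetical wrapper from earlier:

    import Control.Concurrent.MVar (newMVar, isEmptyMVar)

    checkNonDestructive :: IO ()
    checkNonDestructive = do
      m <- newMVar "payload"
      v <- atomicReadMVar m
      stillFull <- fmap not (isEmptyMVar m)
      putStrLn (v ++ ", MVar still full: " ++ show stillFull)   -- expect True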

/* -----------------------------------------------------------------------------
Stable pointer primitives
4 changes: 3 additions & 1 deletion rts/RaiseAsync.c
@@ -294,6 +294,7 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
}

case BlockedOnMVar:
case BlockedOnMVarRead:
{
/*
To establish ownership of this TSO, we need to acquire a
@@ -318,7 +319,7 @@

// we have the MVar, let's check whether the thread
// is still blocked on the same MVar.
if (target->why_blocked != BlockedOnMVar
if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
|| (StgMVar *)target->block_info.closure != mvar) {
unlockClosure((StgClosure *)mvar, info);
goto retry;
@@ -637,6 +638,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
goto done;

case BlockedOnMVar:
case BlockedOnMVarRead:
removeFromMVarBlockedQueue(tso);
goto done;

1 change: 1 addition & 0 deletions rts/RaiseAsync.h
@@ -49,6 +49,7 @@ interruptible(StgTSO *t)
{
switch (t->why_blocked) {
case BlockedOnMVar:
case BlockedOnMVarRead:
case BlockedOnMsgThrowTo:
case BlockedOnRead:
case BlockedOnWrite:
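Because BlockedOnMVarRead is classed as interruptible here, a thread stuck in the new primop on an empty MVar can still receive asynchronous exceptions. A sketch using System.Timeout (which delivers such an exception) and the hypothetical wrapper from earlier:

    import Control.Concurrent.MVar (MVar, newEmptyMVar)
    import Data.Maybe (isNothing)
    import System.Timeout (timeout)

    interruptibleRead :: IO Bool
    interruptibleRead = do
      m <- newEmptyMVar :: IO (MVar Int)
      r <- timeout 100000 (atomicReadMVar m)   -- 0.1 s
      return (isNothing r)                     -- True: the blocked read was interrupted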
1 change: 1 addition & 0 deletions rts/RetainerProfile.c
@@ -1672,6 +1672,7 @@ retainClosure( StgClosure *c0, StgClosure *cp0, retainer r0 )
retainClosure(tso->bq, c, c_child_r);
retainClosure(tso->trec, c, c_child_r);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
) {
2 changes: 2 additions & 0 deletions rts/Schedule.c
@@ -947,6 +947,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
case BlockedOnBlackHole:
case BlockedOnMsgThrowTo:
case BlockedOnMVar:
case BlockedOnMVarRead:
throwToSingleThreaded(cap, task->incall->tso,
(StgClosure *)nonTermination_closure);
return;
@@ -2843,6 +2844,7 @@ resurrectThreads (StgTSO *threads)

switch (tso->why_blocked) {
case BlockedOnMVar:
case BlockedOnMVarRead:
/* Called by GC - sched_mutex lock is currently held. */
throwToSingleThreaded(cap, tso,
(StgClosure *)blockedIndefinitelyOnMVar_closure);
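Together with the deadlock detection above, an atomic read that can never be satisfied is resolved by throwing BlockedIndefinitelyOnMVar, just as for takeMVar. A sketch, again assuming the wrapper from earlier (the exception fires only once the RTS can prove no other thread can ever fill the MVar):

    import Control.Concurrent.MVar (MVar, newEmptyMVar)
    import Control.Exception (BlockedIndefinitelyOnMVar (..), try)

    deadlockedRead :: IO ()
    deadlockedRead = do
      m <- newEmptyMVar :: IO (MVar ())
      r <- try (atomicReadMVar m)
      case r of
        Left BlockedIndefinitelyOnMVar -> putStrLn "read detected as deadlocked"
        Right ()                       -> putStrLn "somehow got a value"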
4 changes: 4 additions & 0 deletions rts/Threads.c
@@ -255,6 +255,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
switch (tso->why_blocked)
{
case BlockedOnMVar:
case BlockedOnMVarRead:
{
if (tso->_link == END_TSO_QUEUE) {
tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
@@ -734,6 +735,9 @@ printThreadBlockage(StgTSO *tso)
case BlockedOnMVar:
debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
case BlockedOnMVarRead:
debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure);
break;
case BlockedOnBlackHole:
debugBelch("is blocked on a black hole %p",
((StgBlockingQueue*)tso->block_info.bh->bh));
1 change: 1 addition & 0 deletions rts/Trace.c
@@ -179,6 +179,7 @@ static char *thread_stop_reasons[] = {
[ThreadFinished] = "finished",
[THREAD_SUSPENDED_FOREIGN_CALL] = "suspended while making a foreign call",
[6 + BlockedOnMVar] = "blocked on an MVar",
[6 + BlockedOnMVarRead] = "blocked on an atomic MVar read",
[6 + BlockedOnBlackHole] = "blocked on a black hole",
[6 + BlockedOnRead] = "blocked on a read operation",
[6 + BlockedOnWrite] = "blocked on a write operation",
1 change: 1 addition & 0 deletions rts/sm/Compact.c
@@ -442,6 +442,7 @@ thread_TSO (StgTSO *tso)
thread_(&tso->global_link);

if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
1 change: 1 addition & 0 deletions rts/sm/Sanity.c
@@ -519,6 +519,7 @@ checkTSO(StgTSO *tso)
info == &stg_WHITEHOLE_info); // happens due to STM doing lockTSO()

if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
1 change: 1 addition & 0 deletions rts/sm/Scav.c
@@ -71,6 +71,7 @@ scavengeTSO (StgTSO *tso)

evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnMVarRead
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
