Skip to content

Commit

Permalink
Use message-passing to implement throwTo in the RTS
Browse files Browse the repository at this point in the history
This replaces some complicated locking schemes with message-passing
in the implementation of throwTo. The benefits are

 - previously it was impossible to guarantee that a throwTo from
   a thread running on one CPU to a thread running on another CPU
   would be noticed, and we had to rely on the GC to pick up these
   forgotten exceptions. This no longer happens.

 - the locking regime is simpler (though the code is about the same
   size)

 - threads can be unblocked from a blocked_exceptions queue without
   having to traverse the whole queue now.  It's a rare case, but
   replaces an O(n) operation with an O(1).

 - generally we move in the direction of sharing less between
   Capabilities (aka HECs), which will become important with other
   changes we have planned.

Also in this patch I replaced several STM-specific closure types with
a generic MUT_PRIM closure type, which allowed a lot of code in the GC
and other places to go away, hence the line-count reduction.  The
message-passing changes resulted in about a net zero line-count
difference.
  • Loading branch information
simonmar committed Mar 11, 2010
1 parent 12cfec9 commit 7408b39
Show file tree
Hide file tree
Showing 35 changed files with 682 additions and 903 deletions.
22 changes: 12 additions & 10 deletions includes/rts/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,25 +208,27 @@
#define NotBlocked 0
#define BlockedOnMVar 1
#define BlockedOnBlackHole 2
#define BlockedOnException 3
#define BlockedOnRead 4
#define BlockedOnWrite 5
#define BlockedOnDelay 6
#define BlockedOnSTM 7
#define BlockedOnRead 3
#define BlockedOnWrite 4
#define BlockedOnDelay 5
#define BlockedOnSTM 6

/* Win32 only: */
#define BlockedOnDoProc 8
#define BlockedOnDoProc 7

/* Only relevant for PAR: */
/* blocked on a remote closure represented by a Global Address: */
#define BlockedOnGA 9
#define BlockedOnGA 8
/* same as above but without sending a Fetch message */
#define BlockedOnGA_NoSend 10
#define BlockedOnGA_NoSend 9
/* Only relevant for THREADED_RTS: */
#define BlockedOnCCall 11
#define BlockedOnCCall_NoUnblockExc 12
#define BlockedOnCCall 10
#define BlockedOnCCall_NoUnblockExc 11
/* same as above but don't unblock async exceptions in resumeThread() */

/* Involved in a message sent to tso->msg_cap */
#define BlockedOnMsgWakeup 12
#define BlockedOnMsgThrowTo 13
/*
* These constants are returned to the scheduler by a thread that has
* stopped for one reason or another. See typedef StgThreadReturnCode
Expand Down
10 changes: 0 additions & 10 deletions includes/rts/storage/ClosureMacros.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,18 +335,8 @@ closure_sizeW_ (StgClosure *p, StgInfoTable *info)
return tso_sizeW((StgTSO *)p);
case BCO:
return bco_sizeW((StgBCO *)p);
case TVAR_WATCH_QUEUE:
return sizeofW(StgTVarWatchQueue);
case TVAR:
return sizeofW(StgTVar);
case TREC_CHUNK:
return sizeofW(StgTRecChunk);
case TREC_HEADER:
return sizeofW(StgTRecHeader);
case ATOMIC_INVARIANT:
return sizeofW(StgAtomicInvariant);
case INVARIANT_CHECK_QUEUE:
return sizeofW(StgInvariantCheckQueue);
default:
return sizeW_fromITBL(info);
}
Expand Down
22 changes: 9 additions & 13 deletions includes/rts/storage/ClosureTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,14 @@
#define MUT_VAR_CLEAN 50
#define MUT_VAR_DIRTY 51
#define WEAK 52
#define STABLE_NAME 53
#define TSO 54
#define TVAR_WATCH_QUEUE 55
#define INVARIANT_CHECK_QUEUE 56
#define ATOMIC_INVARIANT 57
#define TVAR 58
#define TREC_CHUNK 59
#define TREC_HEADER 60
#define ATOMICALLY_FRAME 61
#define CATCH_RETRY_FRAME 62
#define CATCH_STM_FRAME 63
#define WHITEHOLE 64
#define N_CLOSURE_TYPES 65
#define PRIM 53
#define MUT_PRIM 54
#define TSO 55
#define TREC_CHUNK 56
#define ATOMICALLY_FRAME 57
#define CATCH_RETRY_FRAME 58
#define CATCH_STM_FRAME 59
#define WHITEHOLE 60
#define N_CLOSURE_TYPES 61

#endif /* RTS_STORAGE_CLOSURETYPES_H */
25 changes: 24 additions & 1 deletion includes/rts/storage/Closures.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,10 @@ typedef struct StgInvariantCheckQueue_ {

struct StgTRecHeader_ {
StgHeader header;
TRecState state;
struct StgTRecHeader_ *enclosing_trec;
StgTRecChunk *current_chunk;
StgInvariantCheckQueue *invariants_to_check;
TRecState state;
};

typedef struct {
Expand All @@ -416,4 +416,27 @@ typedef struct {
StgClosure *alt_code;
} StgCatchRetryFrame;

/* ----------------------------------------------------------------------------
Messages
------------------------------------------------------------------------- */

typedef struct Message_ {
StgHeader header;
struct Message_ *link;
} Message;

typedef struct MessageWakeup_ {
StgHeader header;
Message *link;
StgTSO *tso;
} MessageWakeup;

typedef struct MessageThrowTo_ {
StgHeader header;
Message *link;
StgTSO *source;
StgTSO *target;
StgClosure *exception;
} MessageThrowTo;

#endif /* RTS_STORAGE_CLOSURES_H */
26 changes: 19 additions & 7 deletions includes/rts/storage/SMPClosureOps.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#else

EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p);
EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p);
EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info);

#if defined(THREADED_RTS)
Expand All @@ -43,11 +44,15 @@ EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p)
} while (1);
}

EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p)
{
// This is a strictly ordered write, so we need a write_barrier():
write_barrier();
p->header.info = info;
StgWord info;
info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info);
if (info != (W_)&stg_WHITEHOLE_info) {
return (StgInfoTable *)info;
} else {
return NULL;
}
}

#else /* !THREADED_RTS */
Expand All @@ -56,12 +61,19 @@ EXTERN_INLINE StgInfoTable *
lockClosure(StgClosure *p)
{ return (StgInfoTable *)p->header.info; }

EXTERN_INLINE void
unlockClosure(StgClosure *p STG_UNUSED, const StgInfoTable *info STG_UNUSED)
{ /* nothing */ }
EXTERN_INLINE StgInfoTable *
tryLockClosure(StgClosure *p)
{ return (StgInfoTable *)p->header.info; }

#endif /* THREADED_RTS */

EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
{
// This is a strictly ordered write, so we need a write_barrier():
write_barrier();
p->header.info = info;
}

// Handy specialised versions of lockClosure()/unlockClosure()
EXTERN_INLINE void lockTSO(StgTSO *tso);
EXTERN_INLINE void lockTSO(StgTSO *tso)
Expand Down
19 changes: 10 additions & 9 deletions includes/rts/storage/TSO.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ typedef struct {
/* Reason for thread being blocked. See comment above struct StgTso_. */
typedef union {
StgClosure *closure;
struct StgTSO_ *tso;
struct MessageThrowTo_ *throwto;
struct MessageWakeup_ *wakeup;
StgInt fd; /* StgInt instead of int, so that it's the same size as the ptrs */
#if defined(mingw32_HOST_OS)
StgAsyncIOResult *async_result;
Expand Down Expand Up @@ -87,7 +88,8 @@ typedef struct StgTSO_ {
will already be dirty.
*/

struct StgTSO_* global_link; /* Links all threads together */
struct StgTSO_* global_link; // Links threads on the
// generation->threads lists

StgWord dirty; /* non-zero => dirty */
/*
Expand All @@ -108,9 +110,9 @@ typedef struct StgTSO_ {
* setTSOLink().
*/

StgWord16 what_next; /* Values defined in Constants.h */
StgWord16 why_blocked; /* Values defined in Constants.h */
StgWord32 flags;
StgWord16 what_next; // Values defined in Constants.h
StgWord16 why_blocked; // Values defined in Constants.h
StgWord32 flags; // Values defined in Constants.h
StgTSOBlockInfo block_info;
StgThreadID id;
int saved_errno;
Expand All @@ -123,7 +125,7 @@ typedef struct StgTSO_ {
exceptions. In order to access this field, the TSO must be
locked using lockClosure/unlockClosure (see SMP.h).
*/
struct StgTSO_ * blocked_exceptions;
struct MessageThrowTo_ * blocked_exceptions;

#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
Expand Down Expand Up @@ -167,15 +169,15 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
tso->why_blocked tso->block_info location
----------------------------------------------------------------------
NotBlocked NULL runnable_queue, or running
NotBlocked END_TSO_QUEUE runnable_queue, or running
BlockedOnBlackHole the BLACKHOLE blackhole_queue
BlockedOnMVar the MVAR the MVAR's queue
BlockedOnSTM END_TSO_QUEUE STM wait queue(s)
BlockedOnException the TSO TSO->blocked_exception
BlockedOnMsgThrowTo MessageThrowTo * TSO->blocked_exception
BlockedOnRead NULL blocked_queue
BlockedOnWrite NULL blocked_queue
Expand All @@ -189,7 +191,6 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
tso->what_next == ThreadComplete or ThreadKilled
tso->link == (could be on some queue somewhere)
tso->su == tso->stack + tso->stack_size
tso->sp == tso->stack + tso->stack_size - 1 (i.e. top stack word)
tso->sp[0] == return value of thread, if what_next == ThreadComplete,
exception , if what_next == ThreadKilled
Expand Down
6 changes: 4 additions & 2 deletions includes/stg/MiscClosures.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ RTS_INFO(stg_MUT_ARR_PTRS_FROZEN0_info);
RTS_INFO(stg_MUT_VAR_CLEAN_info);
RTS_INFO(stg_MUT_VAR_DIRTY_info);
RTS_INFO(stg_END_TSO_QUEUE_info);
RTS_INFO(stg_MSG_WAKEUP_info);
RTS_INFO(stg_MSG_THROWTO_info);
RTS_INFO(stg_MUT_CONS_info);
RTS_INFO(stg_catch_info);
RTS_INFO(stg_PAP_info);
Expand Down Expand Up @@ -163,6 +165,8 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0_entry);
RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
RTS_ENTRY(stg_END_TSO_QUEUE_entry);
RTS_ENTRY(stg_MSG_WAKEUP_entry);
RTS_ENTRY(stg_MSG_THROWTO_entry);
RTS_ENTRY(stg_MUT_CONS_entry);
RTS_ENTRY(stg_catch_entry);
RTS_ENTRY(stg_PAP_entry);
Expand Down Expand Up @@ -205,8 +209,6 @@ RTS_CLOSURE(stg_END_STM_CHUNK_LIST_closure);
RTS_CLOSURE(stg_NO_TREC_closure);

RTS_ENTRY(stg_NO_FINALIZER_entry);
RTS_ENTRY(stg_END_EXCEPTION_LIST_entry);
RTS_ENTRY(stg_EXCEPTION_CONS_entry);

#if IN_STG_CODE
extern DLL_IMPORT_RTS StgWordArray stg_CHARLIKE_closure;
Expand Down
67 changes: 42 additions & 25 deletions rts/Capability.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ initCapability( Capability *cap, nat i )
cap->suspended_ccalls = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
cap->wakeup_queue_tl = END_TSO_QUEUE;
cap->inbox = (Message*)END_TSO_QUEUE;
cap->sparks_created = 0;
cap->sparks_converted = 0;
cap->sparks_pruned = 0;
Expand Down Expand Up @@ -419,7 +418,7 @@ releaseCapability_ (Capability* cap,
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
if (always_wakeup ||
!emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
!emptyRunQueue(cap) || !emptyInbox(cap) ||
!emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
Expand Down Expand Up @@ -645,11 +644,11 @@ yieldCapability (Capability** pCap, Task *task)
* ------------------------------------------------------------------------- */

void
wakeupThreadOnCapability (Capability *my_cap,
wakeupThreadOnCapability (Capability *cap,
Capability *other_cap,
StgTSO *tso)
{
ACQUIRE_LOCK(&other_cap->lock);
MessageWakeup *msg;

// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
Expand All @@ -658,27 +657,20 @@ wakeupThreadOnCapability (Capability *my_cap,
}
tso->cap = other_cap;

ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);
ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
tso->block_info.closure->header.info == &stg_IND_info);

if (other_cap->running_task == NULL) {
// nobody is running this Capability, we can add our thread
// directly onto the run queue and start up a Task to run it.
ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);

other_cap->running_task = myTask();
// precond for releaseCapability_() and appendToRunQueue()
msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
msg->header.info = &stg_MSG_WAKEUP_info;
msg->tso = tso;
tso->block_info.closure = (StgClosure *)msg;
dirty_TSO(cap, tso);
write_barrier();
tso->why_blocked = BlockedOnMsgWakeup;

appendToRunQueue(other_cap,tso);

releaseCapability_(other_cap,rtsFalse);
} else {
appendToWakeupQueue(my_cap,other_cap,tso);
other_cap->context_switch = 1;
// someone is running on this Capability, so it cannot be
// freed without first checking the wakeup queue (see
// releaseCapability_).
}

RELEASE_LOCK(&other_cap->lock);
sendMessage(other_cap, (Message*)msg);
}

/* ----------------------------------------------------------------------------
Expand Down Expand Up @@ -881,8 +873,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
for (incall = cap->suspended_ccalls; incall != NULL;
incall=incall->next) {
Expand Down Expand Up @@ -910,3 +901,29 @@ markCapabilities (evac_fn evac, void *user)
{
markSomeCapabilities(evac, user, 0, 1, rtsFalse);
}

/* -----------------------------------------------------------------------------
Messages
-------------------------------------------------------------------------- */

#ifdef THREADED_RTS

void sendMessage(Capability *cap, Message *msg)
{
ACQUIRE_LOCK(&cap->lock);

msg->link = cap->inbox;
cap->inbox = msg;

if (cap->running_task == NULL) {
cap->running_task = myTask();
// precond for releaseCapability_()
releaseCapability_(cap,rtsFalse);
} else {
contextSwitchCapability(cap);
}

RELEASE_LOCK(&cap->lock);
}

#endif // THREADED_RTS
Loading

0 comments on commit 7408b39

Please sign in to comment.