Permalink
Browse files

Use message-passing to implement throwTo in the RTS

This replaces some complicated locking schemes with message-passing
in the implementation of throwTo. The benefits are

 - previously it was impossible to guarantee that a throwTo from
   a thread running on one CPU to a thread running on another CPU
   would be noticed, and we had to rely on the GC to pick up these
   forgotten exceptions. This no longer happens.

 - the locking regime is simpler (though the code is about the same
   size)

 - threads can be unblocked from a blocked_exceptions queue without
   having to traverse the whole queue now.  It's a rare case, but
   replaces an O(n) operation with an O(1).

 - generally we move in the direction of sharing less between
   Capabilities (aka HECs), which will become important with other
   changes we have planned.

Also in this patch I replaced several STM-specific closure types with
a generic MUT_PRIM closure type, which allowed a lot of code in the GC
and other places to go away, hence the line-count reduction.  The
message-passing changes resulted in about a net zero line-count
difference.
  • Loading branch information...
simonmar committed Mar 11, 2010
1 parent 12cfec9 commit 7408b39235bccdcde48df2a73337ff976fbc09b7
View
@@ -208,25 +208,27 @@
#define NotBlocked 0
#define BlockedOnMVar 1
#define BlockedOnBlackHole 2
-#define BlockedOnException 3
-#define BlockedOnRead 4
-#define BlockedOnWrite 5
-#define BlockedOnDelay 6
-#define BlockedOnSTM 7
+#define BlockedOnRead 3
+#define BlockedOnWrite 4
+#define BlockedOnDelay 5
+#define BlockedOnSTM 6
/* Win32 only: */
-#define BlockedOnDoProc 8
+#define BlockedOnDoProc 7
/* Only relevant for PAR: */
/* blocked on a remote closure represented by a Global Address: */
-#define BlockedOnGA 9
+#define BlockedOnGA 8
/* same as above but without sending a Fetch message */
-#define BlockedOnGA_NoSend 10
+#define BlockedOnGA_NoSend 9
/* Only relevant for THREADED_RTS: */
-#define BlockedOnCCall 11
-#define BlockedOnCCall_NoUnblockExc 12
+#define BlockedOnCCall 10
+#define BlockedOnCCall_NoUnblockExc 11
/* same as above but don't unblock async exceptions in resumeThread() */
+/* Involved in a message sent to tso->msg_cap */
+#define BlockedOnMsgWakeup 12
+#define BlockedOnMsgThrowTo 13
/*
* These constants are returned to the scheduler by a thread that has
* stopped for one reason or another. See typedef StgThreadReturnCode
@@ -335,18 +335,8 @@ closure_sizeW_ (StgClosure *p, StgInfoTable *info)
return tso_sizeW((StgTSO *)p);
case BCO:
return bco_sizeW((StgBCO *)p);
- case TVAR_WATCH_QUEUE:
- return sizeofW(StgTVarWatchQueue);
- case TVAR:
- return sizeofW(StgTVar);
case TREC_CHUNK:
return sizeofW(StgTRecChunk);
- case TREC_HEADER:
- return sizeofW(StgTRecHeader);
- case ATOMIC_INVARIANT:
- return sizeofW(StgAtomicInvariant);
- case INVARIANT_CHECK_QUEUE:
- return sizeofW(StgInvariantCheckQueue);
default:
return sizeW_fromITBL(info);
}
@@ -74,18 +74,14 @@
#define MUT_VAR_CLEAN 50
#define MUT_VAR_DIRTY 51
#define WEAK 52
-#define STABLE_NAME 53
-#define TSO 54
-#define TVAR_WATCH_QUEUE 55
-#define INVARIANT_CHECK_QUEUE 56
-#define ATOMIC_INVARIANT 57
-#define TVAR 58
-#define TREC_CHUNK 59
-#define TREC_HEADER 60
-#define ATOMICALLY_FRAME 61
-#define CATCH_RETRY_FRAME 62
-#define CATCH_STM_FRAME 63
-#define WHITEHOLE 64
-#define N_CLOSURE_TYPES 65
+#define PRIM 53
+#define MUT_PRIM 54
+#define TSO 55
+#define TREC_CHUNK 56
+#define ATOMICALLY_FRAME 57
+#define CATCH_RETRY_FRAME 58
+#define CATCH_STM_FRAME 59
+#define WHITEHOLE 60
+#define N_CLOSURE_TYPES 61
#endif /* RTS_STORAGE_CLOSURETYPES_H */
@@ -390,10 +390,10 @@ typedef struct StgInvariantCheckQueue_ {
struct StgTRecHeader_ {
StgHeader header;
- TRecState state;
struct StgTRecHeader_ *enclosing_trec;
StgTRecChunk *current_chunk;
StgInvariantCheckQueue *invariants_to_check;
+ TRecState state;
};
typedef struct {
@@ -416,4 +416,27 @@ typedef struct {
StgClosure *alt_code;
} StgCatchRetryFrame;
+/* ----------------------------------------------------------------------------
+ Messages
+ ------------------------------------------------------------------------- */
+
+typedef struct Message_ {
+ StgHeader header;
+ struct Message_ *link;
+} Message;
+
+typedef struct MessageWakeup_ {
+ StgHeader header;
+ Message *link;
+ StgTSO *tso;
+} MessageWakeup;
+
+typedef struct MessageThrowTo_ {
+ StgHeader header;
+ Message *link;
+ StgTSO *source;
+ StgTSO *target;
+ StgClosure *exception;
+} MessageThrowTo;
+
#endif /* RTS_STORAGE_CLOSURES_H */
@@ -18,6 +18,7 @@
#else
EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p);
+EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p);
EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info);
#if defined(THREADED_RTS)
@@ -43,11 +44,15 @@ EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p)
} while (1);
}
-EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
+EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p)
{
- // This is a strictly ordered write, so we need a write_barrier():
- write_barrier();
- p->header.info = info;
+ StgWord info;
+ info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info);
+ if (info != (W_)&stg_WHITEHOLE_info) {
+ return (StgInfoTable *)info;
+ } else {
+ return NULL;
+ }
}
#else /* !THREADED_RTS */
@@ -56,12 +61,19 @@ EXTERN_INLINE StgInfoTable *
lockClosure(StgClosure *p)
{ return (StgInfoTable *)p->header.info; }
-EXTERN_INLINE void
-unlockClosure(StgClosure *p STG_UNUSED, const StgInfoTable *info STG_UNUSED)
-{ /* nothing */ }
+EXTERN_INLINE StgInfoTable *
+tryLockClosure(StgClosure *p)
+{ return (StgInfoTable *)p->header.info; }
#endif /* THREADED_RTS */
+EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
+{
+ // This is a strictly ordered write, so we need a write_barrier():
+ write_barrier();
+ p->header.info = info;
+}
+
// Handy specialised versions of lockClosure()/unlockClosure()
EXTERN_INLINE void lockTSO(StgTSO *tso);
EXTERN_INLINE void lockTSO(StgTSO *tso)
View
@@ -46,7 +46,8 @@ typedef struct {
/* Reason for thread being blocked. See comment above struct StgTso_. */
typedef union {
StgClosure *closure;
- struct StgTSO_ *tso;
+ struct MessageThrowTo_ *throwto;
+ struct MessageWakeup_ *wakeup;
StgInt fd; /* StgInt instead of int, so that it's the same size as the ptrs */
#if defined(mingw32_HOST_OS)
StgAsyncIOResult *async_result;
@@ -87,7 +88,8 @@ typedef struct StgTSO_ {
will already be dirty.
*/
- struct StgTSO_* global_link; /* Links all threads together */
+ struct StgTSO_* global_link; // Links threads on the
+ // generation->threads lists
StgWord dirty; /* non-zero => dirty */
/*
@@ -108,9 +110,9 @@ typedef struct StgTSO_ {
* setTSOLink().
*/
- StgWord16 what_next; /* Values defined in Constants.h */
- StgWord16 why_blocked; /* Values defined in Constants.h */
- StgWord32 flags;
+ StgWord16 what_next; // Values defined in Constants.h
+ StgWord16 why_blocked; // Values defined in Constants.h
+ StgWord32 flags; // Values defined in Constants.h
StgTSOBlockInfo block_info;
StgThreadID id;
int saved_errno;
@@ -123,7 +125,7 @@ typedef struct StgTSO_ {
exceptions. In order to access this field, the TSO must be
locked using lockClosure/unlockClosure (see SMP.h).
*/
- struct StgTSO_ * blocked_exceptions;
+ struct MessageThrowTo_ * blocked_exceptions;
#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
@@ -167,15 +169,15 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
tso->why_blocked tso->block_info location
----------------------------------------------------------------------
- NotBlocked NULL runnable_queue, or running
+ NotBlocked END_TSO_QUEUE runnable_queue, or running
BlockedOnBlackHole the BLACKHOLE blackhole_queue
BlockedOnMVar the MVAR the MVAR's queue
BlockedOnSTM END_TSO_QUEUE STM wait queue(s)
- BlockedOnException the TSO TSO->blocked_exception
+ BlockedOnMsgThrowTo MessageThrowTo * TSO->blocked_exception
BlockedOnRead NULL blocked_queue
BlockedOnWrite NULL blocked_queue
@@ -189,7 +191,6 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
tso->what_next == ThreadComplete or ThreadKilled
tso->link == (could be on some queue somewhere)
- tso->su == tso->stack + tso->stack_size
tso->sp == tso->stack + tso->stack_size - 1 (i.e. top stack word)
tso->sp[0] == return value of thread, if what_next == ThreadComplete,
exception , if what_next == ThreadKilled
@@ -114,6 +114,8 @@ RTS_INFO(stg_MUT_ARR_PTRS_FROZEN0_info);
RTS_INFO(stg_MUT_VAR_CLEAN_info);
RTS_INFO(stg_MUT_VAR_DIRTY_info);
RTS_INFO(stg_END_TSO_QUEUE_info);
+RTS_INFO(stg_MSG_WAKEUP_info);
+RTS_INFO(stg_MSG_THROWTO_info);
RTS_INFO(stg_MUT_CONS_info);
RTS_INFO(stg_catch_info);
RTS_INFO(stg_PAP_info);
@@ -163,6 +165,8 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0_entry);
RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
RTS_ENTRY(stg_END_TSO_QUEUE_entry);
+RTS_ENTRY(stg_MSG_WAKEUP_entry);
+RTS_ENTRY(stg_MSG_THROWTO_entry);
RTS_ENTRY(stg_MUT_CONS_entry);
RTS_ENTRY(stg_catch_entry);
RTS_ENTRY(stg_PAP_entry);
@@ -205,8 +209,6 @@ RTS_CLOSURE(stg_END_STM_CHUNK_LIST_closure);
RTS_CLOSURE(stg_NO_TREC_closure);
RTS_ENTRY(stg_NO_FINALIZER_entry);
-RTS_ENTRY(stg_END_EXCEPTION_LIST_entry);
-RTS_ENTRY(stg_EXCEPTION_CONS_entry);
#if IN_STG_CODE
extern DLL_IMPORT_RTS StgWordArray stg_CHARLIKE_closure;
View
@@ -223,8 +223,7 @@ initCapability( Capability *cap, nat i )
cap->suspended_ccalls = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
- cap->wakeup_queue_hd = END_TSO_QUEUE;
- cap->wakeup_queue_tl = END_TSO_QUEUE;
+ cap->inbox = (Message*)END_TSO_QUEUE;
cap->sparks_created = 0;
cap->sparks_converted = 0;
cap->sparks_pruned = 0;
@@ -419,7 +418,7 @@ releaseCapability_ (Capability* cap,
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
if (always_wakeup ||
- !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+ !emptyRunQueue(cap) || !emptyInbox(cap) ||
!emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
@@ -645,11 +644,11 @@ yieldCapability (Capability** pCap, Task *task)
* ------------------------------------------------------------------------- */
void
-wakeupThreadOnCapability (Capability *my_cap,
+wakeupThreadOnCapability (Capability *cap,
Capability *other_cap,
StgTSO *tso)
{
- ACQUIRE_LOCK(&other_cap->lock);
+ MessageWakeup *msg;
// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
@@ -658,27 +657,20 @@ wakeupThreadOnCapability (Capability *my_cap,
}
tso->cap = other_cap;
- ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);
+ ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
+ tso->block_info.closure->header.info == &stg_IND_info);
- if (other_cap->running_task == NULL) {
- // nobody is running this Capability, we can add our thread
- // directly onto the run queue and start up a Task to run it.
+ ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
- other_cap->running_task = myTask();
- // precond for releaseCapability_() and appendToRunQueue()
+ msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+ msg->header.info = &stg_MSG_WAKEUP_info;
+ msg->tso = tso;
+ tso->block_info.closure = (StgClosure *)msg;
+ dirty_TSO(cap, tso);
+ write_barrier();
+ tso->why_blocked = BlockedOnMsgWakeup;
- appendToRunQueue(other_cap,tso);
-
- releaseCapability_(other_cap,rtsFalse);
- } else {
- appendToWakeupQueue(my_cap,other_cap,tso);
- other_cap->context_switch = 1;
- // someone is running on this Capability, so it cannot be
- // freed without first checking the wakeup queue (see
- // releaseCapability_).
- }
-
- RELEASE_LOCK(&other_cap->lock);
+ sendMessage(other_cap, (Message*)msg);
}
/* ----------------------------------------------------------------------------
@@ -881,8 +873,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
- evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
- evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+ evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
for (incall = cap->suspended_ccalls; incall != NULL;
incall=incall->next) {
@@ -910,3 +901,29 @@ markCapabilities (evac_fn evac, void *user)
{
markSomeCapabilities(evac, user, 0, 1, rtsFalse);
}
+
+/* -----------------------------------------------------------------------------
+ Messages
+ -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *cap, Message *msg)
+{
+ ACQUIRE_LOCK(&cap->lock);
+
+ msg->link = cap->inbox;
+ cap->inbox = msg;
+
+ if (cap->running_task == NULL) {
+ cap->running_task = myTask();
+ // precond for releaseCapability_()
+ releaseCapability_(cap,rtsFalse);
+ } else {
+ contextSwitchCapability(cap);
+ }
+
+ RELEASE_LOCK(&cap->lock);
+}
+
+#endif // THREADED_RTS
Oops, something went wrong.

0 comments on commit 7408b39

Please sign in to comment.