Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

New implementation of BLACKHOLEs

This replaces the global blackhole_queue with a clever scheme that
enables us to queue up blocked threads on the closure that they are
blocked on, while still avoiding atomic instructions in the common
case.

Advantages:

 - gets rid of a locked global data structure and some tricky GC code
   (replacing it with some per-thread data structures and different
   tricky GC code :)

 - wakeups are more prompt: parallel/concurrent performance should
   benefit.  I haven't seen anything dramatic in the parallel
   benchmarks so far, but a couple of threading benchmarks do improve
   a bit.

 - waking up a thread blocked on a blackhole is now O(1) (e.g. if
   it is the target of throwTo).

 - less sharing and better separation of Capabilities: communication
   is done with messages, the data structures are strictly owned by a
   Capability and cannot be modified except by sending messages.

 - this change will ultimately enable us to do more intelligent
   scheduling when threads block on each other.  This is what started
   off the whole thing, but it isn't done yet (#3838).

I'll be documenting all this on the wiki in due course.
  • Loading branch information...
commit 5d52d9b64c21dcf77849866584744722f8121389 1 parent 79957d7
@simonmar simonmar authored
Showing with 1,208 additions and 825 deletions.
  1. +2 −0  compiler/cmm/CLabel.hs
  2. +0 −1  compiler/codeGen/CgCallConv.hs
  3. +19 −7 compiler/codeGen/CgClosure.lhs
  4. +0 −1  compiler/codeGen/CgMonad.lhs
  5. +23 −5 compiler/codeGen/CgStackery.lhs
  6. +8 −0 includes/Rts.h
  7. +12 −0 includes/mkDerivedConstants.c
  8. +7 −1 includes/rts/storage/ClosureMacros.h
  9. +1 −1  includes/rts/storage/ClosureTypes.h
  10. +16 −1 includes/rts/storage/Closures.h
  11. +25 −1 includes/rts/storage/TSO.h
  12. +13 −7 includes/stg/MiscClosures.h
  13. +2 −61 rts/Capability.c
  14. +11 −8 rts/Capability.h
  15. +1 −1  rts/ClosureFlags.c
  16. +0 −2  rts/FrontPanel.c
  17. +19 −17 rts/HeapStackCheck.cmm
  18. +3 −1 rts/Interpreter.c
  19. +1 −1  rts/LdvProfile.c
  20. +4 −0 rts/Linker.c
  21. +296 −0 rts/Messages.c
  22. +18 −0 rts/Messages.h
  23. +0 −32 rts/PrimOps.cmm
  24. +7 −9 rts/Printer.c
  25. +1 −1  rts/ProfHeap.c
  26. +69 −106 rts/RaiseAsync.c
  27. +2 −7 rts/RetainerProfile.c
  28. +1 −1  rts/STM.c
  29. +30 −213 rts/Schedule.c
  30. +0 −9 rts/Schedule.h
  31. +88 −73 rts/StgMiscClosures.cmm
  32. +38 −28 rts/ThreadPaused.c
  33. +205 −9 rts/Threads.c
  34. +18 −3 rts/Threads.h
  35. +0 −2  rts/Timer.c
  36. +66 −34 rts/Updates.cmm
  37. +6 −17 rts/Updates.h
  38. +3 −2 rts/posix/OSMem.c
  39. +1 −4 rts/sm/Compact.c
  40. +68 −26 rts/sm/Evac.c
  41. +0 −7 rts/sm/GC.c
  42. +0 −58 rts/sm/MarkWeak.c
  43. +0 −1  rts/sm/MarkWeak.h
  44. +23 −4 rts/sm/Sanity.c
  45. +91 −44 rts/sm/Scav.c
  46. +9 −18 rts/sm/Storage.c
  47. +1 −1  utils/genapply/GenApply.hs
View
2  compiler/cmm/CLabel.hs
@@ -58,6 +58,7 @@ module CLabel (
mkSplitMarkerLabel,
mkDirty_MUT_VAR_Label,
mkUpdInfoLabel,
+ mkBHUpdInfoLabel,
mkIndStaticInfoLabel,
mkMainCapabilityLabel,
mkMAP_FROZEN_infoLabel,
@@ -400,6 +401,7 @@ mkStaticConEntryLabel name c = IdLabel name c StaticConEntry
mkSplitMarkerLabel = CmmLabel rtsPackageId (fsLit "__stg_split_marker") CmmCode
mkDirty_MUT_VAR_Label = CmmLabel rtsPackageId (fsLit "dirty_MUT_VAR") CmmCode
mkUpdInfoLabel = CmmLabel rtsPackageId (fsLit "stg_upd_frame") CmmInfo
+mkBHUpdInfoLabel = CmmLabel rtsPackageId (fsLit "stg_bh_upd_frame" ) CmmInfo
mkIndStaticInfoLabel = CmmLabel rtsPackageId (fsLit "stg_IND_STATIC") CmmInfo
mkMainCapabilityLabel = CmmLabel rtsPackageId (fsLit "MainCapability") CmmData
mkMAP_FROZEN_infoLabel = CmmLabel rtsPackageId (fsLit "stg_MUT_ARR_PTRS_FROZEN0") CmmInfo
View
1  compiler/codeGen/CgCallConv.hs
@@ -284,7 +284,6 @@ getSequelAmode
OnStack -> do { sp_rel <- getSpRelOffset virt_sp
; returnFC (CmmLoad sp_rel bWord) }
- UpdateCode -> returnFC (CmmLit (CmmLabel mkUpdInfoLabel))
CaseAlts lbl _ _ -> returnFC (CmmLit (CmmLabel lbl))
}
View
26 compiler/codeGen/CgClosure.lhs
@@ -474,7 +474,12 @@ emitBlackHoleCode is_single_entry = do
then do
tickyBlackHole (not is_single_entry)
let bh_info = CmmReg (CmmGlobal EagerBlackholeInfo)
- stmtC (CmmStore (CmmReg nodeReg) bh_info)
+ stmtsC [
+ CmmStore (cmmOffsetW (CmmReg nodeReg) fixedHdrSize)
+ (CmmReg (CmmGlobal CurrentTSO)),
+ CmmCall (CmmPrim MO_WriteBarrier) [] [] CmmUnsafe CmmMayReturn,
+ CmmStore (CmmReg nodeReg) bh_info
+ ]
else
nopC
\end{code}
@@ -489,17 +494,23 @@ setupUpdate closure_info code
= code
| not (isStaticClosure closure_info)
- = if closureUpdReqd closure_info
- then do { tickyPushUpdateFrame; pushUpdateFrame (CmmReg nodeReg) code }
- else do { tickyUpdateFrameOmitted; code }
-
+ = do
+ if not (closureUpdReqd closure_info)
+ then do tickyUpdateFrameOmitted; code
+ else do
+ tickyPushUpdateFrame
+ dflags <- getDynFlags
+ if not opt_SccProfilingOn && dopt Opt_EagerBlackHoling dflags
+ then pushBHUpdateFrame (CmmReg nodeReg) code
+ else pushUpdateFrame (CmmReg nodeReg) code
+
| otherwise -- A static closure
= do { tickyUpdateBhCaf closure_info
; if closureUpdReqd closure_info
then do -- Blackhole the (updatable) CAF:
{ upd_closure <- link_caf closure_info True
- ; pushUpdateFrame upd_closure code }
+ ; pushBHUpdateFrame upd_closure code }
else do
{ -- krc: removed some ticky-related code here.
; tickyUpdateFrameOmitted
@@ -553,7 +564,8 @@ link_caf cl_info _is_upd = do
{ -- Alloc black hole specifying CC_HDR(Node) as the cost centre
; let use_cc = costCentreFrom (CmmReg nodeReg)
blame_cc = use_cc
- ; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc []
+ tso = CmmReg (CmmGlobal CurrentTSO)
+ ; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc [(tso,fixedHdrSize)]
; hp_rel <- getHpRelOffset hp_offset
-- Call the RTS function newCAF to add the CAF to the CafList
View
1  compiler/codeGen/CgMonad.lhs
@@ -169,7 +169,6 @@ block.
\begin{code}
data Sequel
= OnStack -- Continuation is on the stack
- | UpdateCode -- Continuation is update
| CaseAlts
CLabel -- Jump to this; if the continuation is for a vectored
View
28 compiler/codeGen/CgStackery.lhs
@@ -17,7 +17,7 @@ module CgStackery (
setStackFrame, getStackFrame,
mkVirtStkOffsets, mkStkAmodes,
freeStackSlots,
- pushUpdateFrame, emitPushUpdateFrame,
+ pushUpdateFrame, pushBHUpdateFrame, emitPushUpdateFrame,
) where
#include "HsVersions.h"
@@ -265,6 +265,14 @@ to reflect the frame pushed.
\begin{code}
pushUpdateFrame :: CmmExpr -> Code -> Code
pushUpdateFrame updatee code
+ = pushSpecUpdateFrame mkUpdInfoLabel updatee code
+
+pushBHUpdateFrame :: CmmExpr -> Code -> Code
+pushBHUpdateFrame updatee code
+ = pushSpecUpdateFrame mkBHUpdInfoLabel updatee code
+
+pushSpecUpdateFrame :: CLabel -> CmmExpr -> Code -> Code
+pushSpecUpdateFrame lbl updatee code
= do {
when debugIsOn $ do
{ EndOfBlockInfo _ sequel <- getEndOfBlockInfo ;
@@ -277,15 +285,25 @@ pushUpdateFrame updatee code
-- The location of the lowest-address
-- word of the update frame itself
- ; setEndOfBlockInfo (EndOfBlockInfo vsp UpdateCode) $
- do { emitPushUpdateFrame frame_addr updatee
+ -- NB. we used to set the Sequel to 'UpdateCode' so
+ -- that we could jump directly to the update code if
+ -- we know that the next frame on the stack is an
+ -- update frame. However, the RTS can sometimes
+ -- change an update frame into something else (see
+ -- e.g. Note [upd-black-hole] in rts/sm/Scav.c), so we
+ -- no longer make this assumption.
+ ; setEndOfBlockInfo (EndOfBlockInfo vsp OnStack) $
+ do { emitSpecPushUpdateFrame lbl frame_addr updatee
; code }
}
emitPushUpdateFrame :: CmmExpr -> CmmExpr -> Code
-emitPushUpdateFrame frame_addr updatee = do
+emitPushUpdateFrame = emitSpecPushUpdateFrame mkUpdInfoLabel
+
+emitSpecPushUpdateFrame :: CLabel -> CmmExpr -> CmmExpr -> Code
+emitSpecPushUpdateFrame lbl frame_addr updatee = do
stmtsC [ -- Set the info word
- CmmStore frame_addr (mkLblExpr mkUpdInfoLabel)
+ CmmStore frame_addr (mkLblExpr lbl)
, -- And the updatee
CmmStore (cmmOffsetB frame_addr off_updatee) updatee ]
initUpdFrameProf frame_addr
View
8 includes/Rts.h
@@ -106,10 +106,18 @@ void _assertFail(const char *filename, unsigned int linenum)
else \
_assertFail(__FILE__, __LINE__)
+#define CHECKM(predicate, msg, ...) \
+ if (predicate) \
+ /*null*/; \
+ else \
+ barf(msg, ##__VA_ARGS__)
+
#ifndef DEBUG
#define ASSERT(predicate) /* nothing */
+#define ASSERTM(predicate,msg,...) /* nothing */
#else
#define ASSERT(predicate) CHECK(predicate)
+#define ASSERTM(predicate,msg,...) CHECKM(predicate,msg,##__VA_ARGS__)
#endif /* DEBUG */
/*
View
12 includes/mkDerivedConstants.c
@@ -291,6 +291,7 @@ main(int argc, char *argv[])
closure_field(StgTSO, trec);
closure_field(StgTSO, flags);
closure_field(StgTSO, dirty);
+ closure_field(StgTSO, bq);
closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS);
tso_field(StgTSO, sp);
tso_field_offset(StgTSO, stack);
@@ -382,6 +383,17 @@ main(int argc, char *argv[])
closure_size(StgStableName);
closure_field(StgStableName,sn);
+ closure_size(StgBlockingQueue);
+ closure_field(StgBlockingQueue, bh);
+ closure_field(StgBlockingQueue, owner);
+ closure_field(StgBlockingQueue, queue);
+ closure_field(StgBlockingQueue, link);
+
+ closure_size(MessageBlackHole);
+ closure_field(MessageBlackHole, link);
+ closure_field(MessageBlackHole, tso);
+ closure_field(MessageBlackHole, bh);
+
struct_field_("RtsFlags_ProfFlags_showCCSOnException",
RTS_FLAGS, ProfFlags.showCCSOnException);
struct_field_("RtsFlags_DebugFlags_apply",
View
8 includes/rts/storage/ClosureMacros.h
@@ -129,6 +129,12 @@
SET_HDR(c,info,costCentreStack); \
(c)->words = n_words;
+// Use when changing a closure from one kind to another
+#define OVERWRITE_INFO(c, new_info) \
+ LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)(c)); \
+ SET_INFO((c), (new_info)); \
+ LDV_RECORD_CREATE(c);
+
/* -----------------------------------------------------------------------------
How to get hold of the static link field for a static closure.
-------------------------------------------------------------------------- */
@@ -249,7 +255,7 @@ INLINE_HEADER StgOffset THUNK_SELECTOR_sizeW ( void )
{ return sizeofW(StgSelector); }
INLINE_HEADER StgOffset BLACKHOLE_sizeW ( void )
-{ return sizeofW(StgHeader)+MIN_PAYLOAD_SIZE; }
+{ return sizeofW(StgInd); } // a BLACKHOLE is a kind of indirection
/* --------------------------------------------------------------------------
Sizes of closures
View
2  includes/rts/storage/ClosureTypes.h
@@ -62,7 +62,7 @@
#define UPDATE_FRAME 38
#define CATCH_FRAME 39
#define STOP_FRAME 40
-#define CAF_BLACKHOLE 41
+#define BLOCKING_QUEUE 41
#define BLACKHOLE 42
#define MVAR_CLEAN 43
#define MVAR_DIRTY 44
View
17 includes/rts/storage/Closures.h
@@ -127,6 +127,14 @@ typedef struct {
StgInfoTable *saved_info;
} StgIndStatic;
+typedef struct StgBlockingQueue_ {
+ StgHeader header;
+ struct StgBlockingQueue_ *link; // here so it looks like an IND
+ StgClosure *bh; // the BLACKHOLE
+ StgTSO *owner;
+ struct MessageBlackHole_ *queue;
+} StgBlockingQueue;
+
typedef struct {
StgHeader header;
StgWord words;
@@ -433,10 +441,17 @@ typedef struct MessageWakeup_ {
typedef struct MessageThrowTo_ {
StgHeader header;
- Message *link;
+ struct MessageThrowTo_ *link;
StgTSO *source;
StgTSO *target;
StgClosure *exception;
} MessageThrowTo;
+typedef struct MessageBlackHole_ {
+ StgHeader header;
+ struct MessageBlackHole_ *link;
+ StgTSO *tso;
+ StgClosure *bh;
+} MessageBlackHole;
+
#endif /* RTS_STORAGE_CLOSURES_H */
View
26 includes/rts/storage/TSO.h
@@ -46,6 +46,7 @@ typedef struct {
/* Reason for thread being blocked. See comment above struct StgTso_. */
typedef union {
StgClosure *closure;
+ struct MessageBlackHole_ *bh;
struct MessageThrowTo_ *throwto;
struct MessageWakeup_ *wakeup;
StgInt fd; /* StgInt instead of int, so that it's the same size as the ptrs */
@@ -78,12 +79,17 @@ typedef struct StgTSO_ {
*/
struct StgTSO_* _link;
/*
+ Currently used for linking TSOs on:
+ * cap->run_queue_{hd,tl}
+ * MVAR queue
+ * (non-THREADED_RTS); the blocked_queue
+ * and pointing to the relocated version of a ThreadRelocated
+
NOTE!!! do not modify _link directly, it is subject to
a write barrier for generational GC. Instead use the
setTSOLink() function. Exceptions to this rule are:
* setting the link field to END_TSO_QUEUE
- * putting a TSO on the blackhole_queue
* setting the link field of the currently running TSO, as it
will already be dirty.
*/
@@ -127,6 +133,12 @@ typedef struct StgTSO_ {
*/
struct MessageThrowTo_ * blocked_exceptions;
+ /*
+ A list of StgBlockingQueue objects, representing threads blocked
+ on thunks that are under evaluation by this thread.
+ */
+ struct StgBlockingQueue_ *bq;
+
#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
#endif
@@ -152,6 +164,18 @@ typedef struct StgTSO_ {
void dirty_TSO (Capability *cap, StgTSO *tso);
void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
+// Apply to a TSO before looking at it if you are not sure whether it
+// might be ThreadRelocated or not (basically, that's most of the time
+// unless the TSO is the current TSO).
+//
+INLINE_HEADER StgTSO * deRefTSO(StgTSO *tso)
+{
+ while (tso->what_next == ThreadRelocated) {
+ tso = tso->_link;
+ }
+ return tso;
+}
+
/* -----------------------------------------------------------------------------
Invariants:
View
20 includes/stg/MiscClosures.h
@@ -44,6 +44,7 @@
/* Stack frames */
RTS_RET_INFO(stg_upd_frame_info);
+RTS_RET_INFO(stg_bh_upd_frame_info);
RTS_RET_INFO(stg_marked_upd_frame_info);
RTS_RET_INFO(stg_noupd_frame_info);
RTS_RET_INFO(stg_catch_frame_info);
@@ -54,6 +55,7 @@ RTS_RET_INFO(stg_catch_stm_frame_info);
RTS_RET_INFO(stg_unblockAsyncExceptionszh_ret_info);
RTS_ENTRY(stg_upd_frame_ret);
+RTS_ENTRY(stg_bh_upd_frame_ret);
RTS_ENTRY(stg_marked_upd_frame_ret);
// RTS_FUN(stg_interp_constr_entry);
@@ -90,12 +92,12 @@ RTS_INFO(stg_IND_STATIC_info);
RTS_INFO(stg_IND_PERM_info);
RTS_INFO(stg_IND_OLDGEN_info);
RTS_INFO(stg_IND_OLDGEN_PERM_info);
-RTS_INFO(stg_CAF_UNENTERED_info);
-RTS_INFO(stg_CAF_ENTERED_info);
-RTS_INFO(stg_WHITEHOLE_info);
RTS_INFO(stg_BLACKHOLE_info);
-RTS_INFO(__stg_EAGER_BLACKHOLE_info);
RTS_INFO(stg_CAF_BLACKHOLE_info);
+RTS_INFO(__stg_EAGER_BLACKHOLE_info);
+RTS_INFO(stg_WHITEHOLE_info);
+RTS_INFO(stg_BLOCKING_QUEUE_CLEAN_info);
+RTS_INFO(stg_BLOCKING_QUEUE_DIRTY_info);
RTS_FUN_INFO(stg_BCO_info);
RTS_INFO(stg_EVACUATED_info);
@@ -115,7 +117,9 @@ RTS_INFO(stg_MUT_VAR_CLEAN_info);
RTS_INFO(stg_MUT_VAR_DIRTY_info);
RTS_INFO(stg_END_TSO_QUEUE_info);
RTS_INFO(stg_MSG_WAKEUP_info);
+RTS_INFO(stg_MSG_TRY_WAKEUP_info);
RTS_INFO(stg_MSG_THROWTO_info);
+RTS_INFO(stg_MSG_BLACKHOLE_info);
RTS_INFO(stg_MUT_CONS_info);
RTS_INFO(stg_catch_info);
RTS_INFO(stg_PAP_info);
@@ -142,12 +146,10 @@ RTS_ENTRY(stg_IND_STATIC_entry);
RTS_ENTRY(stg_IND_PERM_entry);
RTS_ENTRY(stg_IND_OLDGEN_entry);
RTS_ENTRY(stg_IND_OLDGEN_PERM_entry);
-RTS_ENTRY(stg_CAF_UNENTERED_entry);
-RTS_ENTRY(stg_CAF_ENTERED_entry);
RTS_ENTRY(stg_WHITEHOLE_entry);
RTS_ENTRY(stg_BLACKHOLE_entry);
-RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry);
RTS_ENTRY(stg_CAF_BLACKHOLE_entry);
+RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry);
RTS_ENTRY(stg_BCO_entry);
RTS_ENTRY(stg_EVACUATED_entry);
RTS_ENTRY(stg_WEAK_entry);
@@ -166,7 +168,9 @@ RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
RTS_ENTRY(stg_END_TSO_QUEUE_entry);
RTS_ENTRY(stg_MSG_WAKEUP_entry);
+RTS_ENTRY(stg_MSG_TRY_WAKEUP_entry);
RTS_ENTRY(stg_MSG_THROWTO_entry);
+RTS_ENTRY(stg_MSG_BLACKHOLE_entry);
RTS_ENTRY(stg_MUT_CONS_entry);
RTS_ENTRY(stg_catch_entry);
RTS_ENTRY(stg_PAP_entry);
@@ -404,6 +408,8 @@ RTS_FUN(stg_PAP_apply);
RTS_RET_INFO(stg_enter_info);
RTS_ENTRY(stg_enter_ret);
+RTS_RET_INFO(stg_enter_checkbh_info);
+RTS_ENTRY(stg_enter_checkbh_ret);
RTS_RET_INFO(stg_gc_void_info);
RTS_ENTRY(stg_gc_void_ret);
View
63 rts/Capability.c
@@ -62,9 +62,8 @@ Capability * rts_unsafeGetMyCapability (void)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
- return blackholes_need_checking
- || sched_state >= SCHED_INTERRUPTING
- ;
+ return sched_state >= SCHED_INTERRUPTING
+ || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif
@@ -637,43 +636,6 @@ yieldCapability (Capability** pCap, Task *task)
}
/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-void
-wakeupThreadOnCapability (Capability *cap,
- Capability *other_cap,
- StgTSO *tso)
-{
- MessageWakeup *msg;
-
- // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
- if (tso->bound) {
- ASSERT(tso->bound->task->cap == tso->cap);
- tso->bound->task->cap = other_cap;
- }
- tso->cap = other_cap;
-
- ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
- tso->block_info.closure->header.info == &stg_IND_info);
-
- ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
- msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
- msg->header.info = &stg_MSG_WAKEUP_info;
- msg->tso = tso;
- tso->block_info.closure = (StgClosure *)msg;
- dirty_TSO(cap, tso);
- write_barrier();
- tso->why_blocked = BlockedOnMsgWakeup;
-
- sendMessage(other_cap, (Message*)msg);
-}
-
-/* ----------------------------------------------------------------------------
* prodCapability
*
* If a Capability is currently idle, wake up a Task on it. Used to
@@ -906,24 +868,3 @@ markCapabilities (evac_fn evac, void *user)
Messages
-------------------------------------------------------------------------- */
-#ifdef THREADED_RTS
-
-void sendMessage(Capability *cap, Message *msg)
-{
- ACQUIRE_LOCK(&cap->lock);
-
- msg->link = cap->inbox;
- cap->inbox = msg;
-
- if (cap->running_task == NULL) {
- cap->running_task = myTask();
- // precond for releaseCapability_()
- releaseCapability_(cap,rtsFalse);
- } else {
- contextSwitchCapability(cap);
- }
-
- RELEASE_LOCK(&cap->lock);
-}
-
-#endif // THREADED_RTS
View
19 rts/Capability.h
@@ -201,6 +201,8 @@ void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
+INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p);
+
#if defined(THREADED_RTS)
// Gives up the current capability IFF there is a higher-priority
@@ -222,12 +224,6 @@ void yieldCapability (Capability** pCap, Task *task);
//
void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
-// Wakes up a thread on a Capability (probably a different Capability
-// from the one held by the current Task).
-//
-void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
- StgTSO *tso);
-
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
//
@@ -289,8 +285,6 @@ void traverseSparkQueues (evac_fn evac, void *user);
INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
-void sendMessage (Capability *cap, Message *msg);
-
#endif // THREADED_RTS
/* -----------------------------------------------------------------------------
@@ -316,6 +310,15 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
*bd->free++ = (StgWord)p;
}
+INLINE_HEADER void
+recordClosureMutated (Capability *cap, StgClosure *p)
+{
+ bdescr *bd;
+ bd = Bdescr((StgPtr)p);
+ if (bd->gen_no != 0) recordMutableCap(p,cap,bd->gen_no);
+}
+
+
#if defined(THREADED_RTS)
INLINE_HEADER rtsBool
emptySparkPoolCap (Capability *cap)
View
2  rts/ClosureFlags.c
@@ -62,8 +62,8 @@ StgWord16 closure_flags[] = {
[UPDATE_FRAME] = ( _BTM ),
[CATCH_FRAME] = ( _BTM ),
[STOP_FRAME] = ( _BTM ),
- [CAF_BLACKHOLE] = ( _BTM|_NS| _UPT ),
[BLACKHOLE] = ( _NS| _UPT ),
+ [BLOCKING_QUEUE] = ( _NS| _MUT|_UPT ),
[MVAR_CLEAN] = (_HNF| _NS| _MUT|_UPT ),
[MVAR_DIRTY] = (_HNF| _NS| _MUT|_UPT ),
[ARR_WORDS] = (_HNF| _NS| _UPT ),
View
2  rts/FrontPanel.c
@@ -662,8 +662,6 @@ residencyCensus( void )
type = Thunk;
break;
- case CAF_BLACKHOLE:
- case EAGER_BLACKHOLE:
case BLACKHOLE:
/* case BLACKHOLE_BQ: FIXME: case does not exist */
size = sizeW_fromITBL(info);
View
36 rts/HeapStackCheck.cmm
@@ -159,6 +159,24 @@ __stg_gc_enter_1
}
/* -----------------------------------------------------------------------------
+ stg_enter_checkbh is just like stg_enter, except that we also call
+ checkBlockingQueues(). The point of this is that the GC can
+ replace an stg_marked_upd_frame with an stg_enter_checkbh if it
+ finds that the BLACKHOLE has already been updated by another
+ thread. It would be unsafe to use stg_enter, because there might
+ be an orphaned BLOCKING_QUEUE now.
+ -------------------------------------------------------------------------- */
+
+INFO_TABLE_RET( stg_enter_checkbh, RET_SMALL, P_ unused)
+{
+ R1 = Sp(1);
+ Sp_adj(2);
+ foreign "C" checkBlockingQueues(MyCapability() "ptr",
+ CurrentTSO) [R1];
+ ENTER();
+}
+
+/* -----------------------------------------------------------------------------
Heap checks in Primitive case alternatives
A primitive case alternative is entered with a value either in
@@ -593,11 +611,7 @@ INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, P_ unused1, P_ unused2 )
// code fragment executed just before we return to the scheduler
stg_block_putmvar_finally
{
-#ifdef THREADED_RTS
unlockClosure(R3, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(R3, stg_MVAR_DIRTY_info);
-#endif
jump StgReturn;
}
@@ -611,24 +625,12 @@ stg_block_putmvar
BLOCK_BUT_FIRST(stg_block_putmvar_finally);
}
-// code fragment executed just before we return to the scheduler
-stg_block_blackhole_finally
-{
-#if defined(THREADED_RTS)
- // The last thing we do is release sched_lock, which is
- // preventing other threads from accessing blackhole_queue and
- // picking up this thread before we are finished with it.
- RELEASE_LOCK(sched_mutex "ptr");
-#endif
- jump StgReturn;
-}
-
stg_block_blackhole
{
Sp_adj(-2);
Sp(1) = R1;
Sp(0) = stg_enter_info;
- BLOCK_BUT_FIRST(stg_block_blackhole_finally);
+ BLOCK_GENERIC;
}
INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused )
View
4 rts/Interpreter.c
@@ -21,6 +21,7 @@
#include "Disassembler.h"
#include "Interpreter.h"
#include "ThreadPaused.h"
+#include "Threads.h"
#include <string.h> /* for memcpy */
#ifdef HAVE_ERRNO_H
@@ -443,7 +444,8 @@ interpretBCO (Capability* cap)
// to a PAP by the GC, violating the invariant that PAPs
// always contain a tagged pointer to the function.
INTERP_TICK(it_retto_UPDATE);
- UPD_IND(cap, ((StgUpdateFrame *)Sp)->updatee, tagged_obj);
+ updateThunk(cap, cap->r.rCurrentTSO,
+ ((StgUpdateFrame *)Sp)->updatee, tagged_obj);
Sp += sizeofW(StgUpdateFrame);
goto do_return;
View
2  rts/LdvProfile.c
@@ -140,7 +140,7 @@ processHeapClosureForDead( StgClosure *c )
case FUN_1_1:
case FUN_0_2:
case BLACKHOLE:
- case CAF_BLACKHOLE:
+ case BLOCKING_QUEUE:
case IND_PERM:
case IND_OLDGEN_PERM:
/*
View
4 rts/Linker.c
@@ -877,7 +877,10 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stable_ptr_table) \
SymI_HasProto(stackOverflow) \
SymI_HasProto(stg_CAF_BLACKHOLE_info) \
+ SymI_HasProto(stg_BLACKHOLE_info) \
SymI_HasProto(__stg_EAGER_BLACKHOLE_info) \
+ SymI_HasProto(stg_BLOCKING_QUEUE_CLEAN_info) \
+ SymI_HasProto(stg_BLOCKING_QUEUE_DIRTY_info) \
SymI_HasProto(startTimer) \
SymI_HasProto(stg_MVAR_CLEAN_info) \
SymI_HasProto(stg_MVAR_DIRTY_info) \
@@ -941,6 +944,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_sel_8_upd_info) \
SymI_HasProto(stg_sel_9_upd_info) \
SymI_HasProto(stg_upd_frame_info) \
+ SymI_HasProto(stg_bh_upd_frame_info) \
SymI_HasProto(suspendThread) \
SymI_HasProto(stg_takeMVarzh) \
SymI_HasProto(stg_threadStatuszh) \
View
296 rts/Messages.c
@@ -0,0 +1,296 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2010
+ *
+ * Inter-Capability message passing
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "Rts.h"
+#include "Messages.h"
+#include "Trace.h"
+#include "Capability.h"
+#include "Schedule.h"
+#include "Threads.h"
+#include "RaiseAsync.h"
+#include "sm/Storage.h"
+
+/* ----------------------------------------------------------------------------
+ Send a message to another Capability
+ ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
+{
+ ACQUIRE_LOCK(&to_cap->lock);
+
+#ifdef DEBUG
+ {
+ const StgInfoTable *i = msg->header.info;
+ if (i != &stg_MSG_WAKEUP_info &&
+ i != &stg_MSG_THROWTO_info &&
+ i != &stg_MSG_BLACKHOLE_info &&
+ i != &stg_MSG_TRY_WAKEUP_info &&
+ i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
+ i != &stg_WHITEHOLE_info) {
+ barf("sendMessage: %p", i);
+ }
+ }
+#endif
+
+ msg->link = to_cap->inbox;
+ to_cap->inbox = msg;
+
+ recordClosureMutated(from_cap,(StgClosure*)msg);
+
+ if (to_cap->running_task == NULL) {
+ to_cap->running_task = myTask();
+ // precond for releaseCapability_()
+ releaseCapability_(to_cap,rtsFalse);
+ } else {
+ contextSwitchCapability(to_cap);
+ }
+
+ RELEASE_LOCK(&to_cap->lock);
+}
+
+#endif /* THREADED_RTS */
+
+/* ----------------------------------------------------------------------------
+ Handle a message
+ ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void
+executeMessage (Capability *cap, Message *m)
+{
+ const StgInfoTable *i;
+
+loop:
+ write_barrier(); // allow m->header to be modified by another thread
+ i = m->header.info;
+ if (i == &stg_MSG_WAKEUP_info)
+ {
+ // the plan is to eventually get rid of these and use
+ // TRY_WAKEUP instead.
+ MessageWakeup *w = (MessageWakeup *)m;
+ StgTSO *tso = w->tso;
+ debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
+ (lnat)tso->id);
+ ASSERT(tso->cap == cap);
+ ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
+ ASSERT(tso->block_info.closure == (StgClosure *)m);
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap, tso);
+ }
+ else if (i == &stg_MSG_TRY_WAKEUP_info)
+ {
+ StgTSO *tso = ((MessageWakeup *)m)->tso;
+ debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
+ (lnat)tso->id);
+ tryWakeupThread(cap, tso);
+ }
+ else if (i == &stg_MSG_THROWTO_info)
+ {
+ MessageThrowTo *t = (MessageThrowTo *)m;
+ nat r;
+ const StgInfoTable *i;
+
+ i = lockClosure((StgClosure*)m);
+ if (i != &stg_MSG_THROWTO_info) {
+ unlockClosure((StgClosure*)m, i);
+ goto loop;
+ }
+
+ debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
+ (lnat)t->source->id, (lnat)t->target->id);
+
+ ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
+ ASSERT(t->source->block_info.closure == (StgClosure *)m);
+
+ r = throwToMsg(cap, t);
+
+ switch (r) {
+ case THROWTO_SUCCESS:
+ ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
+ t->source->sp += 3;
+ unblockOne(cap, t->source);
+ // this message is done
+ unlockClosure((StgClosure*)m, &stg_IND_info);
+ break;
+ case THROWTO_BLOCKED:
+ // unlock the message
+ unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
+ break;
+ }
+ }
+ else if (i == &stg_MSG_BLACKHOLE_info)
+ {
+ nat r;
+ MessageBlackHole *b = (MessageBlackHole*)m;
+
+ r = messageBlackHole(cap, b);
+ if (r == 0) {
+ tryWakeupThread(cap, b->tso);
+ }
+ return;
+ }
+ else if (i == &stg_IND_info)
+ {
+ // message was revoked
+ return;
+ }
+ else if (i == &stg_WHITEHOLE_info)
+ {
+ goto loop;
+ }
+ else
+ {
+ barf("executeMessage: %p", i);
+ }
+}
+
+#endif
+
+/* ----------------------------------------------------------------------------
+ Handle a MSG_BLACKHOLE message
+
+ This is called from two places: either we just entered a BLACKHOLE
+ (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
+ cap->inbox.
+
+ We need to establish whether the BLACKHOLE belongs to
+ this Capability, and
+ - if so, arrange to block the current thread on it
+ - otherwise, forward the message to the right place
+
+ Returns:
+ - 0 if the blocked thread can be woken up by the caller
+ - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
+ at some point in the future.
+
+ ------------------------------------------------------------------------- */
+
+nat messageBlackHole(Capability *cap, MessageBlackHole *msg)
+{
+ const StgInfoTable *info;
+ StgClosure *p;
+ StgBlockingQueue *bq;
+ StgClosure *bh = msg->bh;
+ StgTSO *owner;
+
+ debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p",
+ (lnat)msg->tso->id, msg->bh);
+
+ info = bh->header.info;
+
+ // If we got this message in our inbox, it might be that the
+ // BLACKHOLE has already been updated, and GC has shorted out the
+ // indirection, so the pointer no longer points to a BLACKHOLE at
+ // all.
+ if (info != &stg_BLACKHOLE_info &&
+ info != &stg_CAF_BLACKHOLE_info &&
+ info != &stg_WHITEHOLE_info) {
+ // if it is a WHITEHOLE, then a thread is in the process of
+ // trying to BLACKHOLE it. But we know that it was once a
+ // BLACKHOLE, so there is at least a valid pointer in the
+ // payload, so we can carry on.
+ return 0;
+ }
+
+ // we know at this point that the closure
+loop:
+ p = ((StgInd*)bh)->indirectee;
+ info = p->header.info;
+
+ if (info == &stg_IND_info)
+ {
+ // This could happen, if e.g. we got a BLOCKING_QUEUE that has
+ // just been replaced with an IND by another thread in
+ // updateThunk(). In which case, if we read the indirectee
+ // again we should get the value.
+ goto loop;
+ }
+
+ else if (info == &stg_TSO_info)
+ {
+ owner = deRefTSO((StgTSO *)p);
+
+#ifdef THREADED_RTS
+ if (owner->cap != cap) {
+ sendMessage(cap, owner->cap, (Message*)msg);
+ debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
+ return 1;
+ }
+#endif
+ // owner is the owner of the BLACKHOLE, and resides on this
+ // Capability. msg->tso is the first thread to block on this
+ // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
+
+ bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
+
+ // initialise the BLOCKING_QUEUE object
+ SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
+ bq->bh = bh;
+ bq->queue = msg;
+ bq->owner = owner;
+
+ msg->link = (MessageBlackHole*)END_TSO_QUEUE;
+
+ // All BLOCKING_QUEUES are linked in a list on owner->bq, so
+ // that we can search through them in the event that there is
+ // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
+ // becomes orphaned (see updateThunk()).
+ bq->link = owner->bq;
+ owner->bq = bq;
+ dirty_TSO(cap, owner); // we modified owner->bq
+
+ // point to the BLOCKING_QUEUE from the BLACKHOLE
+ write_barrier(); // make the BQ visible
+ ((StgInd*)bh)->indirectee = (StgClosure *)bq;
+ recordClosureMutated(cap,bh); // bh was mutated
+
+ debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
+ (lnat)msg->tso->id, (lnat)owner->id);
+
+ return 1; // blocked
+ }
+ else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
+ info == &stg_BLOCKING_QUEUE_DIRTY_info)
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ ASSERT(bq->bh == bh);
+
+ owner = deRefTSO(bq->owner);
+
+ ASSERT(owner != END_TSO_QUEUE);
+
+#ifdef THREADED_RTS
+ if (owner->cap != cap) {
+ sendMessage(cap, owner->cap, (Message*)msg);
+ debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
+ return 1;
+ }
+#endif
+
+ msg->link = bq->queue;
+ bq->queue = msg;
+ recordClosureMutated(cap,(StgClosure*)msg);
+
+ if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
+ bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ recordClosureMutated(cap,bq);
+ }
+
+ debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
+ (lnat)msg->tso->id, (lnat)owner->id);
+
+ return 1; // blocked
+ }
+
+ return 0; // not blocked
+}
+
View
18 rts/Messages.h
@@ -0,0 +1,18 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2010
+ *
+ * Inter-Capability message passing
+ *
+ * --------------------------------------------------------------------------*/
+
+BEGIN_RTS_PRIVATE
+
+nat messageBlackHole(Capability *cap, MessageBlackHole *msg);
+
+#ifdef THREADED_RTS
+void executeMessage (Capability *cap, Message *m);
+void sendMessage (Capability *from_cap, Capability *to_cap, Message *msg);
+#endif
+
+END_RTS_PRIVATE
View
32 rts/PrimOps.cmm
@@ -1204,11 +1204,7 @@ stg_takeMVarzh
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
RET_P(val);
}
else
@@ -1216,11 +1212,7 @@ stg_takeMVarzh
/* No further putMVars, MVar is now empty */
StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
RET_P(val);
}
@@ -1279,21 +1271,13 @@ stg_tryTakeMVarzh
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
else
{
/* No further putMVars, MVar is now empty */
StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
RET_NP(1, val);
@@ -1360,11 +1344,7 @@ stg_putMVarzh
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
jump %ENTRY_CODE(Sp(0));
}
else
@@ -1372,11 +1352,7 @@ stg_putMVarzh
/* No further takes, the MVar is now full. */
StgMVar_value(mvar) = val;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
jump %ENTRY_CODE(Sp(0));
}
@@ -1429,22 +1405,14 @@ stg_tryPutMVarzh
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
else
{
/* No further takes, the MVar is now full. */
StgMVar_value(mvar) = R2;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
RET_N(1);
View
16 rts/Printer.c
@@ -257,6 +257,12 @@ printClosure( StgClosure *obj )
debugBelch(")\n");
break;
+ case BLACKHOLE:
+ debugBelch("BLACKHOLE(");
+ printPtr((StgPtr)((StgInd*)obj)->indirectee);
+ debugBelch(")\n");
+ break;
+
/* Cannot happen -- use default case.
case RET_BCO:
case RET_SMALL:
@@ -296,14 +302,6 @@ printClosure( StgClosure *obj )
break;
}
- case CAF_BLACKHOLE:
- debugBelch("CAF_BH");
- break;
-
- case BLACKHOLE:
- debugBelch("BH\n");
- break;
-
case ARR_WORDS:
{
StgWord i;
@@ -1122,8 +1120,8 @@ char *closure_type_names[] = {
[UPDATE_FRAME] = "UPDATE_FRAME",
[CATCH_FRAME] = "CATCH_FRAME",
[STOP_FRAME] = "STOP_FRAME",
- [CAF_BLACKHOLE] = "CAF_BLACKHOLE",
[BLACKHOLE] = "BLACKHOLE",
+ [BLOCKING_QUEUE] = "BLOCKING_QUEUE",
[MVAR_CLEAN] = "MVAR_CLEAN",
[MVAR_DIRTY] = "MVAR_DIRTY",
[ARR_WORDS] = "ARR_WORDS",
View
2  rts/ProfHeap.c
@@ -878,8 +878,8 @@ heapCensusChain( Census *census, bdescr *bd )
case IND_PERM:
case IND_OLDGEN:
case IND_OLDGEN_PERM:
- case CAF_BLACKHOLE:
case BLACKHOLE:
+ case BLOCKING_QUEUE:
case FUN_1_0:
case FUN_0_1:
case FUN_1_1:
View
175 rts/RaiseAsync.c
@@ -18,6 +18,7 @@
#include "STM.h"
#include "sm/Sanity.h"
#include "Profiling.h"
+#include "Messages.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
@@ -66,13 +67,12 @@ void
throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
rtsBool stop_at_atomically)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
// Remove it from any blocking queues
removeFromQueues(cap,tso);
@@ -83,13 +83,12 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
void
suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
// Remove it from any blocking queues
removeFromQueues(cap,tso);
@@ -164,7 +163,7 @@ throwTo (Capability *cap, // the Capability we hold
msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
// message starts locked; the caller has to unlock it when it is
// ready.
- msg->header.info = &stg_WHITEHOLE_info;
+ SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
msg->source = source;
msg->target = target;
msg->exception = exception;
@@ -185,14 +184,24 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
{
StgWord status;
StgTSO *target = msg->target;
+ Capability *target_cap;
+ goto check_target;
+
+retry:
+ write_barrier();
+ debugTrace(DEBUG_sched, "throwTo: retrying...");
+
+check_target:
ASSERT(target != END_TSO_QUEUE);
// follow ThreadRelocated links in the target first
- while (target->what_next == ThreadRelocated) {
- target = target->_link;
- // No, it might be a WHITEHOLE:
- // ASSERT(get_itbl(target)->type == TSO);
+ target = deRefTSO(target);
+
+ // Thread already dead?
+ if (target->what_next == ThreadComplete
+ || target->what_next == ThreadKilled) {
+ return THROWTO_SUCCESS;
}
debugTraceCap(DEBUG_sched, cap,
@@ -204,18 +213,10 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
traceThreadStatus(DEBUG_sched, target);
#endif
- goto check_target;
-retry:
- write_barrier();
- debugTrace(DEBUG_sched, "throwTo: retrying...");
-
-check_target:
- ASSERT(target != END_TSO_QUEUE);
-
- // Thread already dead?
- if (target->what_next == ThreadComplete
- || target->what_next == ThreadKilled) {
- return THROWTO_SUCCESS;
+ target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
}
status = target->why_blocked;
@@ -282,28 +283,19 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
have also seen the write to Q.
*/
{
- Capability *target_cap;
-
write_barrier();
- target_cap = target->cap;
- if (target_cap != cap) {
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
+ if ((target->flags & TSO_BLOCKEX) == 0) {
+ // It's on our run queue and not blocking exceptions
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
} else {
- if ((target->flags & TSO_BLOCKEX) == 0) {
- // It's on our run queue and not blocking exceptions
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- return THROWTO_SUCCESS;
- } else {
- blockedThrowTo(cap,target,msg);
- return THROWTO_BLOCKED;
- }
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
}
}
case BlockedOnMsgThrowTo:
{
- Capability *target_cap;
const StgInfoTable *i;
MessageThrowTo *m;
@@ -340,13 +332,6 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
goto retry;
}
- target_cap = target->cap;
- if (target_cap != cap) {
- unlockClosure((StgClosure*)m, i);
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
- }
-
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
unlockClosure((StgClosure*)m, i);
@@ -358,7 +343,6 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
unlockClosure((StgClosure*)m, &stg_IND_info);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
return THROWTO_SUCCESS;
}
@@ -400,48 +384,30 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
+ blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_BLOCKED;
} else {
removeThreadFromMVarQueue(cap, mvar, target);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
- unlockClosure((StgClosure *)mvar, info);
+ if (info == &stg_MVAR_CLEAN_info) {
+ dirty_MVAR(&cap->r,(StgClosure*)mvar);
+ }
+ unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
return THROWTO_SUCCESS;
}
}
case BlockedOnBlackHole:
{
- ACQUIRE_LOCK(&sched_mutex);
- // double checking the status after the memory barrier:
- if (target->why_blocked != BlockedOnBlackHole) {
- RELEASE_LOCK(&sched_mutex);
- goto retry;
- }
-
- if (target->flags & TSO_BLOCKEX) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_BLOCKED; // caller releases lock
- } else {
- removeThreadFromQueue(cap, &blackhole_queue, target);
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_SUCCESS;
- }
+ // Revoke the message by replacing it with IND. We're not
+ // locking anything here, so we might still get a TRY_WAKEUP
+ // message from the owner of the blackhole some time in the
+ // future, but that doesn't matter.
+ ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+ OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
}
case BlockedOnSTM:
@@ -454,35 +420,19 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
+ blockedThrowTo(cap,target,msg);
unlockTSO(target);
return THROWTO_BLOCKED;
} else {
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
unlockTSO(target);
return THROWTO_SUCCESS;
}
case BlockedOnCCall:
case BlockedOnCCall_NoUnblockExc:
- {
- Capability *target_cap;
-
- target_cap = target->cap;
- if (target_cap != cap) {
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
- }
-
blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
- }
#ifndef THREADEDED_RTS
case BlockedOnRead:
@@ -515,9 +465,9 @@ throwToSendMsg (Capability *cap STG_UNUSED,
{
#ifdef THREADED_RTS
- debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
- sendMessage(target_cap, (Message*)msg);
+ sendMessage(cap, target_cap, (Message*)msg);
#endif
}
@@ -532,7 +482,7 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
ASSERT(target->cap == cap);
- msg->link = (Message*)target->blocked_exceptions;
+ msg->link = target->blocked_exceptions;
target->blocked_exceptions = msg;
dirty_TSO(cap,target); // we modified the blocked_exceptions queue
}
@@ -571,7 +521,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
(tso->flags & TSO_BLOCKEX) != 0) {
- debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
}
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
@@ -664,16 +614,18 @@ removeFromQueues(Capability *cap, StgTSO *tso)
case BlockedOnMVar:
removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
+ // we aren't doing a write barrier here: the MVar is supposed to
+ // be already locked, so replacing the info pointer would unlock it.
goto done;
case BlockedOnBlackHole:
- removeThreadFromQueue(cap, &blackhole_queue, tso);
+ // nothing to do
goto done;
case BlockedOnMsgWakeup:
{
// kill the message, atomically:
- tso->block_info.wakeup->header.info = &stg_IND_info;
+ OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
break;
}
@@ -725,7 +677,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
* asynchronous exception in an existing thread.
*
* We first remove the thread from any queue on which it might be
- * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and
+ * TSO blocked_exception queues.
*
* We strip the stack down to the innermost CATCH_FRAME, building
* thunks in the heap for all the active computations, so they can
@@ -764,8 +717,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
StgClosure *updatee;
nat i;
- debugTrace(DEBUG_sched,
- "raising exception in thread %ld.", (long)tso->id);
+ debugTraceCap(DEBUG_sched, cap,
+ "raising exception in thread %ld.", (long)tso->id);
#if defined(PROFILING)
/*
@@ -784,6 +737,15 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
tso->what_next != ThreadKilled &&
tso->what_next != ThreadRelocated);
+ // only if we own this TSO (except that deleteThread() calls this
+ ASSERT(tso->cap == cap);
+
+ // wake it up
+ if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+ }
+
// mark it dirty; we're about to change its stack.
dirty_TSO(cap, tso);
@@ -871,7 +833,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
// Perform the update
// TODO: this may waste some work, if the thunk has
// already been updated by another thread.
- UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+ updateThunk(cap, tso,
+ ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
}
sp += sizeofW(StgUpdateFrame) - 1;
@@ -963,8 +926,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
{
StgTRecHeader *trec = tso -> trec;
StgTRecHeader *outer = trec -> enclosing_trec;
- debugTrace(DEBUG_stm,
- "found atomically block delivering async exception");
+ debugTraceCap(DEBUG_stm, cap,
+ "found atomically block delivering async exception");
stmAbortTransaction(cap, trec);
stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;
View
9 rts/RetainerProfile.c
@@ -453,8 +453,6 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child )
// no child, no SRT
case CONSTR_0_1:
case CONSTR_0_2:
- case CAF_BLACKHOLE:
- case BLACKHOLE:
case ARR_WORDS:
*first_child = NULL;
return;
@@ -470,6 +468,7 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child )
case IND_PERM:
case IND_OLDGEN_PERM:
case IND_OLDGEN:
+ case BLACKHOLE:
*first_child = ((StgInd *)c)->indirectee;
return;
case CONSTR_1_0:
@@ -916,8 +915,6 @@ pop( StgClosure **c, StgClosure **cp, retainer *r )
// no child (fixed), no SRT
case CONSTR_0_1:
case CONSTR_0_2:
- case CAF_BLACKHOLE:
- case BLACKHOLE:
case ARR_WORDS:
// one child (fixed), no SRT
case MUT_VAR_CLEAN:
@@ -1059,13 +1056,11 @@ isRetainer( StgClosure *c )
case FUN_0_2:
// partial applications
case PAP:
- // blackholes
- case CAF_BLACKHOLE:
- case BLACKHOLE:
// indirection
case IND_PERM:
case IND_OLDGEN_PERM:
case IND_OLDGEN:
+ case BLACKHOLE:
// static objects
case CONSTR_STATIC:
case FUN_STATIC:
View
2  rts/STM.c
@@ -377,7 +377,7 @@ static void unpark_tso(Capability *cap, StgTSO *tso) {
lockTSO(tso);
if (tso -> why_blocked == BlockedOnSTM) {
TRACE("unpark_tso on tso=%p", tso);
- unblockOne(cap,tso);
+ tryWakeupThread(cap,tso);
} else {
TRACE("spurious unpark_tso on tso=%p", tso);
}
View
243 rts/Schedule.c
@@ -39,6 +39,7 @@
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
+#include "Messages.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
@@ -66,17 +67,6 @@ StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
#endif
-/* Threads blocked on blackholes.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *blackhole_queue = NULL;
-
-/* The blackhole_queue should be checked for threads to wake up. See
- * Schedule.h for more thorough comment.
- * LOCK: none (doesn't matter if we miss an update)
- */
-rtsBool blackholes_need_checking = rtsFalse;
-
/* Set to true when the latest garbage collection failed to reclaim
* enough space, and the runtime should proceed to shut itself down in
* an orderly fashion (emitting profiling info etc.)
@@ -140,7 +130,6 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool);
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
-static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
@@ -159,8 +148,6 @@ static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
rtsBool force_major);
-static rtsBool checkBlackHoles(Capability *cap);
-
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso);
@@ -433,9 +420,6 @@ schedule (Capability *initialCapability, Task *task)
startHeapProfTimer();
- // Check for exceptions blocked on this thread
- maybePerformBlockedException (cap, t);
-
// ----------------------------------------------------------------------
// Run the current thread
@@ -506,13 +490,6 @@ schedule (Capability *initialCapability, Task *task)
// happened. So find the new location:
t = cap->r.rCurrentTSO;
- // We have run some Haskell code: there might be blackhole-blocked
- // threads to wake up now.
- // Lock-free test here should be ok, we're just setting a flag.
- if ( blackhole_queue != END_TSO_QUEUE ) {
- blackholes_need_checking = rtsTrue;
- }
-
// And save the current errno in this thread.
// XXX: possibly bogus for SMP because this thread might already
// be running again, see code below.
@@ -612,12 +589,6 @@ scheduleFindWork (Capability *cap)
{
scheduleStartSignalHandlers(cap);
- // Only check the black holes here if we've nothing else to do.
- // During normal execution, the black hole list only gets checked
- // at GC time, to avoid repeatedly traversing this possibly long
- // list each time around the scheduler.
- if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
scheduleProcessInbox(cap);
scheduleCheckBlockedThreads(cap);
@@ -674,7 +645,6 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
!emptyInbox(cap) ||
- blackholes_need_checking ||
sched_state >= SCHED_INTERRUPTING))
return;
@@ -863,125 +833,11 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
//
if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
{
- awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
+ awaitEvent (emptyRunQueue(cap));
}
#endif
}
-
-/* ----------------------------------------------------------------------------
- * Check for threads woken up by other Capabilities
- * ------------------------------------------------------------------------- */
-
-#if defined(THREADED_RTS)
-static void
-executeMessage (Capability *cap, Message *m)
-{
- const StgInfoTable *i;
-
-loop:
- write_barrier(); // allow m->header to be modified by another thread
- i = m->header.info;
- if (i == &stg_MSG_WAKEUP_info)
- {
- MessageWakeup *w = (MessageWakeup *)m;
- StgTSO *tso = w->tso;
- debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
- (lnat)tso->id);
- ASSERT(tso->cap == cap);
- ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
- ASSERT(tso->block_info.closure == (StgClosure *)m);
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap, tso);
- }
- else if (i == &stg_MSG_THROWTO_info)
- {
- MessageThrowTo *t = (MessageThrowTo *)m;
- nat r;
- const StgInfoTable *i;
-
- i = lockClosure((StgClosure*)m);
- if (i != &stg_MSG_THROWTO_info) {
- unlockClosure((StgClosure*)m, i);
- goto loop;
- }
-
- debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
- (lnat)t->source->id, (lnat)t->target->id);
-
- ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
- ASSERT(t->source->block_info.closure == (StgClosure *)m);
-
- r = throwToMsg(cap, t);
-
- switch (r) {
- case THROWTO_SUCCESS:
- ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
- t->source->sp += 3;
- unblockOne(cap, t->source);
- // this message is done
- unlockClosure((StgClosure*)m, &stg_IND_info);
- break;
- case THROWTO_BLOCKED:
- // unlock the message
- unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
- break;
- }
- }
- else if (i == &stg_IND_info)
- {
- // message was revoked
- return;
- }
- else if (i == &stg_WHITEHOLE_info)
- {
- goto loop;
- }
- else
- {
- barf("executeMessage: %p", i);
- }
-}
-#endif
-
-static void
-scheduleProcessInbox (Capability *cap USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
- Message *m;
-
- while (!emptyInbox(cap)) {
- ACQUIRE_LOCK(&cap->lock);
- m = cap->inbox;
- cap->inbox = m->link;
- RELEASE_LOCK(&cap->lock);
- executeMessage(cap, (Message *)m);
- }
-#endif
-}
-
-/* ----------------------------------------------------------------------------
- * Check for threads blocked on BLACKHOLEs that can be woken up
- * ------------------------------------------------------------------------- */
-static void
-scheduleCheckBlackHoles (Capability *cap)
-{
- if ( blackholes_need_checking ) // check without the lock first
- {
- ACQUIRE_LOCK(&sched_mutex);
- if ( blackholes_need_checking ) {
- blackholes_need_checking = rtsFalse;
- // important that we reset the flag *before* checking the
- // blackhole queue, otherwise we could get deadlock. This
- // happens as follows: we wake up a thread that
- // immediately runs on another Capability, blocks on a
- // blackhole, and then we reset the blackholes_need_checking flag.
- checkBlackHoles(cap);
- }
- RELEASE_LOCK(&sched_mutex);
- }
-}
-
/* ----------------------------------------------------------------------------
* Detect deadlock conditions and attempt to resolve them.
* ------------------------------------------------------------------------- */
@@ -1090,6 +946,26 @@ scheduleSendPendingMessages(void)
#endif
/* ----------------------------------------------------------------------------
+ * Process message in the current Capability's inbox
+ * ------------------------------------------------------------------------- */
+
+static void
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ Message *m;
+
+ while (!emptyInbox(cap)) {
+ ACQUIRE_LOCK(&cap->lock);
+ m = cap->inbox;
+ cap->inbox = m->link;
+ RELEASE_LOCK(&cap->lock);
+ executeMessage(cap, (Message *)m);
+ }
+#endif
+}
+
+/* ----------------------------------------------------------------------------
* Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
* ------------------------------------------------------------------------- */
@@ -1499,9 +1375,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
}
- // do this while the other Capabilities stop:
- if (cap) scheduleCheckBlackHoles(cap);
-
if (gc_type == PENDING_GC_SEQ)
{
// single-threaded GC: grab all the capabilities
@@ -1529,11 +1402,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
waitForGcThreads(cap);
}
-#else /* !THREADED_RTS */
-
- // do this while the other Capabilities stop:
- if (cap) scheduleCheckBlackHoles(cap);
-
#endif
IF_DEBUG(scheduler, printAllThreads());
@@ -2093,8 +1961,6 @@ initScheduler(void)
sleeping_queue = END_TSO_QUEUE;
#endif
- blackhole_queue = END_TSO_QUEUE;
-
sched_state = SCHED_RUNNING;
recent_activity = ACTIVITY_YES;
@@ -2347,8 +2213,9 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
* of the stack, so we don't attempt to scavenge any part of the
* dead TSO's stack.
*/
- tso->what_next = ThreadRelocated;
setTSOLink(cap,tso,dest);
+ write_barrier(); // other threads seeing ThreadRelocated will look at _link
+ tso->what_next = ThreadRelocated;
tso->sp = (P_)&(tso->stack[tso->stack_size]);
tso->why_blocked = NotBlocked;
@@ -2405,8 +2272,9 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
(long)tso->id, tso_size_w, tso_sizeW(new_tso));
- tso->what_next = ThreadRelocated;
tso->_link = new_tso; // no write barrier reqd: same generation
+ write_barrier(); // other threads seeing ThreadRelocated will look at _link
+ tso->what_next = ThreadRelocated;
// The TSO attached to this Task may have moved, so update the
// pointer to it.
@@ -2458,57 +2326,6 @@ void wakeUpRts(void)
#endif
/* -----------------------------------------------------------------------------
- * checkBlackHoles()
- *
- * Check the blackhole_queue for threads that can be woken up. We do
- * this periodically: before every GC, and whenever the run queue is
- * empty.
- *
- * An elegant solution might be to just wake up all the blocked
- * threads with awakenBlockedQueue occasionally: they'll go back to
- * sleep again if the object is still a BLACKHOLE. Unfortunately this
- * doesn't give us a way to tell whether we've actually managed to
- * wake up any threads, so we would be busy-waiting.
- *
- * -------------------------------------------------------------------------- */
-
-static rtsBool
-checkBlackHoles (Capability *cap)
-{
- StgTSO **prev, *t;
- rtsBool any_woke_up = rtsFalse;
- StgHalfWord type;
-
- // blackhole_queue is global:
- ASSERT_LOCK_HELD(&sched_mutex);
-
- debugTrace(DEBUG_sched, "checking threads blocked on black holes");
-
- // ASSUMES: sched_mutex
- prev = &blackhole_queue;
- t = blackhole_queue;
- while (t != END_TSO_QUEUE) {
- if (t->what_next == ThreadRelocated) {
- t = t->_link;
- continue;
- }
- ASSERT(t->why_blocked == BlockedOnBlackHole);
- type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
- if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
- IF_DEBUG(sanity,checkTSO(t));
- t = unblockOne(cap, t);
- *prev = t;
- any_woke_up = rtsTrue;
- } else {
- prev = &t->_link;
- t = t->_link;
- }
- }
-
- return any_woke_up;
-}
-
-/* -----------------------------------------------------------------------------
Deleting threads
This is used for interruption (^C) and forking, and corresponds to
@@ -2517,7 +2334,7 @@ checkBlackHoles (Capability *cap)
-------------------------------------------------------------------------- */
static void
-deleteThread (Capability *cap, StgTSO *tso)
+deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
{
// NOTE: must only be called on a TSO that we have exclusive
// access to, because we will call throwToSingleThreaded() below.
@@ -2526,7 +2343,7 @@ deleteThread (Capability *cap, StgTSO *tso)
if (tso->why_blocked != BlockedOnCCall &&
tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- throwToSingleThreaded(cap,tso,NULL);
+ throwToSingleThreaded(tso->cap,tso,NULL);
}
}
@@ -2599,8 +2416,8 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}
- UPD_IND(cap, ((StgUpdateFrame *)p)->updatee,
- (StgClosure *)raise_closure);
+ updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
+ (StgClosure *)raise_closure);
p = next;
continue;
View
9 rts/Schedule.h
@@ -86,15 +86,6 @@ extern StgTSO *blocked_queue_hd, *blocked_queue_tl;
extern StgTSO *sleeping_queue;
#endif
-/* Set to rtsTrue if there are threads on the blackhole_queue, and
- * it is possible that one or more of them may be available to run.
- * This flag is set to rtsFalse after we've checked the queue, and
- * set to rtsTrue just before we run some Haskell code. It is used
- * to decide whether we should yield the Capability or not.
- * Locks required : none (see scheduleCheckBlackHoles()).
- */
-extern rtsBool blackholes_need_checking;
-
extern rtsBool heap_overflow;
#if defined(THREADED_RTS)
View
161 rts/StgMiscClosures.cmm
@@ -283,96 +283,105 @@ INFO_TABLE(stg_IND_OLDGEN_PERM,1,0,IND_OLDGEN_PERM,"IND_OLDGEN_PERM","IND_OLDGEN
waiting for the evaluation of the closure to finish.
------------------------------------------------------------------------- */
-/* Note: a BLACKHOLE must be big enough to be
- * overwritten with an indirection/evacuee/catch. Thus we claim it
- * has 1 non-pointer word of payload.
- */
-INFO_TABLE(stg_BLACKHOLE,0,1,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
+INFO_TABLE(stg_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
{
- TICK_ENT_BH();
-
-#ifdef THREADED_RTS
- // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
- /* Actually this is not necessary because R1 is about to be destroyed. */
- LDV_ENTER(R1);
+ W_ r, p, info, bq, msg, owner, bd;
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(sched_mutex "ptr");
- // released in stg_block_blackhole_finally
-#endif
-
- /* Put ourselves on the blackhole queue */
- StgTSO__link(CurrentTSO) = W_[blackhole_queue];
- W_[blackhole_queue] = CurrentTSO;
-
- /* jot down why and on what closure we are blocked */
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
- StgTSO_block_info(CurrentTSO) = R1;
+ TICK_ENT_DYN_IND(); /* tick */
- jump stg_block_blackhole;
+retry:
+ p = StgInd_indirectee(R1);
+ if (GETTAG(p) != 0) {
+ R1 = p;
+ jump %ENTRY_CODE(Sp(0));
+ }
+
+ info = StgHeader_info(p);
+ if (info == stg_IND_info) {
+ // This could happen, if e.g. we got a BLOCKING_QUEUE that has
+ // just been replaced with an IND by another thread in
+ // wakeBlockingQueue().
+ goto retry;
+ }
+
+ if (info == stg_TSO_info ||
+ info == stg_BLOCKING_QUEUE_CLEAN_info ||
+ info == stg_BLOCKING_QUEUE_DIRTY_info)
+ {
+ ("ptr" msg) = foreign "C" allocate(MyCapability() "ptr",
+ BYTES_TO_WDS(SIZEOF_MessageBlackHole)) [R1];
+
+ StgHeader_info(msg) = stg_MSG_BLACKHOLE_info;
+ MessageBlackHole_tso(msg) = CurrentTSO;
+ MessageBlackHole_bh(msg) = R1;
+
+ (r) = foreign "C" messageBlackHole(MyCapability() "ptr", msg "ptr") [R1];
+
+ if (r == 0) {
+ goto retry;
+ } else {
+ StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
+ StgTSO_block_info(CurrentTSO) = msg;
+ jump stg_block_blackhole;
+ }
+ }
+ else
+ {
+ R1 = p;
+ ENTER();
+ }
}
-/* identical to BLACKHOLEs except for the infotag */
-INFO_TABLE(stg_CAF_BLACKHOLE,0,1,CAF_BLACKHOLE,"CAF_BLACKHOLE","CAF_BLACKHOLE")
+INFO_TABLE(__stg_EAGER_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
{
- TICK_ENT_BH();
- LDV_ENTER(R1);
-
-#if defined(THREADED_RTS)
- // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(sched_mutex "ptr");
- // released in stg_block_blackhole_finally
-#endif
-
- /* Put ourselves on the blackhole queue */
- StgTSO__link(CurrentTSO) = W_[blackhole_queue];
- W_[blackhole_queue] = CurrentTSO;
-
- /* jot down why and on what closure we are blocked */
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
- StgTSO_block_info(CurrentTSO) = R1;
-
- jump stg_block_blackhole;
+ jump ENTRY_LBL(stg_BLACKHOLE);
}
-INFO_TABLE(__stg_EAGER_BLACKHOLE,0,1,BLACKHOLE,"EAGER_BLACKHOLE","EAGER_BLACKHOLE")
+// CAF_BLACKHOLE is allocated when entering a CAF. The reason it is
+// distinct from BLACKHOLE is so that we can tell the difference
+// between an update frame on the stack that points to a CAF under
+// evaluation, and one that points to a closure that is under
+// evaluation by another thread (a BLACKHOLE). See threadPaused().
+//
+INFO_TABLE(stg_CAF_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
{
- TICK_ENT_BH();
-
-#ifdef THREADED_RTS
- // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
- /* Actually this is not necessary because R1 is about to be destroyed. */
- LDV_ENTER(R1);
-
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(sched_mutex "ptr");
- // released in stg_block_blackhole_finally
-#endif
-
- /* Put ourselves on the blackhole queue */
- StgTSO__link(CurrentTSO) = W_[blackhole_queue];
- W_[blackhole_queue] = CurrentTSO;
+ jump ENTRY_LBL(stg_BLACKHOLE);
+}
- /* jot down why and on what closure we are blocked */
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
- StgTSO_block_info(CurrentTSO) = R1;
+INFO_TABLE(stg_BLOCKING_QUEUE_CLEAN,4,0,BLOCKING_QUEUE,"BLOCKING_QUEUE","BLOCKING_QUEUE")
+{ foreign "C" barf("BLOCKING_QUEUE_CLEAN object entered!") never returns; }
+
- jump stg_block_blackhole;
-}
+INFO_TABLE(stg_BLOCKING_QUEUE_DIRTY,4,0,BLOCKING_QUEUE,"BLOCKING_QUEUE","BLOCKING_QUEUE")
+{ foreign "C" barf("BLOCKING_QUEUE_DIRTY object entered!") never returns; }
+
/* ----------------------------------------------------------------------------
Whiteholes are used for the "locked" state of a closure (see lockClosure())
------------------------------------------------------------------------- */
INFO_TABLE(stg_WHITEHOLE, 0,0, WHITEHOLE, "WHITEHOLE", "WHITEHOLE")
-{ foreign "C" barf("WHITEHOLE object entered!") never returns; }
+{
+#if defined(THREADED_RTS)
+ W_ info, i;
+
+ i = 0;
+loop:
+ // spin until the WHITEHOLE is updated
+ info = StgHeader_info(R1);
+ if (info == stg_WHITEHOLE_info) {
+ i = i + 1;
+ if (i == SPIN_COUNT) {
+ i = 0;
+ foreign "C" yieldThread() [R1];
+ }
+ goto loop;
+ }
+ jump %ENTRY_CODE(info);
+#else
+ foreign "C" barf("WHITEHOLE object entered!") never returns;
+#endif
+}
/* ----------------------------------------------------------------------------
Some static info tables for things that don't get entered, and
@@ -485,9 +494,15 @@ CLOSURE(stg_NO_TREC_closure,stg_NO_TREC);
INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP")
{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; }
+INFO_TABLE_CONSTR(stg_MSG_TRY_WAKEUP,2,0,0,PRIM,"MSG_TRY_WAKEUP","MSG_TRY_WAKEUP")
+{ foreign "C" barf("MSG_TRY_WAKEUP object entered!") never returns; }
+
INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO")
{ foreign "C" barf("MSG_THROWTO object entered!") never returns; }
+INFO_TABLE_CONSTR(stg_MSG_BLACKHOLE,3,0,0,PRIM,"MSG_BLACKHOLE","MSG_BLACKHOLE")
+{ foreign "C" barf("MSG_BLACKHOLE object entered!") never returns; }
+
/* ----------------------------------------------------------------------------
END_TSO_QUEUE
View
66 rts/ThreadPaused.c
@@ -14,6 +14,7 @@
#include "Updates.h"
#include "RaiseAsync.h"
#include "Trace.h"
+#include "Threads.h"
#include <string.h> // for memmove()
@@ -75,7 +76,7 @@ stackSqueeze(Capability *cap, StgTSO *tso, StgPtr bottom)