Permalink
Browse files

Change the representation of the MVar blocked queue

The list of threads blocked on an MVar is now represented as a list of
separately allocated objects rather than being linked through the TSOs
themselves.  This lets us remove a TSO from the list in O(1) time
rather than O(n) time, by marking the list object.  Removing this
linear component fixes some pathalogical performance cases where many
threads were blocked on an MVar and became unreachable simultaneously
(nofib/smp/threads007), or when sending an asynchronous exception to a
TSO in a long list of thread blocked on an MVar.

MVar performance has actually improved by a few percent as a result of
this change, slightly to my surprise.

This is the final cleanup in the sequence, which let me remove the old
way of waking up threads (unblockOne(), MSG_WAKEUP) in favour of the
new way (tryWakeupThread and MSG_TRY_WAKEUP, which is idempotent).  It
is now the case that only the Capability that owns a TSO may modify
its state (well, almost), and this simplifies various things.  More of
the RTS is based on message-passing between Capabilities now.
  • Loading branch information...
1 parent 7c4cb84 commit f4692220c7cbdadaa633f50eb2b30b59edb30183 @simonmar simonmar committed Apr 1, 2010
@@ -372,6 +372,10 @@ main(int argc, char *argv[])
closure_field(StgMVar,tail);
closure_field(StgMVar,value);
+ closure_size(StgMVarTSOQueue);
+ closure_field(StgMVarTSOQueue, link);
+ closure_field(StgMVarTSOQueue, tso);
+
closure_size(StgBCO);
closure_field(StgBCO, instrs);
closure_field(StgBCO, literals);
View
@@ -227,8 +227,12 @@
/* same as above but don't unblock async exceptions in resumeThread() */
/* Involved in a message sent to tso->msg_cap */
-#define BlockedOnMsgWakeup 12
-#define BlockedOnMsgThrowTo 13
+#define BlockedOnMsgThrowTo 12
+
+/* The thread is not on any run queues, but can be woken up
+ by tryWakeupThread() */
+#define ThreadMigrating 13
+
/*
* These constants are returned to the scheduler by a thread that has
* stopped for one reason or another. See typedef StgThreadReturnCode
@@ -109,7 +109,8 @@ typedef struct bdescr_ {
#else
-INLINE_HEADER bdescr *Bdescr(StgPtr p)
+EXTERN_INLINE bdescr *Bdescr(StgPtr p);
+EXTERN_INLINE bdescr *Bdescr(StgPtr p)
{
return (bdescr *)
((((W_)p & MBLOCK_MASK & ~BLOCK_MASK) >> (BLOCK_SHIFT-BDESCR_SHIFT))
@@ -305,11 +305,17 @@ typedef struct {
/* Concurrent communication objects */
+typedef struct StgMVarTSOQueue_ {
+ StgHeader header;
+ struct StgMVarTSOQueue_ *link;
+ struct StgTSO_ *tso;
+} StgMVarTSOQueue;
+
typedef struct {
- StgHeader header;
- struct StgTSO_ *head;
- struct StgTSO_ *tail;
- StgClosure* value;
+ StgHeader header;
+ struct StgMVarTSOQueue_ *head;
+ struct StgMVarTSOQueue_ *tail;
+ StgClosure* value;
} StgMVar;
@@ -82,7 +82,6 @@ typedef struct StgTSO_ {
/*
Currently used for linking TSOs on:
* cap->run_queue_{hd,tl}
- * MVAR queue
* (non-THREADED_RTS); the blocked_queue
* and pointing to the relocated version of a ThreadRelocated
@@ -42,10 +42,10 @@
# define RTS_FUN(f) RTS_FUN_INFO(f##_info)
# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info)
#else
-# define RTS_RET(f) RTS_INFO(f##_info) RTS_FUN_DECL(f##_ret)
-# define RTS_ENTRY(f) RTS_INFO(f##_info) RTS_FUN_DECL(f##_entry)
-# define RTS_FUN(f) RTS_FUN_INFO(f##_info) RTS_FUN_DECL(f##_entry)
-# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info) RTS_FUN_DECL(f##_entry)
+# define RTS_RET(f) RTS_INFO(f##_info); RTS_FUN_DECL(f##_ret)
+# define RTS_ENTRY(f) RTS_INFO(f##_info); RTS_FUN_DECL(f##_entry)
+# define RTS_FUN(f) RTS_FUN_INFO(f##_info); RTS_FUN_DECL(f##_entry)
+# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info); RTS_FUN_DECL(f##_entry)
#endif
/* Stack frames */
@@ -109,7 +109,6 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0);
RTS_ENTRY(stg_MUT_VAR_CLEAN);
RTS_ENTRY(stg_MUT_VAR_DIRTY);
RTS_ENTRY(stg_END_TSO_QUEUE);
-RTS_ENTRY(stg_MSG_WAKEUP);
RTS_ENTRY(stg_MSG_TRY_WAKEUP);
RTS_ENTRY(stg_MSG_THROWTO);
RTS_ENTRY(stg_MSG_BLACKHOLE);
View
@@ -199,9 +199,9 @@ extern volatile StgWord waiting_for_gc;
//
void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
-INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
+EXTERN_INLINE void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
-INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p);
+EXTERN_INLINE void recordClosureMutated (Capability *cap, StgClosure *p);
#if defined(THREADED_RTS)
@@ -291,7 +291,7 @@ INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
* INLINE functions... private below here
* -------------------------------------------------------------------------- */
-INLINE_HEADER void
+EXTERN_INLINE void
recordMutableCap (StgClosure *p, Capability *cap, nat gen)
{
bdescr *bd;
@@ -310,7 +310,7 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
*bd->free++ = (StgWord)p;
}
-INLINE_HEADER void
+EXTERN_INLINE void
recordClosureMutated (Capability *cap, StgClosure *p)
{
bdescr *bd;
View
@@ -481,9 +481,13 @@ INFO_TABLE_RET( stg_gc_gen, RET_DYN )
stg_gc_gen
{
+ // Hack; see Note [mvar-heap-check] in PrimOps.cmm
+ if (R10 == stg_putMVarzh || R10 == stg_takeMVarzh) {
+ unlockClosure(R1, stg_MVAR_DIRTY_info)
+ }
SAVE_EVERYTHING;
GC_GENERIC
-}
+}
// A heap check at an unboxed tuple return point. The return address
// is on the stack, and we can find it by using the offsets given
@@ -583,11 +587,7 @@ INFO_TABLE_RET( stg_block_takemvar, RET_SMALL, P_ unused )
// code fragment executed just before we return to the scheduler
stg_block_takemvar_finally
{
-#ifdef THREADED_RTS
unlockClosure(R3, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(R3, stg_MVAR_DIRTY_info);
-#endif
jump StgReturn;
}
View
@@ -5,3 +5,4 @@
#include "PosixSource.h"
#include "Rts.h"
#include "Schedule.h"
+#include "Capability.h"
View
@@ -28,8 +28,7 @@ void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
#ifdef DEBUG
{
const StgInfoTable *i = msg->header.info;
- if (i != &stg_MSG_WAKEUP_info &&
- i != &stg_MSG_THROWTO_info &&
+ if (i != &stg_MSG_THROWTO_info &&
i != &stg_MSG_BLACKHOLE_info &&
i != &stg_MSG_TRY_WAKEUP_info &&
i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
@@ -71,21 +70,7 @@ executeMessage (Capability *cap, Message *m)
loop:
write_barrier(); // allow m->header to be modified by another thread
i = m->header.info;
- if (i == &stg_MSG_WAKEUP_info)
- {
- // the plan is to eventually get rid of these and use
- // TRY_WAKEUP instead.
- MessageWakeup *w = (MessageWakeup *)m;
- StgTSO *tso = w->tso;
- debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
- (lnat)tso->id);
- ASSERT(tso->cap == cap);
- ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
- ASSERT(tso->block_info.closure == (StgClosure *)m);
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap, tso);
- }
- else if (i == &stg_MSG_TRY_WAKEUP_info)
+ if (i == &stg_MSG_TRY_WAKEUP_info)
{
StgTSO *tso = ((MessageWakeup *)m)->tso;
debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
Oops, something went wrong.

0 comments on commit f469222

Please sign in to comment.