Skip to content

Commit

Permalink
Add some more flexibility to the multiproc scheduler
Browse files Browse the repository at this point in the history
There are two new options in the -threaded RTS:
 
  -qm       Don't automatically migrate threads between CPUs
  -qw       Migrate a thread to the current CPU when it is woken up

Previously both of these were effectively off, i.e. threads were
migrated between CPUs willy-nilly, and threads were always migrated to
the current CPU when woken up.  This is the first step in tweaking the
scheduling for more effective work balancing; there will no doubt be
more to come.
  • Loading branch information
Simon Marlow committed Mar 24, 2006
1 parent 354cefe commit 4368121
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 51 deletions.
2 changes: 2 additions & 0 deletions ghc/includes/RtsFlags.h
Expand Up @@ -164,6 +164,8 @@ struct PAR_FLAGS {
#ifdef THREADED_RTS
/* Scheduler-related runtime flags for the threaded RTS. */
struct PAR_FLAGS {
nat nNodes; /* number of threads to run simultaneously (default 1) */
rtsBool migrate; /* migrate threads between capabilities (default on; -qm turns it off) */
rtsBool wakeupMigrate; /* migrate a thread on wakeup (default off; -qw turns it on) */
unsigned int maxLocalSparks; /* NOTE(review): presumably the spark pool size set by -e<size> -- confirm */
};
#endif /* THREADED_RTS */
Expand Down
55 changes: 29 additions & 26 deletions ghc/includes/TSO.h
Expand Up @@ -93,6 +93,8 @@ typedef StgWord32 StgThreadID;
*/
#define TSO_DIRTY 1

/* Evaluates to non-zero iff the TSO_DIRTY bit is set in tso->flags. */
#define tsoDirty(tso) (((tso)->flags) & TSO_DIRTY)

/*
* Type returned after running a thread. Values of this type
* include HeapOverflow, StackOverflow etc. See Constants.h for the
Expand Down Expand Up @@ -134,43 +136,44 @@ typedef union {
*/

typedef struct StgTSO_ {
StgHeader header;

struct StgTSO_* link; /* Links threads onto blocking queues */
struct StgTSO_* global_link; /* Links all threads together */

StgWord16 what_next; /* Values defined in Constants.h */
StgWord16 why_blocked; /* Values defined in Constants.h */
StgWord32 flags;
StgTSOBlockInfo block_info;
struct StgTSO_* blocked_exceptions;
StgThreadID id;
int saved_errno;
struct Task_* bound; // non-NULL for a bound thread
struct StgTRecHeader_ *trec; /* STM transaction record */

StgHeader header;

struct StgTSO_* link; /* Links threads onto blocking queues */
struct StgTSO_* global_link; /* Links all threads together */

StgWord16 what_next; /* Values defined in Constants.h */
StgWord16 why_blocked; /* Values defined in Constants.h */
StgWord32 flags;
StgTSOBlockInfo block_info;
struct StgTSO_* blocked_exceptions;
StgThreadID id;
int saved_errno;
struct Task_* bound;
struct Capability_* cap;
struct StgTRecHeader_ * trec; /* STM transaction record */

#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
/* TICKY-specific stuff would go here. */
#endif
#ifdef PROFILING
StgTSOProfInfo prof;
StgTSOProfInfo prof;
#endif
#ifdef PAR
StgTSOParInfo par;
StgTSOParInfo par;
#endif
#ifdef GRAN
StgTSOGranInfo gran;
StgTSOGranInfo gran;
#endif
#ifdef DIST
StgTSODistInfo dist;
StgTSODistInfo dist;
#endif

/* The thread stack... */
StgWord stack_size; /* stack size in *words* */
StgWord max_stack_size; /* maximum stack size in *words* */
StgPtr sp;

StgWord stack[FLEXIBLE_ARRAY];
/* The thread stack... */
StgWord32 stack_size; /* stack size in *words* */
StgWord32 max_stack_size; /* maximum stack size in *words* */
StgPtr sp;
StgWord stack[FLEXIBLE_ARRAY];
} StgTSO;

/* -----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion ghc/rts/Apply.cmm
Expand Up @@ -32,7 +32,7 @@ stg_ap_0_fast
IF_DEBUG(sanity,
foreign "C" checkStackChunk(Sp "ptr",
CurrentTSO + TSO_OFFSET_StgTSO_stack +
WDS(StgTSO_stack_size(CurrentTSO)) "ptr") [R1]);
WDS(TO_W_(StgTSO_stack_size(CurrentTSO))) "ptr") [R1]);

ENTER();
}
Expand Down
40 changes: 38 additions & 2 deletions ghc/rts/Capability.c
Expand Up @@ -67,7 +67,9 @@ anyWorkForMe( Capability *cap, Task *task )
// other global condition to check, such as threads blocked on
// blackholes).
if (emptyRunQueue(cap)) {
return !emptySparkPoolCap(cap) || globalWorkToDo();
return !emptySparkPoolCap(cap)
|| !emptyWakeupQueue(cap)
|| globalWorkToDo();
} else
return cap->run_queue_hd->bound == NULL;
}
Expand Down Expand Up @@ -135,6 +137,8 @@ initCapability( Capability *cap, nat i )
cap->suspended_ccalling_tasks = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
cap->wakeup_queue_tl = END_TSO_QUEUE;
#endif

cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
Expand Down Expand Up @@ -296,7 +300,8 @@ releaseCapability_ (Capability* cap)

// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
|| !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
Expand Down Expand Up @@ -501,6 +506,37 @@ yieldCapability (Capability** pCap, Task *task)
return;
}

/* ----------------------------------------------------------------------------
 * wakeupThreadOnCapability
 *
 * Hand a woken thread over to the Capability it belongs to.  Intended
 * for a Task that is running on one Capability and needs to wake up a
 * thread that lives on a different Capability.
 *
 * cap->lock is held while the thread is queued.
 * ------------------------------------------------------------------------- */

void
wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
{
    // The thread must already be assigned to the target Capability,
    // and so must the Task it is bound to, if it has one.
    ASSERT(tso->cap == cap);
    ASSERT(tso->bound == NULL || tso->bound->cap == cap);

    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        // A Task currently owns this Capability.  Park the thread on
        // the wakeup queue; releaseCapability_() looks at that queue
        // before letting the Capability go idle.
        appendToWakeupQueue(cap,tso);
    } else {
        // The Capability is idle: put the thread straight onto its
        // run queue and get a Task started to run it.
        appendToRunQueue(cap,tso);

        // releaseCapability_() requires running_task to be set.
        cap->running_task = myTask();
        releaseCapability_(cap);
    }
    RELEASE_LOCK(&cap->lock);
}

/* ----------------------------------------------------------------------------
* prodCapabilities
*
Expand Down
13 changes: 12 additions & 1 deletion ghc/rts/Capability.h
Expand Up @@ -70,7 +70,7 @@ struct Capability_ {
// Worker Tasks waiting in the wings. Singly-linked.
Task *spare_workers;

// This lock protects running_task and returning_tasks_{hd,tl}.
// This lock protects running_task, returning_tasks_{hd,tl}, wakeup_queue.
Mutex lock;

// Tasks waiting to return from a foreign call, or waiting to make
Expand All @@ -80,6 +80,12 @@ struct Capability_ {
// check whether it is NULL without taking the lock, however.
Task *returning_tasks_hd; // Singly-linked, with head/tail
Task *returning_tasks_tl;

// A list of threads to append to this Capability's run queue at
// the earliest opportunity. These are threads that have been
// woken up by another Capability.
StgTSO *wakeup_queue_hd;
StgTSO *wakeup_queue_tl;
#endif

// Per-capability STM-related data
Expand Down Expand Up @@ -189,6 +195,11 @@ void yieldCapability (Capability** pCap, Task *task);
//
void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);

// Wakes up a thread on a Capability (probably a different Capability
// from the one held by the current Task).
//
void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);

// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
//
Expand Down
2 changes: 1 addition & 1 deletion ghc/rts/Exception.cmm
Expand Up @@ -380,7 +380,7 @@ retry_pop_stack:
* entry code in StgStartup.cmm.
*/
Sp = CurrentTSO + TSO_OFFSET_StgTSO_stack
+ WDS(StgTSO_stack_size(CurrentTSO)) - WDS(2);
+ WDS(TO_W_(StgTSO_stack_size(CurrentTSO))) - WDS(2);
Sp(1) = R1; /* save the exception */
Sp(0) = stg_enter_info; /* so that GC can traverse this stack */
StgTSO_what_next(CurrentTSO) = ThreadKilled::I16;
Expand Down
25 changes: 25 additions & 0 deletions ghc/rts/RtsFlags.c
Expand Up @@ -219,6 +219,8 @@ void initRtsFlagsDefaults(void)

#ifdef THREADED_RTS
RtsFlags.ParFlags.nNodes = 1;
RtsFlags.ParFlags.migrate = rtsTrue;
RtsFlags.ParFlags.wakeupMigrate = rtsFalse;
#endif

#ifdef PAR
Expand Down Expand Up @@ -437,6 +439,8 @@ usage_text[] = {
#endif /* DEBUG */
#if defined(THREADED_RTS)
" -N<n> Use <n> OS threads (default: 1)",
" -qm Don't automatically migrate threads between CPUs",
" -qw Migrate a thread to the current CPU when it is woken up",
#endif
#if defined(THREADED_RTS) || defined(PAR)
" -e<size> Size of spark pools (default 100)",
Expand Down Expand Up @@ -1049,6 +1053,25 @@ error = rtsTrue;
}
}
) break;

case 'q':
switch (rts_argv[arg][2]) {
case '\0':
errorBelch("incomplete RTS option: %s",rts_argv[arg]);
error = rtsTrue;
break;
case 'm':
RtsFlags.ParFlags.migrate = rtsFalse;
break;
case 'w':
RtsFlags.ParFlags.wakeupMigrate = rtsTrue;
break;
default:
errorBelch("unknown RTS option: %s",rts_argv[arg]);
error = rtsTrue;
break;
}
break;
#endif
/* =========== PARALLEL =========================== */
case 'e':
Expand All @@ -1063,10 +1086,12 @@ error = rtsTrue;
}
) break;

#ifdef PAR
case 'q':
PAR_BUILD_ONLY(
process_par_option(arg, rts_argc, rts_argv, &error);
) break;
#endif

/* =========== GRAN =============================== */

Expand Down

0 comments on commit 4368121

Please sign in to comment.