From 06c7e2fbbeb4a2f5816d6e0e950e75d46907872e Mon Sep 17 00:00:00 2001 From: kpamnany Date: Tue, 27 Jun 2017 17:53:49 +0530 Subject: [PATCH] Integrating partr in progress Added partr code. Abstracted interface to threading infrastructure. Original threading working. Parallel task runtime compiles. Task merge in progress. --- Make.inc | 1 + src/Makefile | 7 +- src/atomics.h | 6 + src/forkjoin-ti.c | 366 +++++++++++++++++++ src/forkjoin-ti.h | 21 ++ src/gc.c | 4 + src/init.c | 4 + src/julia.h | 107 ++++++ src/julia_internal.h | 17 +- src/julia_threads.h | 5 + src/locks.h | 16 + src/options.h | 4 + src/partr.c | 854 +++++++++++++++++++++++++++++++++++++++++++ src/partr.h | 53 +++ src/task.c | 12 + src/threadgroup.c | 206 ----------- src/threadgroup.h | 44 --- src/threading.c | 364 ++---------------- src/threading.h | 74 ++-- 19 files changed, 1539 insertions(+), 626 deletions(-) create mode 100644 src/forkjoin-ti.c create mode 100644 src/forkjoin-ti.h create mode 100644 src/partr.c create mode 100644 src/partr.h delete mode 100644 src/threadgroup.c delete mode 100644 src/threadgroup.h diff --git a/Make.inc b/Make.inc index e7768166993d6..994da057a3947 100644 --- a/Make.inc +++ b/Make.inc @@ -998,6 +998,7 @@ endif # Threads ifneq ($(JULIA_THREADS), 0) JCPPFLAGS += -DJULIA_ENABLE_THREADING -DJULIA_NUM_THREADS=$(JULIA_THREADS) +//JCPPFLAGS += -DJULIA_ENABLE_THREADING -DJULIA_NUM_THREADS=$(JULIA_THREADS) -DJULIA_ENABLE_PARTR endif # Intel VTune Amplifier diff --git a/src/Makefile b/src/Makefile index d3ff989009006..a6e88a4d31b05 100644 --- a/src/Makefile +++ b/src/Makefile @@ -39,9 +39,8 @@ SRCS := \ jltypes gf typemap ast builtins module interpreter symbol \ dlload sys init task array dump staticdata toplevel jl_uv datatype \ simplevector APInt-C runtime_intrinsics runtime_ccall precompile \ - threadgroup threading stackwalk gc gc-debug gc-pages method \ - jlapi signal-handling safepoint jloptions timing subtype rtutils \ - crc32c + threading forkjoin-ti partr stackwalk gc gc-debug gc-pages method \ + jlapi signal-handling safepoint jloptions timing subtype rtutils crc32c ifeq ($(USEMSVC), 1) SRCS += getopt @@ -197,7 +196,7 @@ $(BUILDDIR)/gc-debug.o $(BUILDDIR)/gc-debug.dbg.obj: $(SRCDIR)/gc.h $(BUILDDIR)/gc-pages.o $(BUILDDIR)/gc-pages.dbg.obj: $(SRCDIR)/gc.h $(BUILDDIR)/signal-handling.o $(BUILDDIR)/signal-handling.dbg.obj: $(addprefix $(SRCDIR)/,signals-*.c) $(BUILDDIR)/dump.o $(BUILDDIR)/dump.dbg.obj: $(addprefix $(SRCDIR)/,common_symbols1.inc common_symbols2.inc) -$(addprefix $(BUILDDIR)/,threading.o threading.dbg.obj gc.o gc.dbg.obj init.c init.dbg.obj task.o task.dbg.obj): $(addprefix $(SRCDIR)/,threading.h threadgroup.h) +$(addprefix $(BUILDDIR)/,threading.o threading.dbg.obj gc.o gc.dbg.obj init.c init.dbg.obj task.o task.dbg.obj): $(addprefix $(SRCDIR)/,threading.h) $(addprefix $(BUILDDIR)/,APInt-C.o APInt-C.dbg.obj runtime_intrinsics.o runtime_intrinsics.dbg.obj): $(SRCDIR)/APInt-C.h # archive library file rules diff --git a/src/atomics.h b/src/atomics.h index 493f0297892bc..ebfc66bbd83f4 100644 --- a/src/atomics.h +++ b/src/atomics.h @@ -62,8 +62,12 @@ // the __atomic builtins or c11 atomics with GNU extension or c11 _Generic # define jl_atomic_compare_exchange(obj, expected, desired) \ __sync_val_compare_and_swap(obj, expected, desired) +# define jl_atomic_bool_compare_exchange(obj, expected, desired) \ + __sync_bool_compare_and_swap(obj, expected, desired) # define jl_atomic_exchange(obj, desired) \ __atomic_exchange_n(obj, desired, __ATOMIC_SEQ_CST) +# define jl_atomic_exchange_generic(obj, desired, orig)\ + __atomic_exchange(obj, desired, orig, __ATOMIC_SEQ_CST) # define jl_atomic_exchange_relaxed(obj, desired) \ __atomic_exchange_n(obj, desired, __ATOMIC_RELAXED) // TODO: Maybe add jl_atomic_compare_exchange_weak for spin lock @@ -115,6 +119,7 @@ jl_atomic_fetch_add(T *obj, T2 arg) { return (T)_InterlockedExchangeAdd64((volatile __int64*)obj, (__int64)arg); } +// TODO: jl_atomic_exchange_generic #define jl_atomic_fetch_add_relaxed(obj, arg) jl_atomic_fetch_add(obj, arg) // and @@ -200,6 +205,7 @@ jl_atomic_compare_exchange(volatile T *obj, T2 expected, T3 desired) return (T)_InterlockedCompareExchange64((volatile __int64*)obj, (__int64)desired, (__int64)expected); } +// TODO: jl_atomic_bool_compare_exchange // atomic exchange template static inline typename std::enable_if::type diff --git a/src/forkjoin-ti.c b/src/forkjoin-ti.c new file mode 100644 index 0000000000000..1723709739372 --- /dev/null +++ b/src/forkjoin-ti.c @@ -0,0 +1,366 @@ +// This file is a part of Julia. License is MIT: https://julialang.org/license + +#include +#include +#include +#include +#include + +#include "julia.h" +#include "julia_internal.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include "options.h" +#include "threading.h" + +#ifdef JULIA_ENABLE_THREADING +#ifdef JULIA_ENABLE_FORKJOIN_TI + +// for the barrier +typedef struct { + int sense; +} ti_thread_sense_t; + +// thread group +typedef struct { + int16_t *tid_map, num_threads, added_threads; + uint8_t num_sockets, num_cores, num_threads_per_core; + + // fork/join/barrier + uint8_t group_sense; // Written only by master thread + ti_thread_sense_t **thread_sense; + void *envelope; + + // to let threads sleep + uv_mutex_t alarm_lock; + uv_cond_t alarm; + uint64_t sleep_threshold; +} ti_threadgroup_t; + +// thread state +enum { + TI_THREAD_INIT, + TI_THREAD_WORK +}; + +// passed to thread function +typedef struct { + int16_t volatile state; + ti_threadgroup_t *tg; +} ti_threadarg_t; + +// work command to thread function +typedef struct { + jl_method_instance_t *mfunc; + jl_generic_fptr_t fptr; + jl_value_t **args; + uint32_t nargs; + jl_value_t *ret; + jl_module_t *current_module; + size_t world_age; +} ti_threadwork_t; + +// for broadcasting work to threads +static ti_threadwork_t threadwork; + +// only one thread group for now +static ti_threadgroup_t *tgworld; + + +// threadgroup functions +// --- +static int ti_threadgroup_create(uint8_t num_sockets, uint8_t num_cores, + uint8_t num_threads_per_core, + ti_threadgroup_t **newtg) +{ + int i; + ti_threadgroup_t *tg; + int num_threads = num_sockets * num_cores * num_threads_per_core; + char *cp; + + tg = (ti_threadgroup_t*)jl_malloc_aligned(sizeof(ti_threadgroup_t), 64); + tg->tid_map = (int16_t*)jl_malloc_aligned(num_threads * sizeof(int16_t), 64); + for (i = 0; i < num_threads; ++i) + tg->tid_map[i] = -1; + tg->num_sockets = num_sockets; + tg->num_cores = num_cores; + tg->num_threads_per_core = num_threads_per_core; + tg->num_threads = num_threads; + tg->added_threads = 0; + tg->thread_sense = (ti_thread_sense_t**) + jl_malloc_aligned(num_threads * sizeof(ti_thread_sense_t*), 64); + for (i = 0; i < num_threads; i++) + tg->thread_sense[i] = NULL; + jl_atomic_store_release(&tg->group_sense, 0); + + uv_mutex_init(&tg->alarm_lock); + uv_cond_init(&tg->alarm); + + tg->sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD; + cp = getenv(THREAD_SLEEP_THRESHOLD_NAME); + if (cp) { + if (!strncasecmp(cp, "infinite", 8)) + tg->sleep_threshold = 0; + else + tg->sleep_threshold = (uint64_t)strtol(cp, NULL, 10); + } + + *newtg = tg; + return 0; +} + +static int ti_threadgroup_addthread(ti_threadgroup_t *tg, int16_t ext_tid, + int16_t *tgtid) +{ + if (ext_tid < 0 || ext_tid >= tg->num_threads) + return -1; + if (tg->tid_map[ext_tid] != -1) + return -2; + if (tg->added_threads == tg->num_threads) + return -3; + + tg->tid_map[ext_tid] = tg->added_threads++; + if (tgtid) *tgtid = tg->tid_map[ext_tid]; + + return 0; +} + +static int ti_threadgroup_initthread(ti_threadgroup_t *tg, int16_t ext_tid) +{ + ti_thread_sense_t *ts; + + if (ext_tid < 0 || ext_tid >= tg->num_threads) + return -1; + if (tg->thread_sense[tg->tid_map[ext_tid]] != NULL) + return -2; + if (tg->num_threads == 0) + return -3; + + ts = (ti_thread_sense_t*)jl_malloc_aligned(sizeof(ti_thread_sense_t), 64); + ts->sense = 1; + tg->thread_sense[tg->tid_map[ext_tid]] = ts; + + return 0; +} + +static int ti_threadgroup_fork(ti_threadgroup_t *tg, int16_t ext_tid, void **bcast_val, int init) +{ + uint8_t *group_sense = &tg->group_sense; + int16_t tid = tg->tid_map[ext_tid]; + int thread_sense = tg->thread_sense[tid]->sense; + if (tid == 0) { + tg->envelope = bcast_val ? *bcast_val : NULL; + // synchronize `tg->envelope` and `tg->group_sense` + jl_atomic_store_release(group_sense, thread_sense); + + // if it's possible that threads are sleeping, signal them + if (tg->sleep_threshold) { + uv_mutex_lock(&tg->alarm_lock); + uv_cond_broadcast(&tg->alarm); + uv_mutex_unlock(&tg->alarm_lock); + } + } + else { + // spin up to threshold ns (count sheep), then sleep + uint64_t spin_ns; + uint64_t spin_start = 0; + // synchronize `tg->envelope` and `tg->group_sense` + while (jl_atomic_load_acquire(group_sense) != thread_sense) { + if (tg->sleep_threshold) { + if (!spin_start) { + // Lazily initialize spin_start since uv_hrtime is expensive + spin_start = uv_hrtime(); + continue; + } + spin_ns = uv_hrtime() - spin_start; + // In case uv_hrtime is not monotonic, we'll sleep earlier + if (init || spin_ns >= tg->sleep_threshold) { + uv_mutex_lock(&tg->alarm_lock); + if (jl_atomic_load_acquire(group_sense) != thread_sense) { + uv_cond_wait(&tg->alarm, &tg->alarm_lock); + } + uv_mutex_unlock(&tg->alarm_lock); + spin_start = 0; + init = 0; + continue; + } + } + jl_cpu_pause(); + } + if (bcast_val) + *bcast_val = tg->envelope; + } + + return 0; +} + +static int ti_threadgroup_join(ti_threadgroup_t *tg, int16_t ext_tid) +{ + int *p_thread_sense = &tg->thread_sense[tg->tid_map[ext_tid]]->sense; + jl_atomic_store_release(p_thread_sense, !*p_thread_sense); + if (tg->tid_map[ext_tid] == 0) { + jl_ptls_t ptls = jl_get_ptls_states(); + int8_t group_sense = tg->group_sense; + for (int i = 1; i < tg->num_threads; ++i) { + while (jl_atomic_load_acquire(&tg->thread_sense[i]->sense) == group_sense) { + jl_gc_safepoint_(ptls); + jl_cpu_pause(); + } + } + } + + return 0; +} + + +// threading interface +// --- +void jl_init_threadinginfra(void) { } + +void jl_init_threadarg(jl_threadarg_t *targ) +{ + ti_threadarg_t *tiarg = (ti_threadarg_t *)malloc(sizeof (ti_threadarg_t)); + tiarg->state = TI_THREAD_INIT; + targ->arg = (void *)tiarg; +} + +void jl_init_started_threads(jl_threadarg_t **targs) +{ + // set up the world thread group + ti_threadgroup_create(1, jl_n_threads, 1, &tgworld); + for (int i = 0; i < jl_n_threads; ++i) + ti_threadgroup_addthread(tgworld, i, NULL); + + jl_ptls_t ptls = jl_get_ptls_states(); + ti_threadgroup_initthread(tgworld, ptls->tid); + + // give the threads the world thread group; they will block waiting for fork + for (int i = 0; i < jl_n_threads - 1; ++i) { + ti_threadarg_t *tiarg = (ti_threadarg_t *)targs[i]->arg; + tiarg->tg = tgworld; + jl_atomic_store_release(&tiarg->state, TI_THREAD_WORK); + } +} + +// thread function: used by all except the main thread +void jl_threadfun(void *arg) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + jl_threadarg_t *targ = (jl_threadarg_t *)arg; + ti_threadarg_t *tiarg = (ti_threadarg_t *)targ->arg; + ti_threadgroup_t *tg; + ti_threadwork_t *work; + + // initialize this thread (set tid, create heap, etc.) + jl_init_threadtls(targ->tid); + jl_init_stack_limits(0); + + // set up tasking + jl_init_root_task(ptls->stack_lo, ptls->stack_hi - ptls->stack_lo); +#ifdef COPY_STACKS + jl_set_base_ctx((char*)&arg); +#endif + + // set the thread-local tid and wait for a thread group + while (jl_atomic_load_acquire(&tiarg->state) == TI_THREAD_INIT) + jl_cpu_pause(); + + // Assuming the functions called below don't contain unprotected GC + // critical region. In general, the following part of this function + // shouldn't call any managed code without calling `jl_gc_unsafe_enter` + // first. + jl_gc_state_set(ptls, JL_GC_STATE_SAFE, 0); + uv_barrier_wait(targ->barrier); + + // initialize this thread in the thread group + tg = tiarg->tg; + ti_threadgroup_initthread(tg, ptls->tid); + + // free the thread argument here + free(tiarg); + free(targ); + + int init = 1; + + // work loop + for (; ;) { + ti_threadgroup_fork(tg, ptls->tid, (void **)&work, init); + init = 0; + + if (work) { + // TODO: before we support getting return value from + // the work, and after we have proper GC transition + // support in the codegen and runtime we don't need to + // enter GC unsafe region when starting the work. + int8_t gc_state = jl_gc_unsafe_enter(ptls); + // This is probably always NULL for now + jl_module_t *last_m = ptls->current_module; + size_t last_age = ptls->world_age; + JL_GC_PUSH1(&last_m); + ptls->current_module = work->current_module; + ptls->world_age = work->world_age; + jl_thread_run_fun(&work->fptr, work->mfunc, work->args, work->nargs); + ptls->current_module = last_m; + ptls->world_age = last_age; + JL_GC_POP(); + jl_gc_unsafe_leave(ptls, gc_state); + } + + ti_threadgroup_join(tg, ptls->tid); + } +} + +// interface to user code: specialize and compile the user thread function +// and run it in all threads +JL_DLLEXPORT jl_value_t *jl_threading_run(jl_value_t *_args) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + // GC safe + uint32_t nargs; + jl_value_t **args; + if (!jl_is_svec(_args)) { + nargs = 1; + args = &_args; + } + else { + nargs = jl_svec_len(_args); + args = jl_svec_data(_args); + } + + int8_t gc_state = jl_gc_unsafe_enter(ptls); + + threadwork.mfunc = jl_lookup_generic(args, nargs, + jl_int32hash_fast(jl_return_address()), ptls->world_age); + // Ignore constant return value for now. + if (jl_compile_method_internal(&threadwork.fptr, threadwork.mfunc)) + return jl_nothing; + threadwork.args = args; + threadwork.nargs = nargs; + threadwork.ret = jl_nothing; + threadwork.current_module = ptls->current_module; + threadwork.world_age = ptls->world_age; + + // fork the world thread group + ti_threadwork_t *tw = &threadwork; + ti_threadgroup_fork(tgworld, ptls->tid, (void **)&tw, 0); + + // this thread must do work too + tw->ret = jl_thread_run_fun(&threadwork.fptr, threadwork.mfunc, args, nargs); + + // wait for completion + ti_threadgroup_join(tgworld, ptls->tid); + + jl_gc_unsafe_leave(ptls, gc_state); + + return tw->ret; +} + +#endif // JULIA_ENABLE_FORKJOIN_TI +#endif // JULIA_ENABLE_THREADING + +#ifdef __cplusplus +} +#endif diff --git a/src/forkjoin-ti.h b/src/forkjoin-ti.h new file mode 100644 index 0000000000000..0b882cbb7eed4 --- /dev/null +++ b/src/forkjoin-ti.h @@ -0,0 +1,21 @@ +// This file is a part of Julia. License is MIT: https://julialang.org/license + +#ifndef FORKJOINTI_H +#define FORKJOINTI_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +// interface provided by this threading infrastructure +JL_DLLEXPORT jl_value_t *jl_threading_run(jl_value_t *_args); + + +#ifdef __cplusplus +} +#endif + +#endif /* FORKJOINTI_H */ + diff --git a/src/gc.c b/src/gc.c index 0c706061f6f13..fc48bc4574425 100644 --- a/src/gc.c +++ b/src/gc.c @@ -2117,7 +2117,11 @@ mark: { jl_task_t *ta = (jl_task_t*)new_obj; gc_scrub_record_task(ta); int stkbuf = (ta->stkbuf != (void*)(intptr_t)-1 && ta->stkbuf != NULL); +#ifdef JULIA_ENABLE_PARTR + int16_t tid = ta->curr_tid; +#else int16_t tid = ta->tid; +#endif jl_ptls_t ptls2 = jl_all_tls_states[tid]; if (stkbuf) { #ifdef COPY_STACKS diff --git a/src/init.c b/src/init.c index 3556f7d8a90d9..9221d1f28a361 100644 --- a/src/init.c +++ b/src/init.c @@ -750,7 +750,11 @@ void jl_get_builtin_hooks(void) int t; for (t = 0; t < jl_n_threads; t++) { jl_ptls_t ptls2 = jl_all_tls_states[t]; +#ifdef JULIA_ENABLE_PARTR + ptls2->root_task->storage = jl_nothing; +#else ptls2->root_task->tls = jl_nothing; +#endif ptls2->root_task->consumers = jl_nothing; ptls2->root_task->donenotify = jl_nothing; ptls2->root_task->exception = jl_nothing; diff --git a/src/julia.h b/src/julia.h index 398d7eeeba9ca..9ff702ed97977 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1483,6 +1483,8 @@ typedef struct _jl_handler_t { size_t world_age; } jl_handler_t; +#if !defined(JULIA_ENABLE_THREADING) || !defined(JULIA_ENABLE_PARTR) + typedef struct _jl_task_t { JL_DATA_TYPE struct _jl_task_t *parent; @@ -1521,6 +1523,111 @@ typedef struct _jl_task_t { jl_timing_block_t *timing_stack; } jl_task_t; +#endif // !JULIA_ENABLE_THREADING || ! JULIA_ENABLE_PARTR + +#if defined(JULIA_ENABLE_THREADING) && defined(JULIA_ENABLE_PARTR) +/* task settings */ +#define TASK_IS_DETACHED 0x02 + /* clean up the task on completion */ +#define TASK_IS_STICKY 0x04 + /* task is sticky to the thread that first runs it */ + +typedef struct _arriver_t arriver_t; +typedef struct _reducer_t reducer_t; + +typedef struct _jl_taskq_t jl_taskq_t; +typedef struct _jl_taskq_t jl_condition_t; // TODO +typedef struct _jl_task_t jl_task_t; + +struct _jl_taskq_t { + jl_task_t *head; + jl_mutex_t lock; +}; + +struct _jl_task_t { + JL_DATA_TYPE + + /* to link this task into queues */ + jl_task_t *next; + + /* context and stack */ + jl_jmp_buf ctx; + size_t bufsz; + void *stkbuf; + + /* task local storage */ + jl_value_t *storage; + + /* state */ + jl_sym_t *state; + size_t started:1; + + /* TODO: other stuff from old task structure */ + size_t ssize; + jl_value_t *consumers; + jl_value_t *donenotify; + jl_value_t *exception; + jl_value_t *backtrace; + arraylist_t locks; + + /* task entry point, arguments, result, etc. */ + jl_method_instance_t *mfunc; + jl_generic_fptr_t fptr; + jl_value_t *args; + jl_value_t *result; + + /* current exception handler */ + jl_handler_t *eh; + + /* saved gc stack top for context switches */ + jl_gcframe_t *gcstack; + + /* current module, or NULL if this task has not set one */ + jl_module_t *current_module; + + /* current world age */ + size_t world_age; + + /* thread currently running this task */ + int16_t curr_tid; + + /* grain's range, for parfors */ + int64_t start, end; + + /* reduction function, for parfors */ + jl_method_instance_t *mredfunc; + jl_generic_fptr_t rfptr; + jl_value_t *rargs; + + /* parent (first) task of a parfor set */ + jl_task_t *parent; + + /* to synchronize/reduce grains of a parfor */ + arriver_t *arr; + reducer_t *red; + + /* parfor reduction result */ + jl_value_t *red_result; + + /* completion queue */ + jl_taskq_t cq; + + /* task settings */ + int8_t settings; + + /* tid of the thread to which this task is sticky */ + int16_t sticky_tid; + + /* the index of this task in the set of grains of a parfor */ + int16_t grain_num; + + /* for the multiqueue */ + int16_t prio; + + jl_timing_block_t *timing_stack; +}; +#endif // JULIA_ENABLE_THREADING && JULIA_ENABLE_PARTR + JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, size_t ssize); JL_DLLEXPORT void jl_switchto(jl_task_t **pt); JL_DLLEXPORT void JL_NORETURN jl_throw(jl_value_t *e); diff --git a/src/julia_internal.h b/src/julia_internal.h index 94297906a56b3..51c2389680f87 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -551,7 +551,6 @@ extern ssize_t jl_tls_offset; extern const int jl_tls_elf_support; void jl_init_threading(void); void jl_start_threads(void); -void jl_shutdown_threading(void); // Whether the GC is running extern char *jl_safepoint_pages; @@ -710,6 +709,22 @@ STATIC_INLINE char *jl_copy_str(char **to, const char *from) // Returns time in nanosec JL_DLLEXPORT uint64_t jl_hrtime(void); +// congruential random number generator +STATIC_INLINE void seed_cong(uint64_t *seed) +{ + *seed = jl_hrtime(); +} +STATIC_INLINE void unbias_cong(uint64_t max, uint64_t *unbias) +{ + *unbias = UINT64_MAX - ((UINT64_MAX % max)+1); +} +STATIC_INLINE uint64_t cong(uint64_t max, uint64_t unbias, uint64_t *seed) +{ + while ((*seed = 69069 * (*seed) + 362437) > unbias) + ; + return *seed % max; +} + // libuv stuff: JL_DLLEXPORT extern void *jl_dl_handle; JL_DLLEXPORT extern void *jl_RTLD_DEFAULT_handle; diff --git a/src/julia_threads.h b/src/julia_threads.h index d76ee0212b3d1..606536424d6cb 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -106,6 +106,11 @@ typedef struct _jl_tls_states_t { jl_jmp_buf base_ctx; // base context of stack jl_jmp_buf *safe_restore; int16_t tid; +#ifdef JULIA_ENABLE_PARTR + uint64_t rngseed; + struct _jl_task_t *curr_task; + struct _jl_taskq_t *sticky_taskq; +#endif size_t bt_size; // JL_MAX_BT_SIZE + 1 elements long uintptr_t *bt_data; diff --git a/src/locks.h b/src/locks.h index 21f41f9d72d07..557909b43bf02 100644 --- a/src/locks.h +++ b/src/locks.h @@ -100,6 +100,22 @@ static inline void jl_mutex_lock(jl_mutex_t *lock) jl_gc_enable_finalizers(ptls, 0); } +static inline int jl_mutex_trylock_nogc(jl_mutex_t *lock) +{ + unsigned long self = jl_thread_self(); + unsigned long owner = jl_atomic_load_acquire(&lock->owner); + if (owner == self) { + lock->count++; + return 1; + } + if (owner == 0 && + jl_atomic_compare_exchange(&lock->owner, 0, self) == 0) { + lock->count = 1; + return 1; + } + return 0; +} + /* Call this function for code that could be called from either a managed or an unmanaged thread */ static inline void jl_mutex_lock_maybe_nogc(jl_mutex_t *lock) diff --git a/src/options.h b/src/options.h index aea15e51fd70e..a316e7082c60b 100644 --- a/src/options.h +++ b/src/options.h @@ -124,6 +124,10 @@ #define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE" #define DEFAULT_MACHINE_EXCLUSIVE 0 +// threading infrastructure selection +#ifndef JULIA_ENABLE_PARTR +#define JULIA_ENABLE_FORKJOIN_TI 1 +#endif // sanitizer defaults --------------------------------------------------------- diff --git a/src/partr.c b/src/partr.c new file mode 100644 index 0000000000000..243534ca35e0a --- /dev/null +++ b/src/partr.c @@ -0,0 +1,854 @@ +// This file is a part of Julia. License is MIT: https://julialang.org/license + +// TODO: +// - coroutine integration. partr uses: +// - ctx_construct(): establish the context for a coroutine, with an entry +// point (partr_coro()), a stack, and a user data pointer (which is the +// task pointer). +// - ctx_get_user_ptr(): get the user data pointer (the task pointer). +// - ctx_is_done(): has the coroutine ended? +// - resume(): starts/resumes the coroutine specified by the passed context. +// - yield()/yield_value(): causes the calling coroutine to yield back to +// where it was resume()d. +// - stack management. pool of stacks to be implemented. +// - original task functionality to be integrated. +// - world_age should go into task local storage? +// - current_module is being obsoleted? + +#include +#include +#include +#include + +#include "julia.h" +#include "julia_internal.h" +#include "threading.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef JULIA_ENABLE_THREADING +#ifdef JULIA_ENABLE_PARTR + + +// multiq +// --- + +/* a task heap */ +typedef struct taskheap_tag { + jl_mutex_t lock; + jl_task_t **tasks; + int16_t ntasks, prio; +} taskheap_t; + +static const int16_t heap_d = 8; +static const int heap_c = 4; +static const int tasks_per_heap = 129; + +static taskheap_t *heaps; +static int16_t heap_p; + +/* unbias state for the RNG */ +static uint64_t cong_unbias; + + +/* multiq_init() + */ +static inline void multiq_init() +{ + heap_p = heap_c * jl_n_threads; + heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t)); + for (int16_t i = 0; i < heap_p; ++i) { + jl_mutex_init(&heaps[i].lock); + heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t *)); + heaps[i].ntasks = 0; + heaps[i].prio = INT16_MAX; + } + unbias_cong(heap_p, &cong_unbias); +} + + +/* sift_up() + */ +static inline void sift_up(taskheap_t *heap, int16_t idx) +{ + if (idx > 0) { + int16_t parent = (idx-1)/heap_d; + if (heap->tasks[idx]->prio <= heap->tasks[parent]->prio) { + jl_task_t *t = heap->tasks[parent]; + heap->tasks[parent] = heap->tasks[idx]; + heap->tasks[idx] = t; + sift_up(heap, parent); + } + } +} + + +/* sift_down() + */ +static inline void sift_down(taskheap_t *heap, int16_t idx) +{ + if (idx < heap->ntasks) { + for (int16_t child = heap_d*idx + 1; + child < tasks_per_heap && child <= heap_d*idx + heap_d; + ++child) { + if (heap->tasks[child] + && heap->tasks[child]->prio <= heap->tasks[idx]->prio) { + jl_task_t *t = heap->tasks[idx]; + heap->tasks[idx] = heap->tasks[child]; + heap->tasks[child] = t; + sift_down(heap, child); + } + } + } +} + + +/* multiq_insert() + */ +static inline int multiq_insert(jl_task_t *task, int16_t priority) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + uint64_t rn; + + task->prio = priority; + do { + rn = cong(heap_p, cong_unbias, &ptls->rngseed); + } while (!jl_mutex_trylock_nogc(&heaps[rn].lock)); + + if (heaps[rn].ntasks >= tasks_per_heap) { + jl_mutex_unlock_nogc(&heaps[rn].lock); + return -1; + } + + heaps[rn].tasks[heaps[rn].ntasks++] = task; + sift_up(&heaps[rn], heaps[rn].ntasks-1); + jl_mutex_unlock_nogc(&heaps[rn].lock); + int16_t prio = jl_atomic_load(&heaps[rn].prio); + if (task->prio < prio) + jl_atomic_compare_exchange(&heaps[rn].prio, prio, task->prio); + + return 0; +} + + +/* multiq_deletemin() + */ +static inline jl_task_t *multiq_deletemin() +{ + jl_ptls_t ptls = jl_get_ptls_states(); + uint64_t rn1, rn2; + int16_t i, prio1, prio2; + jl_task_t *task; + + for (i = 0; i < jl_n_threads; ++i) { + rn1 = cong(heap_p, cong_unbias, &ptls->rngseed); + rn2 = cong(heap_p, cong_unbias, &ptls->rngseed); + prio1 = jl_atomic_load(&heaps[rn1].prio); + prio2 = jl_atomic_load(&heaps[rn2].prio); + if (prio1 > prio2) { + prio1 = prio2; + rn1 = rn2; + } + else if (prio1 == prio2 && prio1 == INT16_MAX) + continue; + if (jl_mutex_trylock_nogc(&heaps[rn1].lock)) { + if (prio1 == heaps[rn1].prio) + break; + jl_mutex_unlock_nogc(&heaps[rn1].lock); + } + } + if (i == jl_n_threads) + return NULL; + + task = heaps[rn1].tasks[0]; + heaps[rn1].tasks[0] = heaps[rn1].tasks[--heaps[rn1].ntasks]; + heaps[rn1].tasks[heaps[rn1].ntasks] = NULL; + prio1 = INT16_MAX; + if (heaps[rn1].ntasks > 0) { + sift_down(&heaps[rn1], 0); + prio1 = heaps[rn1].tasks[0]->prio; + } + jl_atomic_store(&heaps[rn1].prio, prio1); + jl_mutex_unlock_nogc(&heaps[rn1].lock); + + return task; +} + + +// sync trees +// --- + +/* arrival tree */ +struct _arriver_t { + int16_t index, next_avail; + int16_t **tree; +}; + +/* reduction tree */ +struct _reducer_t { + int16_t index, next_avail; + void ***tree; +}; + + +/* pool of arrival trees */ +static arriver_t *arriverpool; +static int16_t num_arrivers, num_arriver_tree_nodes, next_arriver; + +/* pool of reduction trees */ +static reducer_t *reducerpool; +static int16_t num_reducers, num_reducer_tree_nodes, next_reducer; + + +/* synctreepool_init() + */ +static inline void synctreepool_init() +{ + num_arriver_tree_nodes = (GRAIN_K * jl_n_threads) - 1; + num_reducer_tree_nodes = (2 * GRAIN_K * jl_n_threads) - 1; + + /* num_arrivers = ((GRAIN_K * jl_n_threads) ^ ARRIVERS_P) + 1 */ + num_arrivers = GRAIN_K * jl_n_threads; + for (int i = 1; i < ARRIVERS_P; ++i) + num_arrivers = num_arrivers * num_arrivers; + ++num_arrivers; + + num_reducers = num_arrivers * REDUCERS_FRAC; + + /* allocate */ + arriverpool = (arriver_t *)calloc(num_arrivers, sizeof (arriver_t)); + next_arriver = 0; + for (int i = 0; i < num_arrivers; ++i) { + arriverpool[i].index = i; + arriverpool[i].next_avail = i + 1; + arriverpool[i].tree = (int16_t **) + jl_malloc_aligned(num_arriver_tree_nodes * sizeof (int16_t *), 64); + for (int j = 0; j < num_arriver_tree_nodes; ++j) + arriverpool[i].tree[j] = (int16_t *)jl_malloc_aligned(sizeof (int16_t), 64); + } + arriverpool[num_arrivers - 1].next_avail = -1; + + reducerpool = (reducer_t *)calloc(num_reducers, sizeof (reducer_t)); + next_reducer = 0; + for (int i = 0; i < num_reducers; ++i) { + reducerpool[i].index = i; + reducerpool[i].next_avail = i + 1; + reducerpool[i].tree = (void ***) + jl_malloc_aligned(num_reducer_tree_nodes * sizeof (void **), 64); + for (int j = 0; j < num_reducer_tree_nodes; ++j) + reducerpool[i].tree[j] = (void **)jl_malloc_aligned(sizeof (void *), 64); + } + if (num_reducers > 0) + reducerpool[num_reducers - 1].next_avail = -1; + else + next_reducer = -1; +} + + +/* arriver_alloc() + */ +static inline arriver_t *arriver_alloc() +{ + int16_t candidate; + arriver_t *arr; + + do { + candidate = jl_atomic_load(&next_arriver); + if (candidate == -1) + return NULL; + arr = &arriverpool[candidate]; + } while (!jl_atomic_bool_compare_exchange(&next_arriver, + candidate, arr->next_avail)); + return arr; +} + + +/* arriver_free() + */ +static inline void arriver_free(arriver_t *arr) +{ + for (int i = 0; i < num_arriver_tree_nodes; ++i) + *arr->tree[i] = 0; + + jl_atomic_exchange_generic(&next_arriver, &arr->index, &arr->next_avail); +} + + +/* reducer_alloc() + */ +static inline reducer_t *reducer_alloc() +{ + int16_t candidate; + reducer_t *red; + + do { + candidate = jl_atomic_load(&next_reducer); + if (candidate == -1) + return NULL; + red = &reducerpool[candidate]; + } while (!jl_atomic_bool_compare_exchange(&next_reducer, + candidate, red->next_avail)); + return red; +} + + +/* reducer_free() + */ +static inline void reducer_free(reducer_t *red) +{ + for (int i = 0; i < num_reducer_tree_nodes; ++i) + *red->tree[i] = 0; + + jl_atomic_exchange_generic(&next_reducer, &red->index, &red->next_avail); +} + + +/* last_arriver() + */ +static inline int last_arriver(arriver_t *arr, int idx) +{ + int arrived, aidx = idx + (GRAIN_K * jl_n_threads) - 1; + + while (aidx > 0) { + --aidx; + aidx >>= 1; + arrived = jl_atomic_fetch_add(arr->tree[aidx], 1); + if (!arrived) return 0; + } + + return 1; +} + + +/* reduce() + */ +static inline void *reduce(arriver_t *arr, reducer_t *red, void *(*rf)(void *, void *), + void *val, int idx) +{ + int arrived, aidx = idx + (GRAIN_K * jl_n_threads) - 1, ridx = aidx, nidx; + + *red->tree[ridx] = val; + while (aidx > 0) { + --aidx; + aidx >>= 1; + arrived = jl_atomic_fetch_add(arr->tree[aidx], 1); + if (!arrived) return NULL; + + /* neighbor has already arrived, get its value and reduce it */ + nidx = ridx & 0x1 ? ridx + 1 : ridx - 1; + val = rf(val, *red->tree[nidx]); + + /* move up the tree */ + --ridx; + ridx >>= 1; + *red->tree[ridx] = val; + } + + return val; +} + + +// parallel task runtime +// --- + +// sticky task queues need to be visible to all threads +jl_taskq_t *sticky_taskqs; + +// internally used to indicate a yield occurred in the runtime itself +// TODO: what's the Julia way to do this? A symbol? +static const int64_t yield_from_sync = 1; + + +// initialize the threading infrastructure +void jl_init_threadinginfra(void) +{ + /* initialize the synchronization trees pool and the multiqueue */ + synctreepool_init(); + multiq_init(); + + /* allocate sticky task queues */ + sticky_taskqs = (jl_taskq_t *)jl_malloc_aligned(jl_n_threads * sizeof(jl_taskq_t), 64); +} + + +// initialize the thread function argument +void jl_init_threadarg(jl_threadarg_t *targ) { } + + +// helper for final thread initialization +static void init_started_thread() +{ + jl_ptls_t ptls = jl_get_ptls_states(); + + /* allocate this thread's sticky task queue pointer and initialize the lock */ + seed_cong(&ptls->rngseed); + ptls->sticky_taskq = &sticky_taskqs[ptls->tid]; + ptls->sticky_taskq->head = NULL; +} + + +// once the threads are started, perform any final initializations +void jl_init_started_threads(jl_threadarg_t **targs) +{ + // master thread final initialization + init_started_thread(); +} + + +static int run_next(); + + +// thread function: used by all except the main thread +void jl_threadfun(void *arg) +{ + jl_threadarg_t *targ = (jl_threadarg_t *)arg; + + // initialize this thread (set tid, create heap, etc.) + jl_init_threadtls(targ->tid); + jl_init_stack_limits(0); + + jl_ptls_t ptls = jl_get_ptls_states(); + + // set up tasking + jl_init_root_task(ptls->stack_lo, ptls->stack_hi - ptls->stack_lo); +#ifdef COPY_STACKS + jl_set_base_ctx((char*)&arg); +#endif + + init_started_thread(); + + // Assuming the functions called below don't contain unprotected GC + // critical region. In general, the following part of this function + // shouldn't call any managed code without calling `jl_gc_unsafe_enter` + // first. + jl_gc_state_set(ptls, JL_GC_STATE_SAFE, 0); + uv_barrier_wait(targ->barrier); + + // free the thread argument here + free(targ); + + /* get the highest priority task and run it */ + while (run_next() == 0) + ; +} + + +// coroutine entry point +static void partr_coro(void *ctx) +{ + jl_task_t *task = (jl_task_t *)ctx; // TODO. ctx_get_user_ptr(ctx); + //task->result = task->f(task->arg, task->start, task->end); + + /* grain tasks must synchronize */ + if (task->grain_num >= 0) { + int was_last = 0; + + /* reduce... */ + if (task->red) { + // TODO. task->result = reduce(task->arr, task->red, task->rf, + // task->result, task->grain_num); + /* if this task is last, set the result in the parent task */ + if (task->result) { + task->parent->red_result = task->result; + was_last = 1; + } + } + /* ... or just sync */ + else { + if (last_arriver(task->arr, task->grain_num)) + was_last = 1; + } + + /* the last task to finish needs to finish up the loop */ + if (was_last) { + /* a non-parent task must wake up the parent */ + if (task->grain_num > 0) { + multiq_insert(task->parent, 0); + } + /* the parent task was last; it can just end */ + // TODO: free arriver/reducer when task completes + } + else { + /* the parent task needs to wait */ + if (task->grain_num == 0) { + // TODO. yield_value(task->ctx, (void *)yield_from_sync); + } + } + } +} + + +// add the specified task to the sticky task queue +static void add_to_stickyq(jl_task_t *task) +{ + assert(task->sticky_tid != -1); + + jl_taskq_t *q = &sticky_taskqs[task->sticky_tid]; + JL_LOCK(&q->lock); + if (q->head == NULL) + q->head = task; + else { + jl_task_t *pt = q->head; + while (pt->next) + pt = pt->next; + pt->next = task; + } + JL_UNLOCK(&q->lock); +} + + +// pop the first task off the sticky task queue +static jl_task_t *get_from_stickyq() +{ + jl_ptls_t ptls = jl_get_ptls_states(); + jl_taskq_t *q = ptls->sticky_taskq; + + /* racy check for quick path */ + if (q->head == NULL) + return NULL; + + JL_LOCK(&q->lock); + jl_task_t *task = q->head; + if (task) { + q->head = task->next; + task->next = NULL; + } + JL_UNLOCK(&q->lock); + + return task; +} + + +// start the task if it is new, or switch to it +static jl_value_t *resume(jl_task_t *task) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + + // GC safe + uint32_t nargs; + jl_value_t **args; + if (!jl_is_svec(task->args)) { + nargs = 1; + args = &task->args; + } + else { + nargs = jl_svec_len(task->args); + args = jl_svec_data(task->args); + } + + // TODO: before we support getting return value from + // the work, and after we have proper GC transition + // support in the codegen and runtime we don't need to + // enter GC unsafe region when starting the work. + int8_t gc_state = jl_gc_unsafe_enter(ptls); + + jl_value_t *result = NULL; + if (!task->started) { + task->started = 1; + result = task->result = jl_thread_run_fun(&task->fptr, task->mfunc, args, nargs); + } + else { + // TODO: switch to task + } + + jl_gc_unsafe_leave(ptls, gc_state); + + return result; +} + + +// get the next available task and run it +static int run_next() +{ + jl_ptls_t ptls = jl_get_ptls_states(); + + /* first check for sticky tasks */ + jl_task_t *task = get_from_stickyq(); + + /* no sticky tasks, go to the multiq */ + if (task == NULL) { + task = multiq_deletemin(); + if (task == NULL) + return 0; + + /* a sticky task will only come out of the multiq if it has not been run */ + if (task->settings & TASK_IS_STICKY) { + assert(task->sticky_tid == -1); + task->sticky_tid = ptls->tid; + } + } + + /* run/resume the task */ + ptls->curr_task = task; + task->curr_tid = ptls->tid; + + // TODO + int64_t y = 0; + // int64_t y = (int64_t)resume(task->ctx); + task->curr_tid = -1; + ptls->curr_task = NULL; + + /* if the task isn't done, it is either in a CQ, or must be re-queued */ + if (0 /* TODO. !ctx_is_done(task->ctx) */) { + /* the yield value tells us if the task is in a CQ */ + if (y != yield_from_sync) { + /* sticky tasks go to the thread's sticky queue */ + if (task->settings & TASK_IS_STICKY) + add_to_stickyq(task); + /* all others go back into the multiq */ + else + multiq_insert(task, task->prio); + } + return 0; + } + + /* The task completed. Detached tasks cannot be synced, so nothing will + be in their CQs. + */ + if (task->settings & TASK_IS_DETACHED) + return 0; + + /* add back all the tasks in this one's completion queue */ + JL_LOCK(&task->cq.lock); + jl_task_t *cqtask = task->cq.head; + task->cq.head = NULL; + JL_UNLOCK(&task->cq.lock); + + jl_task_t *cqnext; + while (cqtask) { + cqnext = cqtask->next; + cqtask->next = NULL; + if (cqtask->settings & TASK_IS_STICKY) + add_to_stickyq(cqtask); + else + multiq_insert(cqtask, cqtask->prio); + cqtask = cqnext; + } + + return 0; +} + + +// specialize and compile the user function +static int setup_task_fun(jl_value_t *_args, jl_method_instance_t **mfunc, + jl_generic_fptr_t *fptr) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + + uint32_t nargs; + jl_value_t **args; + if (!jl_is_svec(_args)) { + nargs = 1; + args = &_args; + } + else { + nargs = jl_svec_len(_args); + args = jl_svec_data(_args); + } + + *mfunc = jl_lookup_generic(args, nargs, + jl_int32hash_fast(jl_return_address()), ptls->world_age); + + // Ignore constant return value for now. + if (jl_compile_method_internal(fptr, *mfunc)) + return 0; + + return 1; +} + + +// allocate and initialize a task +static jl_task_t *new_task(jl_value_t *_args) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + + // TODO: using jl_task_type below, assuming task and task will be merged + jl_task_t *task = (jl_task_t *)jl_gc_alloc(ptls, sizeof (jl_task_t), + jl_task_type); + // TODO. ctx_construct(task->ctx, task->stack, TASK_STACK_SIZE, partr_coro, task); + if (!setup_task_fun(_args, &task->mfunc, &task->fptr)) + return NULL; + task->args = _args; + task->result = jl_nothing; + task->current_module = ptls->current_module; + task->world_age = ptls->world_age; + task->sticky_tid = -1; + task->grain_num = -1; + + return task; +} + + +// allocate a task and copy the specified task's contents into it +static jl_task_t *copy_task(jl_task_t *ft) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + + // TODO: using jl_task_type below, assuming task and task will be merged + jl_task_t *task = (jl_task_t *)jl_gc_alloc(ptls, sizeof (jl_task_t), + jl_task_type); + memcpy(task, ft, sizeof (jl_task_t)); + return task; +} + + +/* partr_spawn() -- create a task for `f(arg)` and enqueue it for execution + + Implicitly asserts that `f(arg)` can run concurrently with everything + else that's currently running. If `detach` is set, the spawned task + will not be returned (and cannot be synced). Yields. + */ +int partr_spawn(partr_t *t, jl_value_t *_args, int8_t sticky, int8_t detach) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + + jl_task_t *task = new_task(_args); + if (task == NULL) + return -1; + if (sticky) + task->settings |= TASK_IS_STICKY; + if (detach) + task->settings |= TASK_IS_DETACHED; + + if (multiq_insert(task, ptls->tid) != 0) { + return -2; + } + + *t = detach ? NULL : (partr_t)task; + + /* only yield if we're running a non-sticky task */ + if (!(ptls->curr_task->settings & TASK_IS_STICKY)) + // TODO. yield(ptls->curr_task->ctx); + ; + + return 0; +} + + +/* partr_sync() -- get the return value of task `t` + + Returns only when task `t` has completed. + */ +int partr_sync(void **r, partr_t t) +{ + jl_task_t *task = (jl_task_t *)t; + + jl_ptls_t ptls = jl_get_ptls_states(); + + /* if the target task has not finished, add the current task to its + completion queue; the thread that runs the target task will add + this task back to the ready queue + */ + if (0 /* TODO. !ctx_is_done(task->ctx) */) { + ptls->curr_task->next = NULL; + JL_LOCK(&task->cq.lock); + + /* ensure the task didn't finish before we got the lock */ + if (0 /* TODO. !ctx_is_done(task->ctx) */) { + /* add the current task to the CQ */ + if (task->cq.head == NULL) + task->cq.head = ptls->curr_task; + else { + jl_task_t *pt = task->cq.head; + while (pt->next) + pt = pt->next; + pt->next = ptls->curr_task; + } + + JL_UNLOCK(&task->cq.lock); + /* yield point */ + // TODO. yield_value(ptls->curr_task->ctx, (void *)yield_from_sync); + } + + /* the task finished before we could add to its CQ */ + else + JL_UNLOCK(&task->cq.lock); + } + + if (r) + *r = task->grain_num >= 0 && task->red ? + task->red_result : task->result; + return 0; +} + + +/* partr_parfor() -- spawn multiple tasks for a parallel loop + + Spawn tasks that invoke `f(arg, start, end)` such that the sum of `end-start` + for all tasks is `count`. Uses `rf()`, if provided, to reduce the return + values from the tasks, and returns the result. Yields. + */ +int partr_parfor(partr_t *t, jl_value_t *_args, int64_t count, jl_value_t *_rargs) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + + int64_t n = GRAIN_K * jl_n_threads; + lldiv_t each = lldiv(count, n); + + /* allocate synchronization tree(s) */ + arriver_t *arr = arriver_alloc(); + if (arr == NULL) + return -1; + reducer_t *red = NULL; + jl_method_instance_t *mredfunc; + jl_generic_fptr_t rfptr; + if (_rargs != NULL) { + red = reducer_alloc(); + if (red == NULL) { + arriver_free(arr); + return -2; + } + if (!setup_task_fun(_rargs, &mredfunc, &rfptr)) { + reducer_free(red); + arriver_free(arr); + return -3; + } + } + + /* allocate and enqueue (GRAIN_K * nthreads) tasks */ + *t = NULL; + int64_t start = 0, end; + for (int64_t i = 0; i < n; ++i) { + end = start + each.quot + (i < each.rem ? 1 : 0); + jl_task_t *task; + if (*t == NULL) + *t = task = new_task(_args); + else + task = copy_task(*t); + if (task == NULL) + return -4; + + task->start = start; + task->end = end; + task->parent = *t; + task->grain_num = i; + task->mredfunc = mredfunc; + task->rfptr = rfptr; + task->rargs = _rargs; + task->arr = arr; + task->red = red; + + if (multiq_insert(task, ptls->tid) != 0) { + return -5; + } + + start = end; + } + + /* only yield if we're running a non-sticky task */ + if (!(ptls->curr_task->settings & TASK_IS_STICKY)) + // TODO. yield(curr_task->ctx); + ; + + return 0; +} + + +#endif // JULIA_ENABLE_PARTR +#endif // JULIA_ENABLE_THREADING + +#ifdef __cplusplus +} +#endif diff --git a/src/partr.h b/src/partr.h new file mode 100644 index 0000000000000..c9f302c9dae1a --- /dev/null +++ b/src/partr.h @@ -0,0 +1,53 @@ +// This file is a part of Julia. License is MIT: https://julialang.org/license + +/* partr -- parallel tasks runtime options + */ + +#ifndef PARTR_H +#define PARTR_H + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef JULIA_ENABLE_PARTR + +#include "julia.h" + + +/* multiq */ +#define MULTIQ_HEAP_C 4 + /* number of heaps = MULTIQ_HEAP_C * nthreads */ +#define MULTIQ_TASKS_PER_HEAP 129 + /* how many in each heap */ + +/* parfor */ +#define GRAIN_K 4 + /* tasks = niters / (GRAIN_K * nthreads) */ + +/* synchronization */ +#define ARRIVERS_P 2 + /* narrivers = ((GRAIN_K * nthreads) ^ ARRIVERS_P) + 1 + limit for number of recursive parfors */ +#define REDUCERS_FRAC 1 + /* nreducers = narrivers * REDUCERS_FRAC */ + + +/* interface */ +typedef void *partr_t; + +int partr_spawn(partr_t *t, jl_value_t *_args, int8_t sticky, int8_t detach); +int partr_sync(void **r, partr_t t); +int partr_parfor(partr_t *t, jl_value_t *_args, int64_t count, jl_value_t *_rargs); + +#endif /* JULIA_ENABLE_PARTR */ + +#ifdef __cplusplus +} +#endif + +#endif /* PARTR_H */ + diff --git a/src/task.c b/src/task.c index 742f325790a50..99d3e0bb0baa5 100644 --- a/src/task.c +++ b/src/task.c @@ -594,7 +594,11 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, size_t ssize) t->ssize = ssize; t->current_module = NULL; t->parent = ptls->current_task; +#ifdef JULIA_ENABLE_PARTR + t->storage = jl_nothing; +#else t->tls = jl_nothing; +#endif t->consumers = jl_nothing; t->state = runnable_sym; t->start = start; @@ -606,7 +610,9 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, size_t ssize) t->eh = NULL; t->gcstack = NULL; t->stkbuf = NULL; +#ifndef JULIA_ENABLE_PARTR t->tid = 0; +#endif t->started = 0; #ifdef ENABLE_TIMINGS t->timing_stack = NULL; @@ -714,7 +720,11 @@ void jl_init_root_task(void *stack, size_t ssize) ptls->current_task->started = 1; ptls->current_task->parent = ptls->current_task; ptls->current_task->current_module = ptls->current_module; +#ifdef JULIA_ENABLE_PARTR + ptls->current_task->storage = jl_nothing; +#else ptls->current_task->tls = jl_nothing; +#endif ptls->current_task->consumers = jl_nothing; ptls->current_task->state = runnable_sym; ptls->current_task->start = NULL; @@ -724,7 +734,9 @@ void jl_init_root_task(void *stack, size_t ssize) ptls->current_task->backtrace = jl_nothing; ptls->current_task->eh = NULL; ptls->current_task->gcstack = NULL; +#ifndef JULIA_ENABLE_PARTR ptls->current_task->tid = ptls->tid; +#endif #ifdef JULIA_ENABLE_THREADING arraylist_new(&ptls->current_task->locks, 0); #endif diff --git a/src/threadgroup.c b/src/threadgroup.c deleted file mode 100644 index f2158423acc0e..0000000000000 --- a/src/threadgroup.c +++ /dev/null @@ -1,206 +0,0 @@ -// This file is a part of Julia. License is MIT: https://julialang.org/license - -/* - threading infrastructure - . threadgroup abstraction - . fork/join/barrier -*/ - -#include -#include - -#include "julia.h" -#include "julia_internal.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#include "options.h" -#include "threadgroup.h" - -int ti_threadgroup_create(uint8_t num_sockets, uint8_t num_cores, - uint8_t num_threads_per_core, - ti_threadgroup_t **newtg) -{ - int i; - ti_threadgroup_t *tg; - int num_threads = num_sockets * num_cores * num_threads_per_core; - char *cp; - - tg = (ti_threadgroup_t*)jl_malloc_aligned(sizeof(ti_threadgroup_t), 64); - tg->tid_map = (int16_t*)jl_malloc_aligned(num_threads * sizeof(int16_t), 64); - for (i = 0; i < num_threads; ++i) - tg->tid_map[i] = -1; - tg->num_sockets = num_sockets; - tg->num_cores = num_cores; - tg->num_threads_per_core = num_threads_per_core; - tg->num_threads = num_threads; - tg->added_threads = 0; - tg->thread_sense = (ti_thread_sense_t**) - jl_malloc_aligned(num_threads * sizeof(ti_thread_sense_t*), 64); - for (i = 0; i < num_threads; i++) - tg->thread_sense[i] = NULL; - jl_atomic_store_release(&tg->group_sense, 0); - - uv_mutex_init(&tg->alarm_lock); - uv_cond_init(&tg->alarm); - - tg->sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD; - cp = getenv(THREAD_SLEEP_THRESHOLD_NAME); - if (cp) { - if (!strncasecmp(cp, "infinite", 8)) - tg->sleep_threshold = 0; - else - tg->sleep_threshold = (uint64_t)strtol(cp, NULL, 10); - } - - *newtg = tg; - return 0; -} - -int ti_threadgroup_addthread(ti_threadgroup_t *tg, int16_t ext_tid, - int16_t *tgtid) -{ - if (ext_tid < 0 || ext_tid >= tg->num_threads) - return -1; - if (tg->tid_map[ext_tid] != -1) - return -2; - if (tg->added_threads == tg->num_threads) - return -3; - - tg->tid_map[ext_tid] = tg->added_threads++; - if (tgtid) *tgtid = tg->tid_map[ext_tid]; - - return 0; -} - -int ti_threadgroup_initthread(ti_threadgroup_t *tg, int16_t ext_tid) -{ - ti_thread_sense_t *ts; - - if (ext_tid < 0 || ext_tid >= tg->num_threads) - return -1; - if (tg->thread_sense[tg->tid_map[ext_tid]] != NULL) - return -2; - if (tg->num_threads == 0) - return -3; - - ts = (ti_thread_sense_t*)jl_malloc_aligned(sizeof(ti_thread_sense_t), 64); - ts->sense = 1; - tg->thread_sense[tg->tid_map[ext_tid]] = ts; - - return 0; -} - -int ti_threadgroup_member(ti_threadgroup_t *tg, int16_t ext_tid, int16_t *tgtid) -{ - if (ext_tid < 0 || ext_tid >= tg->num_threads) - return -1; - if (tg == NULL) { - if (tgtid) *tgtid = -1; - return -2; - } - if (tg->tid_map[ext_tid] == -1) { - if (tgtid) *tgtid = -1; - return -3; - } - if (tgtid) *tgtid = tg->tid_map[ext_tid]; - - return 0; -} - -int ti_threadgroup_size(ti_threadgroup_t *tg, int16_t *tgsize) -{ - *tgsize = tg->num_threads; - return 0; -} - -int ti_threadgroup_fork(ti_threadgroup_t *tg, int16_t ext_tid, void **bcast_val, int init) -{ - uint8_t *group_sense = &tg->group_sense; - int16_t tid = tg->tid_map[ext_tid]; - int thread_sense = tg->thread_sense[tid]->sense; - if (tid == 0) { - tg->envelope = bcast_val ? *bcast_val : NULL; - // synchronize `tg->envelope` and `tg->group_sense` - jl_atomic_store_release(group_sense, thread_sense); - - // if it's possible that threads are sleeping, signal them - if (tg->sleep_threshold) { - uv_mutex_lock(&tg->alarm_lock); - uv_cond_broadcast(&tg->alarm); - uv_mutex_unlock(&tg->alarm_lock); - } - } - else { - // spin up to threshold ns (count sheep), then sleep - uint64_t spin_ns; - uint64_t spin_start = 0; - // synchronize `tg->envelope` and `tg->group_sense` - while (jl_atomic_load_acquire(group_sense) != thread_sense) { - if (tg->sleep_threshold) { - if (!spin_start) { - // Lazily initialize spin_start since uv_hrtime is expensive - spin_start = uv_hrtime(); - continue; - } - spin_ns = uv_hrtime() - spin_start; - // In case uv_hrtime is not monotonic, we'll sleep earlier - if (init || spin_ns >= tg->sleep_threshold) { - uv_mutex_lock(&tg->alarm_lock); - if (jl_atomic_load_acquire(group_sense) != thread_sense) { - uv_cond_wait(&tg->alarm, &tg->alarm_lock); - } - uv_mutex_unlock(&tg->alarm_lock); - spin_start = 0; - init = 0; - continue; - } - } - jl_cpu_pause(); - } - if (bcast_val) - *bcast_val = tg->envelope; - } - - return 0; -} - -int ti_threadgroup_join(ti_threadgroup_t *tg, int16_t ext_tid) -{ - int *p_thread_sense = &tg->thread_sense[tg->tid_map[ext_tid]]->sense; - jl_atomic_store_release(p_thread_sense, !*p_thread_sense); - if (tg->tid_map[ext_tid] == 0) { - jl_ptls_t ptls = jl_get_ptls_states(); - int8_t group_sense = tg->group_sense; - for (int i = 1; i < tg->num_threads; ++i) { - while (jl_atomic_load_acquire(&tg->thread_sense[i]->sense) == group_sense) { - jl_gc_safepoint_(ptls); - jl_cpu_pause(); - } - } - } - - return 0; -} - -int ti_threadgroup_destroy(ti_threadgroup_t *tg) -{ - int i; - - uv_mutex_destroy(&tg->alarm_lock); - uv_cond_destroy(&tg->alarm); - - for (i = 0; i < tg->num_threads; i++) - jl_free_aligned(tg->thread_sense[i]); - jl_free_aligned(tg->thread_sense); - jl_free_aligned(tg->tid_map); - jl_free_aligned(tg); - - return 0; -} - -#ifdef __cplusplus -} -#endif diff --git a/src/threadgroup.h b/src/threadgroup.h deleted file mode 100644 index 82fc59785cd05..0000000000000 --- a/src/threadgroup.h +++ /dev/null @@ -1,44 +0,0 @@ -// This file is a part of Julia. License is MIT: https://julialang.org/license - -#ifndef JL_THREADGROUP_H -#define JL_THREADGROUP_H - -#include -#include "uv.h" - -// for the barrier -typedef struct { - int sense; -} ti_thread_sense_t; - -// thread group -typedef struct { - int16_t *tid_map, num_threads, added_threads; - uint8_t num_sockets, num_cores, num_threads_per_core; - - // fork/join/barrier - uint8_t group_sense; // Written only by master thread - ti_thread_sense_t **thread_sense; - void *envelope; - - // to let threads sleep - uv_mutex_t alarm_lock; - uv_cond_t alarm; - uint64_t sleep_threshold; -} ti_threadgroup_t; - -int ti_threadgroup_create(uint8_t num_sockets, uint8_t num_cores, - uint8_t num_threads_per_core, - ti_threadgroup_t **newtg); -int ti_threadgroup_addthread(ti_threadgroup_t *tg, int16_t ext_tid, - int16_t *tgtid); -int ti_threadgroup_initthread(ti_threadgroup_t *tg, int16_t ext_tid); -int ti_threadgroup_member(ti_threadgroup_t *tg, int16_t ext_tid, - int16_t *tgtid); -int ti_threadgroup_size(ti_threadgroup_t *tg, int16_t *tgsize); -int ti_threadgroup_fork(ti_threadgroup_t *tg, int16_t ext_tid, - void **bcast_val, int init); -int ti_threadgroup_join(ti_threadgroup_t *tg, int16_t ext_tid); -int ti_threadgroup_destroy(ti_threadgroup_t *tg); - -#endif /* THREADGROUP_H */ diff --git a/src/threading.c b/src/threading.c index 8dbe9066b9224..ef75ce5f242d9 100644 --- a/src/threading.c +++ b/src/threading.c @@ -1,18 +1,5 @@ // This file is a part of Julia. License is MIT: https://julialang.org/license -/* - threading infrastructure - . thread and threadgroup creation - . thread function - . invoke Julia function from multiple threads - -TODO: - . fix interface to properly support thread groups - . add queue per thread for tasks - . add reduction; reduce values returned from thread function - . make code generation thread-safe and remove the lock -*/ - #include #include #include @@ -47,7 +34,6 @@ extern "C" { #endif -#include "threadgroup.h" #include "threading.h" // The tls_states buffer: @@ -240,8 +226,7 @@ JL_DLLEXPORT JL_CONST_FUNC jl_ptls_t (jl_get_ptls_states)(void) } #endif -// thread ID -JL_DLLEXPORT int jl_n_threads; // # threads we're actually using +JL_DLLEXPORT int jl_n_threads; jl_ptls_t *jl_all_tls_states; // return calling thread's ID @@ -253,10 +238,19 @@ JL_DLLEXPORT int16_t jl_threadid(void) return ptls->tid; } -static void ti_initthread(int16_t tid) +void jl_init_threadtls(int16_t tid) { jl_ptls_t ptls = jl_get_ptls_states(); -#ifndef _OS_WINDOWS_ +#ifdef _OS_WINDOWS_ + if (tid == 0) { + if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), + GetCurrentProcess(), &hMainThread, 0, + FALSE, DUPLICATE_SAME_ACCESS)) { + jl_printf(JL_STDERR, "WARNING: failed to access handle to main thread\n"); + hMainThread = INVALID_HANDLE_VALUE; + } + } +#else ptls->system_id = pthread_self(); #endif assert(ptls->world_age == 0); @@ -288,21 +282,8 @@ static void ti_initthread(int16_t tid) jl_all_tls_states[tid] = ptls; } -static void ti_init_master_thread(void) -{ -#ifdef _OS_WINDOWS_ - if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), - GetCurrentProcess(), &hMainThread, 0, - FALSE, DUPLICATE_SAME_ACCESS)) { - jl_printf(JL_STDERR, "WARNING: failed to access handle to main thread\n"); - hMainThread = INVALID_HANDLE_VALUE; - } -#endif - ti_initthread(0); -} - // all threads call this function to run user code -static jl_value_t *ti_run_fun(const jl_generic_fptr_t *fptr, jl_method_instance_t *mfunc, +jl_value_t *jl_thread_run_fun(const jl_generic_fptr_t *fptr, jl_method_instance_t *mfunc, jl_value_t **args, uint32_t nargs) { jl_ptls_t ptls = jl_get_ptls_states(); @@ -329,125 +310,12 @@ static jl_value_t *ti_run_fun(const jl_generic_fptr_t *fptr, jl_method_instance_ return jl_nothing; } - // lock for code generation jl_mutex_t codegen_lock; jl_mutex_t typecache_lock; #ifdef JULIA_ENABLE_THREADING -// only one thread group for now -static ti_threadgroup_t *tgworld; - -// for broadcasting work to threads -static ti_threadwork_t threadwork; - -#if PROFILE_JL_THREADING -uint64_t prep_ns; -uint64_t *fork_ns; -uint64_t *user_ns; -uint64_t *join_ns; -#endif - -static uv_barrier_t thread_init_done; - -// thread function: used by all except the main thread -void ti_threadfun(void *arg) -{ - jl_ptls_t ptls = jl_get_ptls_states(); - ti_threadarg_t *ta = (ti_threadarg_t *)arg; - ti_threadgroup_t *tg; - ti_threadwork_t *work; - - // initialize this thread (set tid, create heap, etc.) - ti_initthread(ta->tid); - jl_init_stack_limits(0); - - // set up tasking - jl_init_root_task(ptls->stack_lo, ptls->stack_hi - ptls->stack_lo); -#ifdef COPY_STACKS - jl_set_base_ctx((char*)&arg); -#endif - - // set the thread-local tid and wait for a thread group - while (jl_atomic_load_acquire(&ta->state) == TI_THREAD_INIT) - jl_cpu_pause(); - - // Assuming the functions called below doesn't contain unprotected GC - // critical region. In general, the following part of this function - // shouldn't call any managed code without calling `jl_gc_unsafe_enter` - // first. - jl_gc_state_set(ptls, JL_GC_STATE_SAFE, 0); - uv_barrier_wait(&thread_init_done); - // initialize this thread in the thread group - tg = ta->tg; - ti_threadgroup_initthread(tg, ptls->tid); - - // free the thread argument here - free(ta); - - int init = 1; - - // work loop - for (; ;) { -#if PROFILE_JL_THREADING - uint64_t tstart = uv_hrtime(); -#endif - - ti_threadgroup_fork(tg, ptls->tid, (void **)&work, init); - init = 0; - -#if PROFILE_JL_THREADING - uint64_t tfork = uv_hrtime(); - fork_ns[ptls->tid] += tfork - tstart; -#endif - - if (work) { - if (work->command == TI_THREADWORK_DONE) { - break; - } - else if (work->command == TI_THREADWORK_RUN) { - // TODO: return value? reduction? - // TODO: before we support getting return value from - // the work, and after we have proper GC transition - // support in the codegen and runtime we don't need to - // enter GC unsafe region when starting the work. - int8_t gc_state = jl_gc_unsafe_enter(ptls); - // This is probably always NULL for now - jl_module_t *last_m = ptls->current_module; - size_t last_age = ptls->world_age; - JL_GC_PUSH1(&last_m); - ptls->current_module = work->current_module; - ptls->world_age = work->world_age; - ti_run_fun(&work->fptr, work->mfunc, work->args, work->nargs); - ptls->current_module = last_m; - ptls->world_age = last_age; - JL_GC_POP(); - jl_gc_unsafe_leave(ptls, gc_state); - } - } - -#if PROFILE_JL_THREADING - uint64_t tuser = uv_hrtime(); - user_ns[ptls->tid] += tuser - tfork; -#endif - - ti_threadgroup_join(tg, ptls->tid); - -#if PROFILE_JL_THREADING - uint64_t tjoin = uv_hrtime(); - join_ns[ptls->tid] += tjoin - tuser; -#endif - - // TODO: - // nowait should skip the join, but confirm that fork is reentrant - } -} - -#if PROFILE_JL_THREADING -void ti_reset_timings(void); -#endif - ssize_t jl_tls_offset = -1; #ifdef JL_ELF_TLS_VARIANT @@ -558,9 +426,8 @@ void jl_init_threading(void) int max_threads = jl_cpu_cores(); jl_n_threads = JULIA_NUM_THREADS; cp = getenv(NUM_THREADS_NAME); - if (cp) { + if (cp) jl_n_threads = (uint64_t)strtol(cp, NULL, 10); - } if (jl_n_threads > max_threads) jl_n_threads = max_threads; if (jl_n_threads <= 0) @@ -568,25 +435,18 @@ void jl_init_threading(void) jl_all_tls_states = (jl_ptls_t*)malloc(jl_n_threads * sizeof(void*)); -#if PROFILE_JL_THREADING - // set up space for profiling information - fork_ns = (uint64_t*)jl_malloc_aligned(jl_n_threads * sizeof(uint64_t), 64); - user_ns = (uint64_t*)jl_malloc_aligned(jl_n_threads * sizeof(uint64_t), 64); - join_ns = (uint64_t*)jl_malloc_aligned(jl_n_threads * sizeof(uint64_t), 64); - ti_reset_timings(); -#endif - - // initialize this master thread (set tid, create heap, etc.) - ti_init_master_thread(); + // initialize this thread (set tid, create heap, etc.) + jl_init_threadtls(0); } +static uv_barrier_t thread_init_done; + void jl_start_threads(void) { - jl_ptls_t ptls = jl_get_ptls_states(); char *cp, mask[UV_CPU_SETSIZE]; int i, exclusive; uv_thread_t uvtid; - ti_threadarg_t **targs; + jl_threadarg_t **targs; // do we have exclusive use of the machine? default is no exclusive = DEFAULT_MACHINE_EXCLUSIVE; @@ -604,16 +464,20 @@ void jl_start_threads(void) uv_thread_setaffinity(&uvtid, mask, NULL, UV_CPU_SETSIZE); } + // initialize threading infrastructure + jl_init_threadinginfra(); + // create threads - targs = (ti_threadarg_t **)malloc((jl_n_threads - 1) * sizeof (ti_threadarg_t *)); + targs = (jl_threadarg_t **)malloc((jl_n_threads - 1) * sizeof (jl_threadarg_t *)); uv_barrier_init(&thread_init_done, jl_n_threads); for (i = 0; i < jl_n_threads - 1; ++i) { - targs[i] = (ti_threadarg_t *)malloc(sizeof (ti_threadarg_t)); - targs[i]->state = TI_THREAD_INIT; + targs[i] = (jl_threadarg_t *)malloc(sizeof (jl_threadarg_t)); targs[i]->tid = i + 1; - uv_thread_create(&uvtid, ti_threadfun, targs[i]); + targs[i]->barrier = &thread_init_done; + jl_init_threadarg(targs[i]); + uv_thread_create(&uvtid, jl_threadfun, targs[i]); if (exclusive) { memset(mask, 0, UV_CPU_SETSIZE); mask[i+1] = 1; @@ -622,17 +486,7 @@ void jl_start_threads(void) uv_thread_detach(&uvtid); } - // set up the world thread group - ti_threadgroup_create(1, jl_n_threads, 1, &tgworld); - for (i = 0; i < jl_n_threads; ++i) - ti_threadgroup_addthread(tgworld, i, NULL); - ti_threadgroup_initthread(tgworld, ptls->tid); - - // give the threads the world thread group; they will block waiting for fork - for (i = 0; i < jl_n_threads - 1; ++i) { - targs[i]->tg = tgworld; - jl_atomic_store_release(&targs[i]->state, TI_THREAD_WORK); - } + jl_init_started_threads(targs); uv_barrier_wait(&thread_init_done); @@ -640,153 +494,17 @@ void jl_start_threads(void) free(targs); } -// TODO: is this needed? where/when/how to call it? -void jl_shutdown_threading(void) -{ - jl_ptls_t ptls = jl_get_ptls_states(); - // stop the spinning threads by sending them a command - ti_threadwork_t *work = &threadwork; - - work->command = TI_THREADWORK_DONE; - ti_threadgroup_fork(tgworld, ptls->tid, (void **)&work, 0); - - sleep(1); - - // destroy the world thread group - ti_threadgroup_destroy(tgworld); - -#if PROFILE_JL_THREADING - jl_free_aligned(join_ns); - jl_free_aligned(user_ns); - jl_free_aligned(fork_ns); - fork_ns = user_ns = join_ns = NULL; -#endif -} - -// interface to user code: specialize and compile the user thread function -// and run it in all threads -JL_DLLEXPORT jl_value_t *jl_threading_run(jl_value_t *_args) -{ - jl_ptls_t ptls = jl_get_ptls_states(); - // GC safe -#if PROFILE_JL_THREADING - uint64_t tstart = uv_hrtime(); -#endif - uint32_t nargs; - jl_value_t **args; - if (!jl_is_svec(_args)) { - nargs = 1; - args = &_args; - } - else { - nargs = jl_svec_len(_args); - args = jl_svec_data(_args); - } - - int8_t gc_state = jl_gc_unsafe_enter(ptls); - - threadwork.command = TI_THREADWORK_RUN; - threadwork.mfunc = jl_lookup_generic(args, nargs, - jl_int32hash_fast(jl_return_address()), ptls->world_age); - // Ignore constant return value for now. - if (jl_compile_method_internal(&threadwork.fptr, threadwork.mfunc)) - return jl_nothing; - threadwork.args = args; - threadwork.nargs = nargs; - threadwork.ret = jl_nothing; - threadwork.current_module = ptls->current_module; - threadwork.world_age = ptls->world_age; - -#if PROFILE_JL_THREADING - uint64_t tcompile = uv_hrtime(); - prep_ns += (tcompile - tstart); -#endif - - // fork the world thread group - ti_threadwork_t *tw = &threadwork; - ti_threadgroup_fork(tgworld, ptls->tid, (void **)&tw, 0); - -#if PROFILE_JL_THREADING - uint64_t tfork = uv_hrtime(); - fork_ns[ptls->tid] += (tfork - tcompile); -#endif - - // this thread must do work too (TODO: reduction?) - tw->ret = ti_run_fun(&threadwork.fptr, threadwork.mfunc, args, nargs); - -#if PROFILE_JL_THREADING - uint64_t trun = uv_hrtime(); - user_ns[ptls->tid] += (trun - tfork); -#endif - - // wait for completion (TODO: nowait?) - ti_threadgroup_join(tgworld, ptls->tid); - -#if PROFILE_JL_THREADING - uint64_t tjoin = uv_hrtime(); - join_ns[ptls->tid] += (tjoin - trun); -#endif - - jl_gc_unsafe_leave(ptls, gc_state); - - return tw->ret; -} - -#if PROFILE_JL_THREADING - -void ti_reset_timings(void) -{ - int i; - prep_ns = 0; - for (i = 0; i < jl_n_threads; i++) - fork_ns[i] = user_ns[i] = join_ns[i] = 0; -} - -void ti_timings(uint64_t *times, uint64_t *min, uint64_t *max, uint64_t *avg) -{ - int i; - *min = UINT64_MAX; - *max = *avg = 0; - for (i = 0; i < jl_n_threads; i++) { - if (times[i] < *min) - *min = times[i]; - if (times[i] > *max) - *max = times[i]; - *avg += times[i]; - } - *avg /= jl_n_threads; -} - -#define NS_TO_SECS(t) ((t) / (double)1e9) - -JL_DLLEXPORT void jl_threading_profile(void) -{ - if (!fork_ns) return; - - printf("\nti profile:\n"); - printf("prep: %g (%" PRIu64 ")\n", NS_TO_SECS(prep_ns), prep_ns); - - uint64_t min, max, avg; - ti_timings(fork_ns, &min, &max, &avg); - printf("fork: %g (%g - %g)\n", NS_TO_SECS(min), NS_TO_SECS(max), - NS_TO_SECS(avg)); - ti_timings(user_ns, &min, &max, &avg); - printf("user: %g (%g - %g)\n", NS_TO_SECS(min), NS_TO_SECS(max), - NS_TO_SECS(avg)); - ti_timings(join_ns, &min, &max, &avg); - printf("join: %g (%g - %g)\n", NS_TO_SECS(min), NS_TO_SECS(max), - NS_TO_SECS(avg)); -} - -#else //!PROFILE_JL_THREADING +#else // !JULIA_ENABLE_THREADING -JL_DLLEXPORT void jl_threading_profile(void) +void jl_init_threading(void) { + static jl_ptls_t _jl_all_tls_states; + jl_all_tls_states = &_jl_all_tls_states; + jl_n_threads = 1; + jl_init_threadtls(0); } -#endif //!PROFILE_JL_THREADING - -#else // !JULIA_ENABLE_THREADING +void jl_start_threads(void) { } JL_DLLEXPORT jl_value_t *jl_threading_run(jl_value_t *_args) { @@ -806,19 +524,9 @@ JL_DLLEXPORT jl_value_t *jl_threading_run(jl_value_t *_args) jl_generic_fptr_t fptr; if (jl_compile_method_internal(&fptr, mfunc)) return jl_nothing; - return ti_run_fun(&fptr, mfunc, args, nargs); -} - -void jl_init_threading(void) -{ - static jl_ptls_t _jl_all_tls_states; - jl_all_tls_states = &_jl_all_tls_states; - jl_n_threads = 1; - ti_init_master_thread(); + return jl_thread_run_fun(&fptr, mfunc, args, nargs); } -void jl_start_threads(void) { } - #endif // !JULIA_ENABLE_THREADING // Make gc alignment available for threading diff --git a/src/threading.h b/src/threading.h index 17d0a1c9efbab..792baa70954b9 100644 --- a/src/threading.h +++ b/src/threading.h @@ -8,51 +8,39 @@ extern "C" { #endif -#include "threadgroup.h" #include "julia.h" -#define PROFILE_JL_THREADING 1 - -// thread ID -extern jl_ptls_t *jl_all_tls_states; -extern JL_DLLEXPORT int jl_n_threads; // # threads we're actually using - -// thread state -enum { - TI_THREAD_INIT, - TI_THREAD_WORK -}; - -// passed to thread function -typedef struct { - int16_t volatile state; - int16_t tid; - ti_threadgroup_t *tg; -} ti_threadarg_t; - -// commands to thread function -enum { - TI_THREADWORK_DONE, - TI_THREADWORK_RUN -}; - -// work command to thread function -typedef struct { - uint8_t command; - jl_method_instance_t *mfunc; - jl_generic_fptr_t fptr; - jl_value_t **args; - uint32_t nargs; - jl_value_t *ret; - jl_module_t *current_module; - size_t world_age; -} ti_threadwork_t; - -// thread function -void ti_threadfun(void *arg); - -// helpers for thread function -jl_value_t *ti_runthread(jl_function_t *f, jl_svec_t *args, size_t nargs); +extern jl_ptls_t *jl_all_tls_states; /* thread local storage */ +extern JL_DLLEXPORT int jl_n_threads; /* # threads we're actually using */ + +typedef struct _jl_threadarg_t { + int16_t tid; + uv_barrier_t *barrier; + void *arg; +} jl_threadarg_t; + +// each thread must initialize its TLS +void jl_init_threadtls(int16_t tid); + +// generic helper for a thread to run a function +jl_value_t *jl_thread_run_fun(const jl_generic_fptr_t *fptr, + jl_method_instance_t *mfunc, jl_value_t **args, + uint32_t nargs); + +// provided by a threading infrastructure +void jl_init_threadinginfra(void); +void jl_init_threadarg(jl_threadarg_t *targ); +void jl_init_started_threads(jl_threadarg_t **targs); +void jl_threadfun(void *arg); + +// interfaces defined by threading infrastructures +#ifdef JULIA_ENABLE_FORKJOIN_TI +#include "forkjoin-ti.h" +#else +#ifdef JULIA_ENABLE_PARTR +#include "partr.h" +#endif +#endif #ifdef __cplusplus }