diff --git a/base/options.jl b/base/options.jl index 9d08af940136f..63f73982b2e8e 100644 --- a/base/options.jl +++ b/base/options.jl @@ -1,6 +1,6 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license -# NOTE: This type needs to be kept in sync with jl_options in src/julia.h +# NOTE: This type needs to be kept in sync with jl_options in src/jloptions.h struct JLOptions quiet::Int8 banner::Int8 @@ -9,7 +9,9 @@ struct JLOptions commands::Ptr{Ptr{UInt8}} # (e)eval, (E)print, (L)load image_file::Ptr{UInt8} cpu_target::Ptr{UInt8} - nthreads::Int32 + nthreadpools::Int16 + nthreads::Int16 + nthreads_per_pool::Ptr{Int16} nprocs::Int32 machine_file::Ptr{UInt8} project::Ptr{UInt8} diff --git a/base/partr.jl b/base/partr.jl index aee9a3388a100..a4cfcb60fe520 100644 --- a/base/partr.jl +++ b/base/partr.jl @@ -2,7 +2,7 @@ module Partr -using ..Threads: SpinLock, nthreads +using ..Threads: SpinLock, nthreads, threadid # a task minheap mutable struct taskheap @@ -16,12 +16,13 @@ end # multiqueue minheap state const heap_d = UInt32(8) -global heaps::Vector{taskheap} = Vector{taskheap}(undef, 0) -const heaps_lock = SpinLock() -global cong_unbias::UInt32 = typemax(UInt32) +const heaps = [Vector{taskheap}(undef, 0), Vector{taskheap}(undef, 0)] +const heaps_lock = [SpinLock(), SpinLock()] +const cong_unbias = [typemax(UInt32), typemax(UInt32)] -cong(max::UInt32, unbias::UInt32) = ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1) +cong(max::UInt32, unbias::UInt32) = + ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1) function unbias_cong(max::UInt32) return typemax(UInt32) - ((typemax(UInt32) % max) + UInt32(1)) @@ -60,30 +61,32 @@ function multiq_sift_down(heap::taskheap, idx::Int32) end -function multiq_size() +function multiq_size(tpid::Int8) + nt = UInt32(Threads._nthreads_in_pool(tpid)) + tp = tpid + 1 + tpheaps = heaps[tp] heap_c = UInt32(2) - heap_p = UInt32(length(heaps)) - nt = UInt32(nthreads()) + heap_p = UInt32(length(tpheaps)) if heap_c * nt <= heap_p return heap_p end - @lock heaps_lock begin - heap_p = UInt32(length(heaps)) - nt = UInt32(nthreads()) + @lock heaps_lock[tp] begin + heap_p = UInt32(length(tpheaps)) + nt = UInt32(Threads._nthreads_in_pool(tpid)) if heap_c * nt <= heap_p return heap_p end heap_p += heap_c * nt newheaps = Vector{taskheap}(undef, heap_p) - copyto!(newheaps, heaps) - for i = (1 + length(heaps)):heap_p + copyto!(newheaps, tpheaps) + for i = (1 + length(tpheaps)):heap_p newheaps[i] = taskheap() end - global heaps = newheaps - global cong_unbias = unbias_cong(heap_p) + heaps[tp] = newheaps + cong_unbias[tp] = unbias_cong(heap_p) end return heap_p @@ -91,15 +94,19 @@ end function multiq_insert(task::Task, priority::UInt16) + tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), task) + heap_p = multiq_size(tpid) + tp = tpid + 1 + task.priority = priority - heap_p = multiq_size() - rn = cong(heap_p, cong_unbias) - while !trylock(heaps[rn].lock) - rn = cong(heap_p, cong_unbias) + rn = cong(heap_p, cong_unbias[tp]) + tpheaps = heaps[tp] + while !trylock(tpheaps[rn].lock) + rn = cong(heap_p, cong_unbias[tp]) end - heap = heaps[rn] + heap = tpheaps[rn] if heap.ntasks >= length(heap.tasks) resize!(heap.tasks, length(heap.tasks) * 2) end @@ -122,34 +129,37 @@ function multiq_deletemin() local rn1, rn2 local prio1, prio2 + tid = Threads.threadid() + tp = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1) + 1 + tpheaps = heaps[tp] + @label retry GC.safepoint() - heap_p = UInt32(length(heaps)) + heap_p = UInt32(length(tpheaps)) for i = UInt32(0):heap_p if i == heap_p return nothing end - rn1 = cong(heap_p, cong_unbias) - rn2 = cong(heap_p, cong_unbias) - prio1 = heaps[rn1].priority - prio2 = heaps[rn2].priority + rn1 = cong(heap_p, cong_unbias[tp]) + rn2 = cong(heap_p, cong_unbias[tp]) + prio1 = tpheaps[rn1].priority + prio2 = tpheaps[rn2].priority if prio1 > prio2 prio1 = prio2 rn1 = rn2 elseif prio1 == prio2 && prio1 == typemax(UInt16) continue end - if trylock(heaps[rn1].lock) - if prio1 == heaps[rn1].priority + if trylock(tpheaps[rn1].lock) + if prio1 == tpheaps[rn1].priority break end - unlock(heaps[rn1].lock) + unlock(tpheaps[rn1].lock) end end - heap = heaps[rn1] + heap = tpheaps[rn1] task = heap.tasks[1] - tid = Threads.threadid() if ccall(:jl_set_task_tid, Cint, (Any, Cint), task, tid-1) == 0 unlock(heap.lock) @goto retry @@ -171,9 +181,11 @@ end function multiq_check_empty() - for i = UInt32(1):length(heaps) - if heaps[i].ntasks != 0 - return false + for j = UInt32(1):length(heaps) + for i = UInt32(1):length(heaps[j]) + if heaps[j][i].ntasks != 0 + return false + end end end return true diff --git a/base/task.jl b/base/task.jl index 6d5ce6c39eb20..6b7f5747f1274 100644 --- a/base/task.jl +++ b/base/task.jl @@ -251,6 +251,10 @@ true istaskfailed(t::Task) = (load_state_acquire(t) === task_state_failed) Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1) +function Threads.threadpool(t::Task) + tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t) + return tpid == 0 ? :default : :interactive +end task_result(t::Task) = t.result diff --git a/base/threadcall.jl b/base/threadcall.jl index f0e5f336ec0ca..45965fdbc6c65 100644 --- a/base/threadcall.jl +++ b/base/threadcall.jl @@ -9,7 +9,7 @@ const threadcall_restrictor = Semaphore(max_ccall_threads) The `@threadcall` macro is called in the same way as [`ccall`](@ref) but does the work in a different thread. This is useful when you want to call a blocking C -function without causing the main `julia` thread to become blocked. Concurrency +function without causing the current `julia` thread to become blocked. Concurrency is limited by size of the libuv thread pool, which defaults to 4 threads but can be increased by setting the `UV_THREADPOOL_SIZE` environment variable and restarting the `julia` process. diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 05d03cadf52fa..2f0d40f3d980e 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -1,26 +1,62 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license -export threadid, nthreads, @threads, @spawn +export threadid, nthreads, @threads, @spawn, + threadpool, nthreadpools """ - Threads.threadid() + Threads.threadid() -> Int -Get the ID number of the current thread of execution. The master thread has ID `1`. +Get the ID number of the current thread of execution. The master thread has +ID `1`. """ threadid() = Int(ccall(:jl_threadid, Int16, ())+1) -# Inclusive upper bound on threadid() """ - Threads.nthreads() + Threads.nthreads([:default|:interactive]) -> Int -Get the number of threads available to the Julia process. This is the inclusive upper bound -on [`threadid()`](@ref). +Get the number of threads (across all thread pools or within the specified +thread pool) available to Julia. The number of threads across all thread +pools is the inclusive upper bound on [`threadid()`](@ref). See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed) standard library. """ +function nthreads end + nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint))) +function nthreads(pool::Symbol) + if pool == :default + tpid = Int8(0) + elseif pool == :interactive + tpid = Int8(1) + else + error("invalid threadpool specified") + end + return _nthreads_in_pool(tpid) +end +function _nthreads_in_pool(tpid::Int8) + p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint})) + return Int(unsafe_load(p, tpid + 1)) +end + +""" + Threads.threadpool(tid = threadid()) -> Symbol + +Returns the specified thread's threadpool; either `:default` or `:interactive`. +""" +function threadpool(tid = threadid()) + tpid = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1) + return tpid == 0 ? :default : :interactive +end + +""" + Threads.nthreadpools() -> Int + +Returns the number of threadpools currently configured. +""" +nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint))) + function threading_run(fun, static) ccall(:jl_enter_threaded_region, Cvoid, ()) @@ -48,7 +84,7 @@ function _threadsfor(iter, lbody, schedule) quote local threadsfor_fun let range = $(esc(range)) - function threadsfor_fun(tid=1; onethread=false) + function threadsfor_fun(tid = 1; onethread = false) r = range # Load into local variable lenr = length(r) # divide loop iterations among threads @@ -232,35 +268,63 @@ macro threads(args...) end """ - Threads.@spawn expr + Threads.@spawn [:default|:interactive] expr -Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available thread. -The task is allocated to a thread after it becomes available. To wait for the task -to finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref) to -wait and then obtain its return value. +Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available +thread in the specified threadpool (`:default` if unspecified). The task is +allocated to a thread once one becomes available. To wait for the task to +finish, call [`wait`](@ref) on the result of this macro, or call +[`fetch`](@ref) to wait and then obtain its return value. -Values can be interpolated into `@spawn` via `\$`, which copies the value directly into the -constructed underlying closure. This allows you to insert the _value_ of a variable, -isolating the asynchronous code from changes to the variable's value in the current task. +Values can be interpolated into `@spawn` via `\$`, which copies the value +directly into the constructed underlying closure. This allows you to insert +the _value_ of a variable, isolating the asynchronous code from changes to +the variable's value in the current task. !!! note - See the manual chapter on threading for important caveats. + See the manual chapter on [multi-threading](@ref man-multithreading) + for important caveats. See also the chapter on [threadpools](@ref man-threadpools). !!! compat "Julia 1.3" This macro is available as of Julia 1.3. !!! compat "Julia 1.4" Interpolating values via `\$` is available as of Julia 1.4. + +!!! compat "Julia 1.9" + A threadpool may be specified as of Julia 1.9. """ -macro spawn(expr) - letargs = Base._lift_one_interp!(expr) +macro spawn(args...) + tpid = Int8(0) + na = length(args) + if na == 2 + ttype, ex = args + if ttype isa QuoteNode + ttype = ttype.value + elseif ttype isa Symbol + # TODO: allow unquoted symbols + ttype = nothing + end + if ttype === :interactive + tpid = Int8(1) + elseif ttype !== :default + throw(ArgumentError("unsupported threadpool in @spawn: $ttype")) + end + elseif na == 1 + ex = args[1] + else + throw(ArgumentError("wrong number of arguments in @spawn")) + end + + letargs = Base._lift_one_interp!(ex) - thunk = esc(:(()->($expr))) + thunk = esc(:(()->($ex))) var = esc(Base.sync_varname) quote let $(letargs...) local task = Task($thunk) task.sticky = false + ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid) if $(Expr(:islocal, var)) put!($var, task) end diff --git a/doc/src/base/multi-threading.md b/doc/src/base/multi-threading.md index 6760d3f25f5d4..293857c1c6c65 100644 --- a/doc/src/base/multi-threading.md +++ b/doc/src/base/multi-threading.md @@ -6,6 +6,8 @@ Base.Threads.foreach Base.Threads.@spawn Base.Threads.threadid Base.Threads.nthreads +Base.Threads.threadpool +Base.Threads.nthreadpools ``` See also [Multi-Threading](@ref man-multithreading). @@ -49,7 +51,7 @@ Base.Threads.atomic_min! Base.Threads.atomic_fence ``` -## ccall using a threadpool (Experimental) +## ccall using a libuv threadpool (Experimental) ```@docs Base.@threadcall diff --git a/doc/src/manual/multi-threading.md b/doc/src/manual/multi-threading.md index cc6c7f897353e..b20d0e54f1087 100644 --- a/doc/src/manual/multi-threading.md +++ b/doc/src/manual/multi-threading.md @@ -72,7 +72,61 @@ julia> Threads.threadid() three processes have 2 threads enabled. For more fine grained control over worker threads use [`addprocs`](@ref) and pass `-t`/`--threads` as `exeflags`. -## Data-race freedom +## [Threadpools](@id man-threadpools) + +When a program's threads are busy with many tasks to run, tasks may experience +delays which may negatively affect the responsiveness and interactivity of the +program. To address this, you can specify that a task is interactive when you +[`Threads.@spawn`](@ref) it: + +```julia +using Base.Threads +@spawn :interactive f() +``` + +Interactive tasks should avoid performing high latency operations, and if they +are long duration tasks, should yield frequently. + +Julia may be started with one or more threads reserved to run interactive tasks: + +```bash +$ julia --threads 3,1 +``` + +The environment variable `JULIA_NUM_THREADS` can also be used similarly: +```bash +export JULIA_NUM_THREADS=3,1 +``` + +This starts Julia with 3 threads in the `:default` threadpool and 1 thread in +the `:interactive` threadpool: + +```julia-repl +julia> using Base.Threads + +julia> nthreads() +4 + +julia> nthreadpools() +2 + +julia> threadpool() +:default + +julia> nthreads(:interactive) +1 +``` + +Either or both numbers can be replaced with the word `auto`, which causes +Julia to choose a reasonable default. + +## Communication and synchronization + +Although Julia's threads can communicate through shared memory, it is notoriously +difficult to write correct and data-race free multi-threaded code. Julia's +[`Channel`](@ref)s are thread-safe and may be used to communicate safely. + +### Data-race freedom You are entirely responsible for ensuring that your program is data-race free, and nothing promised here can be assumed if you do not observe that diff --git a/src/jl_exported_data.inc b/src/jl_exported_data.inc index 09d2949c22489..b0994ce0a0c4a 100644 --- a/src/jl_exported_data.inc +++ b/src/jl_exported_data.inc @@ -68,6 +68,7 @@ XX(jl_method_type) \ XX(jl_methtable_type) \ XX(jl_module_type) \ + XX(jl_n_threads_per_pool) \ XX(jl_namedtuple_type) \ XX(jl_namedtuple_typename) \ XX(jl_newvarnode_type) \ @@ -128,5 +129,6 @@ // Data symbols that are defined inside the public libjulia #define JL_EXPORTED_DATA_SYMBOLS(XX) \ + XX(jl_n_threadpools, int) \ XX(jl_n_threads, int) \ XX(jl_options, jl_options_t) diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index 3a69d1b82bd82..ef1f7c929f7e7 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -237,6 +237,7 @@ XX(jl_get_safe_restore) \ XX(jl_get_size) \ XX(jl_get_task_tid) \ + XX(jl_get_task_threadpoolid) \ XX(jl_get_tls_world_age) \ XX(jl_get_UNAME) \ XX(jl_get_world_counter) \ @@ -422,6 +423,7 @@ XX(jl_set_safe_restore) \ XX(jl_set_sysimg_so) \ XX(jl_set_task_tid) \ + XX(jl_set_task_threadpoolid) \ XX(jl_set_typeinf_func) \ XX(jl_set_zero_subnormals) \ XX(jl_sigatomic_begin) \ @@ -461,6 +463,7 @@ XX(jl_task_stack_buffer) \ XX(jl_test_cpu_feature) \ XX(jl_threadid) \ + XX(jl_threadpoolid) \ XX(jl_throw) \ XX(jl_throw_out_of_memory_error) \ XX(jl_too_few_args) \ diff --git a/src/jloptions.c b/src/jloptions.c index 4dc40a51a7882..ff7896e55fa55 100644 --- a/src/jloptions.c +++ b/src/jloptions.c @@ -38,7 +38,9 @@ JL_DLLEXPORT void jl_init_options(void) NULL, // cmds NULL, // image_file (will be filled in below) NULL, // cpu_target ("native", "core2", etc...) + 0, // nthreadpools 0, // nthreads + NULL, // nthreads_per_pool 0, // nprocs NULL, // machine_file NULL, // project @@ -112,16 +114,19 @@ static const char opts[] = " -L, --load Load immediately on all processors\n\n" // parallel options - " -t, --threads {N|auto} Enable N threads; \"auto\" tries to infer a useful default number\n" - " of threads to use but the exact behavior might change in the future.\n" - " Currently, \"auto\" uses the number of CPUs assigned to this julia\n" - " process based on the OS-specific affinity assignment interface, if\n" - " supported (Linux and Windows). If this is not supported (macOS) or\n" - " process affinity is not configured, it uses the number of CPU\n" - " threads.\n" - " -p, --procs {N|auto} Integer value N launches N additional local worker processes\n" - " \"auto\" launches as many workers as the number of local CPU threads (logical cores)\n" - " --machine-file Run processes on hosts listed in \n\n" + " -t, --threads {auto|N[,auto|M]}\n" + " Enable N[+M] threads; N threads are assigned to the `default`\n" + " threadpool, and if M is specified, M threads are assigned to the\n" + " `interactive` threadpool; \"auto\" tries to infer a useful\n" + " default number of threads to use but the exact behavior might change\n" + " in the future. Currently sets N to the number of CPUs assigned to\n" + " this Julia process based on the OS-specific affinity assignment\n" + " interface if supported (Linux and Windows) or to the number of CPU\n" + " threads if not supported (MacOS) or if process affinity is not\n" + " configured, and sets M to 1.\n" + " -p, --procs {N|auto} Integer value N launches N additional local worker processes\n" + " \"auto\" launches as many workers as the number of local CPU threads (logical cores)\n" + " --machine-file Run processes on hosts listed in \n\n" // interactive options " -i Interactive mode; REPL runs and `isinteractive()` is true\n" @@ -242,7 +247,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) static const struct option longopts[] = { // exposed command line options // NOTE: This set of required arguments need to be kept in sync - // with the required arguments defined in base/client.jl `process_options()` + // with the required arguments defined in base/options.jl `struct JLOptions` { "version", no_argument, 0, 'v' }, { "help", no_argument, 0, 'h' }, { "help-hidden", no_argument, 0, opt_help_hidden }, @@ -447,15 +452,45 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) break; case 't': // threads errno = 0; - if (!strcmp(optarg,"auto")) { + jl_options.nthreadpools = 1; + long nthreads = -1, nthreadsi = 0; + if (!strncmp(optarg, "auto", 4)) { jl_options.nthreads = -1; + if (optarg[4] == ',') { + if (!strncmp(&optarg[5], "auto", 4)) + nthreadsi = 1; + else { + errno = 0; + nthreadsi = strtol(&optarg[5], &endptr, 10); + if (errno != 0 || endptr == &optarg[5] || *endptr != 0 || nthreadsi < 1 || nthreadsi >= INT16_MAX) + jl_errorf("julia: -t,--threads=auto,; m must be an integer >= 1"); + } + jl_options.nthreadpools++; + } } else { - long nthreads = strtol(optarg, &endptr, 10); - if (errno != 0 || optarg == endptr || *endptr != 0 || nthreads < 1 || nthreads >= INT_MAX) - jl_errorf("julia: -t,--threads= must be an integer >= 1"); - jl_options.nthreads = (int)nthreads; + nthreads = strtol(optarg, &endptr, 10); + if (errno != 0 || optarg == endptr || nthreads < 1 || nthreads >= INT16_MAX) + jl_errorf("julia: -t,--threads=[,auto|]; n must be an integer >= 1"); + if (*endptr == ',') { + if (!strncmp(&endptr[1], "auto", 4)) + nthreadsi = 1; + else { + errno = 0; + char *endptri; + nthreadsi = strtol(&endptr[1], &endptri, 10); + if (errno != 0 || endptri == &endptr[1] || *endptri != 0 || nthreadsi < 1 || nthreadsi >= INT16_MAX) + jl_errorf("julia: -t,--threads=,; n and m must be integers >= 1"); + } + jl_options.nthreadpools++; + } + jl_options.nthreads = nthreads + nthreadsi; } + int16_t *ntpp = (int16_t *)malloc_s(jl_options.nthreadpools * sizeof(int16_t)); + ntpp[0] = (int16_t)nthreads; + if (jl_options.nthreadpools == 2) + ntpp[1] = (int16_t)nthreadsi; + jl_options.nthreads_per_pool = ntpp; break; case 'p': // procs errno = 0; @@ -464,7 +499,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) } else { long nprocs = strtol(optarg, &endptr, 10); - if (errno != 0 || optarg == endptr || *endptr != 0 || nprocs < 1 || nprocs >= INT_MAX) + if (errno != 0 || optarg == endptr || *endptr != 0 || nprocs < 1 || nprocs >= INT16_MAX) jl_errorf("julia: -p,--procs= must be an integer >= 1"); jl_options.nprocs = (int)nprocs; } diff --git a/src/jloptions.h b/src/jloptions.h index 2425b2bb680c2..9ac681c4ffacf 100644 --- a/src/jloptions.h +++ b/src/jloptions.h @@ -13,7 +13,9 @@ typedef struct { const char **cmds; const char *image_file; const char *cpu_target; - int32_t nthreads; + int8_t nthreadpools; + int16_t nthreads; + const int16_t *nthreads_per_pool; int32_t nprocs; const char *machine_file; const char *project; diff --git a/src/julia.h b/src/julia.h index 70ffb9d916375..3587bfb0370c2 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1647,7 +1647,9 @@ JL_DLLEXPORT int jl_is_debugbuild(void) JL_NOTSAFEPOINT; JL_DLLEXPORT jl_sym_t *jl_get_UNAME(void) JL_NOTSAFEPOINT; JL_DLLEXPORT jl_sym_t *jl_get_ARCH(void) JL_NOTSAFEPOINT; JL_DLLEXPORT jl_value_t *jl_get_libllvm(void) JL_NOTSAFEPOINT; +extern JL_DLLIMPORT int jl_n_threadpools; extern JL_DLLIMPORT int jl_n_threads; +extern JL_DLLIMPORT int *jl_n_threads_per_pool; // environment entries JL_DLLEXPORT jl_value_t *jl_environ(int i); @@ -1890,6 +1892,8 @@ typedef struct _jl_task_t { // hidden state: // id of owning thread - does not need to be defined until the task runs _Atomic(int16_t) tid; + // threadpool id + int8_t threadpoolid; // saved gc stack top for context switches jl_gcframe_t *gcstack; size_t world_age; @@ -1913,7 +1917,8 @@ typedef struct _jl_task_t { JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t*, jl_value_t*, size_t); JL_DLLEXPORT void jl_switchto(jl_task_t **pt); -JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT; +JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int16_t tid) JL_NOTSAFEPOINT; +JL_DLLEXPORT int jl_set_task_threadpoolid(jl_task_t *task, int8_t tpid) JL_NOTSAFEPOINT; JL_DLLEXPORT void JL_NORETURN jl_throw(jl_value_t *e JL_MAYBE_UNROOTED); JL_DLLEXPORT void JL_NORETURN jl_rethrow(void); JL_DLLEXPORT void JL_NORETURN jl_sig_throw(void); diff --git a/src/julia_threads.h b/src/julia_threads.h index 22acf3aec8587..8228d1e056cb5 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -16,7 +16,7 @@ extern "C" { JL_DLLEXPORT int16_t jl_threadid(void); -JL_DLLEXPORT void jl_threading_profile(void); +JL_DLLEXPORT int8_t jl_threadpoolid(int16_t tid) JL_NOTSAFEPOINT; // JULIA_ENABLE_THREADING may be controlled by altering JULIA_THREADS in Make.user @@ -206,6 +206,7 @@ struct _jl_bt_element_t; #define JL_MAX_BT_SIZE 80000 typedef struct _jl_tls_states_t { int16_t tid; + int8_t threadpoolid; uint64_t rngseed; volatile size_t *safepoint; _Atomic(int8_t) sleep_check_state; // read/write from foreign threads diff --git a/src/options.h b/src/options.h index 36f34654b2bd0..5a1700708d9e7 100644 --- a/src/options.h +++ b/src/options.h @@ -134,6 +134,9 @@ # define JULIA_NUM_THREADS 1 #endif +// threadpools specification +#define THREADPOOLS_NAME "JULIA_THREADPOOLS" + // affinitization behavior #define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE" #define DEFAULT_MACHINE_EXCLUSIVE 0 diff --git a/src/partr.c b/src/partr.c index 52c4a468c77c2..9250ff1107108 100644 --- a/src/partr.c +++ b/src/partr.c @@ -50,7 +50,7 @@ uint64_t io_wakeup_leave; uv_mutex_t *sleep_locks; uv_cond_t *wake_signals; -JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT +JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int16_t tid) JL_NOTSAFEPOINT { // Try to acquire the lock on this task. int16_t was = jl_atomic_load_relaxed(&task->tid); @@ -61,11 +61,18 @@ JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT return 0; } +JL_DLLEXPORT int jl_set_task_threadpoolid(jl_task_t *task, int8_t tpid) JL_NOTSAFEPOINT +{ + if (tpid < 0 || tpid >= jl_n_threadpools) + return 0; + task->threadpoolid = tpid; + return 1; +} + // GC functions used extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp, jl_value_t *obj) JL_NOTSAFEPOINT; - // parallel task runtime // --- diff --git a/src/task.c b/src/task.c index 52ca2ba4f2015..349f6ab545196 100644 --- a/src/task.c +++ b/src/task.c @@ -800,6 +800,7 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion t->started = 0; t->priority = 0; jl_atomic_store_relaxed(&t->tid, t->copy_stack ? jl_atomic_load_relaxed(&ct->tid) : -1); // copy_stacks are always pinned since they can't be moved + t->threadpoolid = ct->threadpoolid; t->ptls = NULL; t->world_age = ct->world_age; @@ -1361,6 +1362,7 @@ jl_task_t *jl_init_root_task(jl_ptls_t ptls, void *stack_lo, void *stack_hi) ct->gcstack = NULL; ct->excstack = NULL; jl_atomic_store_relaxed(&ct->tid, ptls->tid); + ct->threadpoolid = jl_threadpoolid(ptls->tid); ct->sticky = 1; ct->ptls = ptls; ct->world_age = 1; // OK to run Julia code on this task @@ -1407,6 +1409,11 @@ JL_DLLEXPORT int16_t jl_get_task_tid(jl_task_t *t) JL_NOTSAFEPOINT return jl_atomic_load_relaxed(&t->tid); } +JL_DLLEXPORT int8_t jl_get_task_threadpoolid(jl_task_t *t) +{ + return t->threadpoolid; +} + #ifdef _OS_WINDOWS_ #if defined(_CPU_X86_) diff --git a/src/threading.c b/src/threading.c index b0757ad106dec..4464406d21a76 100644 --- a/src/threading.c +++ b/src/threading.c @@ -292,13 +292,24 @@ JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time = 0; // return calling thread's ID -// Also update the suspended_threads list in signals-mach when changing the -// type of the thread id. JL_DLLEXPORT int16_t jl_threadid(void) { return jl_atomic_load_relaxed(&jl_current_task->tid); } +JL_DLLEXPORT int8_t jl_threadpoolid(int16_t tid) JL_NOTSAFEPOINT +{ + if (tid < 0 || tid >= jl_n_threads) + jl_error("invalid tid"); + int n = 0; + for (int i = 0; i < jl_n_threadpools; i++) { + n += jl_n_threads_per_pool[i]; + if (tid < n) + return (int8_t)i; + } + jl_error("internal error: couldn't determine threadpool id"); +} + jl_ptls_t jl_init_threadtls(int16_t tid) { jl_ptls_t ptls = (jl_ptls_t)calloc(1, sizeof(jl_tls_states_t)); @@ -452,22 +463,55 @@ void jl_init_threading(void) jl_check_tls(); #endif - // how many threads available, usable + // Determine how many threads and pools are requested. This may have been + // specified on the command line (and so are in `jl_options`) or by the + // environment variable. Set the globals `jl_n_threadpools`, `jl_n_threads` + // and `jl_n_threads_per_pool`. + jl_n_threadpools = 1; jl_n_threads = JULIA_NUM_THREADS; - if (jl_options.nthreads < 0) { // --threads=auto - jl_n_threads = jl_effective_threads(); - } - else if (jl_options.nthreads > 0) { // --threads=N - jl_n_threads = jl_options.nthreads; + int16_t nthreads = jl_n_threads, nthreadsi = 0; + char *endptr, *endptri; + + if (jl_options.nthreads != 0) { // --threads specified + jl_n_threadpools = jl_options.nthreadpools; + nthreads = jl_options.nthreads_per_pool[0]; + if (nthreads < 0) + nthreads = jl_effective_threads(); + if (jl_n_threadpools == 2) + nthreadsi = jl_options.nthreads_per_pool[1]; } - else if ((cp = getenv(NUM_THREADS_NAME))) { - if (strcmp(cp, "auto")) - jl_n_threads = (uint64_t)strtol(cp, NULL, 10); // ENV[NUM_THREADS_NAME] == "N" - else - jl_n_threads = jl_effective_threads(); // ENV[NUM_THREADS_NAME] == "auto" + else if ((cp = getenv(NUM_THREADS_NAME))) { // ENV[NUM_THREADS_NAME] specified + if (!strncmp(cp, "auto", 4)) { + nthreads = jl_effective_threads(); + cp += 4; + } + else { + errno = 0; + nthreads = strtol(cp, &endptr, 10); + if (errno != 0 || endptr == cp || nthreads <= 0) + nthreads = 1; + cp = endptr; + } + if (*cp == ',') { + cp++; + if (!strncmp(cp, "auto", 4)) + nthreadsi = 1; + else { + errno = 0; + nthreadsi = strtol(cp, &endptri, 10); + if (errno != 0 || endptri == cp || nthreadsi < 0) + nthreadsi = 0; + } + if (nthreadsi > 0) + jl_n_threadpools++; + } } - if (jl_n_threads <= 0) - jl_n_threads = 1; + + jl_n_threads = nthreads + nthreadsi; + jl_n_threads_per_pool = (int *)malloc(2 * sizeof(int)); + jl_n_threads_per_pool[0] = nthreads; + jl_n_threads_per_pool[1] = nthreadsi; + #ifndef __clang_gcanalyzer__ jl_all_tls_states = (jl_ptls_t*)calloc(jl_n_threads, sizeof(void*)); #endif @@ -513,7 +557,7 @@ void jl_start_threads(void) uv_barrier_init(&thread_init_done, nthreads); for (i = 1; i < nthreads; ++i) { - jl_threadarg_t *t = (jl_threadarg_t*)malloc_s(sizeof(jl_threadarg_t)); // ownership will be passed to the thread + jl_threadarg_t *t = (jl_threadarg_t *)malloc_s(sizeof(jl_threadarg_t)); // ownership will be passed to the thread t->tid = i; t->barrier = &thread_init_done; uv_thread_create(&uvtid, jl_threadfun, t); diff --git a/test/llvmpasses/alloc-opt-gcframe.jl b/test/llvmpasses/alloc-opt-gcframe.jl index 78323c15e793f..3b5fc3a51a606 100644 --- a/test/llvmpasses/alloc-opt-gcframe.jl +++ b/test/llvmpasses/alloc-opt-gcframe.jl @@ -14,7 +14,7 @@ target datalayout = "e-m:o-i64:64-f80:128-n8:16:32:64-S128" # CHECK-LABEL: @return_obj # CHECK-NOT: @julia.gc_alloc_obj # CHECK: %current_task = getelementptr inbounds {}*, {}** %gcstack, i64 -12 -# CHECK-NEXT: [[ptls_field:%.*]] = getelementptr inbounds {}*, {}** %current_task, i64 14 +# CHECK-NEXT: [[ptls_field:%.*]] = getelementptr inbounds {}*, {}** %current_task, i64 15 # CHECK-NEXT: [[ptls_load:%.*]] = load {}*, {}** [[ptls_field]], align 8, !tbaa !0 # CHECK-NEXT: [[ppjl_ptls:%.*]] = bitcast {}* [[ptls_load]] to {}** # CHECK-NEXT: [[ptls_i8:%.*]] = bitcast {}** [[ppjl_ptls]] to i8* diff --git a/test/llvmpasses/late-lower-gc.ll b/test/llvmpasses/late-lower-gc.ll index a7ab932dc08f1..c2b67f70111ea 100644 --- a/test/llvmpasses/late-lower-gc.ll +++ b/test/llvmpasses/late-lower-gc.ll @@ -42,7 +42,7 @@ top: %0 = bitcast {}*** %pgcstack to {}** %current_task = getelementptr inbounds {}*, {}** %0, i64 -12 ; CHECK: %current_task = getelementptr inbounds {}*, {}** %0, i64 -12 -; CHECK-NEXT: [[ptls_field:%.*]] = getelementptr inbounds {}*, {}** %current_task, i64 14 +; CHECK-NEXT: [[ptls_field:%.*]] = getelementptr inbounds {}*, {}** %current_task, i64 15 ; CHECK-NEXT: [[ptls_load:%.*]] = load {}*, {}** [[ptls_field]], align 8, !tbaa !0 ; CHECK-NEXT: [[ppjl_ptls:%.*]] = bitcast {}* [[ptls_load]] to {}** ; CHECK-NEXT: [[ptls_i8:%.*]] = bitcast {}** [[ppjl_ptls]] to i8* @@ -67,7 +67,7 @@ top: %0 = bitcast {}*** %pgcstack to {}** %current_task = getelementptr inbounds {}*, {}** %0, i64 -12 ; CHECK: %current_task = getelementptr inbounds {}*, {}** %0, i64 -12 -; CHECK-NEXT: [[ptls_field:%.*]] = getelementptr inbounds {}*, {}** %current_task, i64 14 +; CHECK-NEXT: [[ptls_field:%.*]] = getelementptr inbounds {}*, {}** %current_task, i64 15 ; CHECK-NEXT: [[ptls_load:%.*]] = load {}*, {}** [[ptls_field]], align 8, !tbaa !0 ; CHECK-NEXT: [[ppjl_ptls:%.*]] = bitcast {}* [[ptls_load]] to {}** ; CHECK-NEXT: [[ptls_i8:%.*]] = bitcast {}** [[ppjl_ptls]] to i8* diff --git a/test/misc.jl b/test/misc.jl index 7291688de8f46..4035251f61ccf 100644 --- a/test/misc.jl +++ b/test/misc.jl @@ -581,7 +581,7 @@ end let optstring = repr("text/plain", Base.JLOptions()) @test startswith(optstring, "JLOptions(\n") - @test !occursin("Ptr", optstring) + @test !occursin("Ptr{UInt8}", optstring) @test endswith(optstring, "\n)") @test occursin(" = \"", optstring) end @@ -589,7 +589,7 @@ let optstring = repr(Base.JLOptions()) @test startswith(optstring, "JLOptions(") @test endswith(optstring, ")") @test !occursin("\n", optstring) - @test !occursin("Ptr", optstring) + @test !occursin("Ptr{UInt8}", optstring) @test occursin(" = \"", optstring) end diff --git a/test/threadpool_latency.jl b/test/threadpool_latency.jl new file mode 100644 index 0000000000000..bdf02b81da03f --- /dev/null +++ b/test/threadpool_latency.jl @@ -0,0 +1,50 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +using Test +using Base.Threads + +# This test has not been added to CI as there can be unexpected delays +# which cause timing-dependent actions to fail. + +#= +Test to ensure that the interactive threadpool works as designed. + +Task A is a standard task that does a lot of work (~2 seconds) without +yielding. This would prevent ordinarily prevent other tasks from running. + +Task B is an interactive task that does a little work (~0.02 seconds) and +yields. + +With an interactive threadpool, multiple task Bs should not see notable +delays in execution even when multiple task As are occupying Julia's +default threads. + +This test should fail in the absence of an interactive thread. +=# +const N = 263000000 # busywork(N) takes ~1 sec on an i7-9750H @ 2.6GHz +function busywork(n::Int) + acc = 0 + for i = 1:n + x = rand(2:10) + acc += i * x + end + return acc +end + +function itask() + h = N รท 50 + for i = 1:100 + t1 = time() + busywork(h) + yield() + t2 = time() + @test t2 - t1 < 0.15 + end +end + +it1 = @spawn :interactive itask() +ti1 = @spawn busywork(N * 2); +it2 = @spawn :interactive itask() +ti2 = @spawn busywork(N * 2); +wait(it1) +wait(it2) diff --git a/test/threadpool_use.jl b/test/threadpool_use.jl new file mode 100644 index 0000000000000..92a4458ee8076 --- /dev/null +++ b/test/threadpool_use.jl @@ -0,0 +1,16 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +using Test +using Base.Threads + +@test nthreadpools() == 2 +@test threadpool() == :default +@test threadpool(2) == :interactive +dtask() = @test threadpool(current_task()) == :default +itask() = @test threadpool(current_task()) == :interactive +dt1 = @spawn dtask() +dt2 = @spawn :default dtask() +it = @spawn :interactive itask() +wait(dt1) +wait(dt2) +wait(it) diff --git a/test/threads.jl b/test/threads.jl index bf387d0c74f00..09e802757062b 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -79,6 +79,23 @@ let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no thr end end +# Timing-sensitive tests can fail on CI due to occasional unexpected delays, +# so this test is disabled. +#= +let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no threadpool_latency.jl` + for test_nthreads in (1, 2) + new_env = copy(ENV) + new_env["JULIA_NUM_THREADS"] = string(test_nthreads, ",1") + run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr)) + end +end +=# +let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no threadpool_use.jl` + new_env = copy(ENV) + new_env["JULIA_NUM_THREADS"] = "1,1" + run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr)) +end + function run_with_affinity(cpus) script = joinpath(@__DIR__, "print_process_affinity.jl") return readchomp(setcpuaffinity(`$(Base.julia_cmd()) $script`, cpus))