Add threadpool support to runtime #42302

Merged
merged 2 commits into from
Apr 26, 2022
Changes from 1 commit
6 changes: 4 additions & 2 deletions base/options.jl
@@ -1,6 +1,6 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

# NOTE: This type needs to be kept in sync with jl_options in src/julia.h
# NOTE: This type needs to be kept in sync with jl_options in src/jloptions.h
struct JLOptions
quiet::Int8
banner::Int8
@@ -9,7 +9,9 @@ struct JLOptions
commands::Ptr{Ptr{UInt8}} # (e)eval, (E)print, (L)load
image_file::Ptr{UInt8}
cpu_target::Ptr{UInt8}
nthreads::Int32
nthreadpools::Int16
nthreads::Int16
nthreads_per_pool::Ptr{Int16}
nprocs::Int32
machine_file::Ptr{UInt8}
project::Ptr{UInt8}
78 changes: 45 additions & 33 deletions base/partr.jl
@@ -2,7 +2,7 @@

module Partr

using ..Threads: SpinLock, nthreads
using ..Threads: SpinLock, nthreads, threadid

# a task minheap
mutable struct taskheap
@@ -16,12 +16,13 @@ end

# multiqueue minheap state
const heap_d = UInt32(8)
global heaps::Vector{taskheap} = Vector{taskheap}(undef, 0)
const heaps_lock = SpinLock()
global cong_unbias::UInt32 = typemax(UInt32)
const heaps = [Vector{taskheap}(undef, 0), Vector{taskheap}(undef, 0)]
const heaps_lock = [SpinLock(), SpinLock()]
const cong_unbias = [typemax(UInt32), typemax(UInt32)]


cong(max::UInt32, unbias::UInt32) = ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1)
cong(max::UInt32, unbias::UInt32) =
ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1)

function unbias_cong(max::UInt32)
return typemax(UInt32) - ((typemax(UInt32) % max) + UInt32(1))
@@ -60,46 +61,52 @@ function multiq_sift_down(heap::taskheap, idx::Int32)
end


function multiq_size()
function multiq_size(tpid::Int8)
nt = UInt32(Threads._nthreads_in_pool(tpid))
tp = tpid + 1
tpheaps = heaps[tp]
heap_c = UInt32(2)
heap_p = UInt32(length(heaps))
nt = UInt32(nthreads())
heap_p = UInt32(length(tpheaps))

if heap_c * nt <= heap_p
return heap_p
end

@lock heaps_lock begin
heap_p = UInt32(length(heaps))
nt = UInt32(nthreads())
@lock heaps_lock[tp] begin
heap_p = UInt32(length(tpheaps))
nt = UInt32(Threads._nthreads_in_pool(tpid))
if heap_c * nt <= heap_p
return heap_p
end

heap_p += heap_c * nt
Comment on lines +77 to 82
Member

For the wait-free property of the multiqueue, we need `nt = nthreads()`, since all worker threads from all worker pools are possible enqueuers. We probably don't need to worry too much about this for now, since most enqueues happen inside each pool and a theoretical guarantee does not imply practical performance in general. But I think it's something to watch for in practice, especially when there is intensive inter-pool synchronization. For example, a very conservative but wasteful approach is `heap_p = max(heap_c * _nthreads_in_pool(tpid), nthreads() + 1)`.

@vtjnash Why is `+=` used in `heap_p += heap_c * nt` instead of `=`, by the way? We only need `heap_p / nt > 1` here, right? The heap vector is either empty or fully initialized, so I suppose it doesn't matter at the moment, though.

Contributor

It's a good point. In particular, to retain interactivity, it should be a common pattern for an interactive task to enqueue work to the default threadpool. However, as you say, it may not be a problem in practice, and in any case we have many problems with the use of multi-queues in general; IMO the way forward is an evolution of your #43366.
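
The sizing trade-off discussed in this thread can be sketched in isolation. This is only an illustration, not code from the PR: `per_pool_heap_count` and `conservative_heap_count` are hypothetical names, and the thread counts are passed in as plain arguments instead of being read from the runtime.

```julia
# Hypothetical helpers illustrating the review discussion; not part of base/partr.jl.
const heap_c = UInt32(2)  # heaps per thread, the same constant multiq_size uses

# Sizing that scales only with the number of threads in one pool.
per_pool_heap_count(nt_pool::UInt32) = heap_c * nt_pool

# The conservative variant suggested in the review: additionally keep more heaps
# than there are threads in total, since any thread may enqueue into this pool.
conservative_heap_count(nt_pool::UInt32, nt_total::UInt32) =
    max(heap_c * nt_pool, nt_total + UInt32(1))

# Example: a pool with 1 thread in a process with 8 threads overall.
println(Int(per_pool_heap_count(UInt32(1))))
println(Int(conservative_heap_count(UInt32(1), UInt32(8))))
```

With a small pool and many threads overall, the conservative variant allocates noticeably more heaps than the per-pool sizing, which is the "wasteful" part of the suggestion.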

newheaps = Vector{taskheap}(undef, heap_p)
copyto!(newheaps, heaps)
for i = (1 + length(heaps)):heap_p
copyto!(newheaps, tpheaps)
for i = (1 + length(tpheaps)):heap_p
newheaps[i] = taskheap()
end
global heaps = newheaps
global cong_unbias = unbias_cong(heap_p)
heaps[tp] = newheaps
cong_unbias[tp] = unbias_cong(heap_p)
end

return heap_p
end


function multiq_insert(task::Task, priority::UInt16)
tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), task)
heap_p = multiq_size(tpid)
tp = tpid + 1

task.priority = priority

heap_p = multiq_size()
rn = cong(heap_p, cong_unbias)
while !trylock(heaps[rn].lock)
rn = cong(heap_p, cong_unbias)
rn = cong(heap_p, cong_unbias[tp])
tpheaps = heaps[tp]
while !trylock(tpheaps[rn].lock)
rn = cong(heap_p, cong_unbias[tp])
end

heap = heaps[rn]
heap = tpheaps[rn]
if heap.ntasks >= length(heap.tasks)
resize!(heap.tasks, length(heap.tasks) * 2)
end
@@ -122,34 +129,37 @@ function multiq_deletemin()
local rn1, rn2
local prio1, prio2

tid = Threads.threadid()
tp = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1) + 1
tpheaps = heaps[tp]

@label retry
GC.safepoint()
heap_p = UInt32(length(heaps))
heap_p = UInt32(length(tpheaps))
for i = UInt32(0):heap_p
if i == heap_p
return nothing
end
rn1 = cong(heap_p, cong_unbias)
rn2 = cong(heap_p, cong_unbias)
prio1 = heaps[rn1].priority
prio2 = heaps[rn2].priority
rn1 = cong(heap_p, cong_unbias[tp])
rn2 = cong(heap_p, cong_unbias[tp])
prio1 = tpheaps[rn1].priority
prio2 = tpheaps[rn2].priority
if prio1 > prio2
prio1 = prio2
rn1 = rn2
elseif prio1 == prio2 && prio1 == typemax(UInt16)
continue
end
if trylock(heaps[rn1].lock)
if prio1 == heaps[rn1].priority
if trylock(tpheaps[rn1].lock)
if prio1 == tpheaps[rn1].priority
break
end
unlock(heaps[rn1].lock)
unlock(tpheaps[rn1].lock)
end
end

heap = heaps[rn1]
heap = tpheaps[rn1]
task = heap.tasks[1]
tid = Threads.threadid()
if ccall(:jl_set_task_tid, Cint, (Any, Cint), task, tid-1) == 0
unlock(heap.lock)
@goto retry
@@ -171,9 +181,11 @@ end


function multiq_check_empty()
for i = UInt32(1):length(heaps)
if heaps[i].ntasks != 0
return false
for j = UInt32(1):length(heaps)
for i = UInt32(1):length(heaps[j])
if heaps[j][i].ntasks != 0
return false
end
end
end
return true
4 changes: 4 additions & 0 deletions base/task.jl
@@ -251,6 +251,10 @@ true
istaskfailed(t::Task) = (load_state_acquire(t) === task_state_failed)

Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)
function Threads.threadpool(t::Task)
tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t)
return tpid == 0 ? :default : :interactive
end

task_result(t::Task) = t.result

2 changes: 1 addition & 1 deletion base/threadcall.jl
@@ -9,7 +9,7 @@ const threadcall_restrictor = Semaphore(max_ccall_threads)

The `@threadcall` macro is called in the same way as [`ccall`](@ref) but does the work
in a different thread. This is useful when you want to call a blocking C
function without causing the main `julia` thread to become blocked. Concurrency
function without causing the current `julia` thread to become blocked. Concurrency
is limited by size of the libuv thread pool, which defaults to 4 threads but
can be increased by setting the `UV_THREADPOOL_SIZE` environment variable and
restarting the `julia` process.
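
As a rough illustration of the `@threadcall` docstring in this hunk, the call below runs a C function on a libuv pool thread while the calling task waits for the result. It is a sketch under assumptions: it presumes the C library's `strlen` can be resolved by name in the running process, and it uses a trivially fast function only to show the calling convention, which mirrors `ccall`.

```julia
# Assumption: `strlen` from the C library is resolvable in this process.
# The argument list has the same shape as for `ccall`; the work itself runs
# on a libuv thread pool thread instead of the current Julia thread.
n = @threadcall(:strlen, Csize_t, (Cstring,), "threadpool")

println(Int(n))
```

In real use the target would be a blocking C function; the number of such calls that can run at once is bounded by the libuv pool size described in the docstring.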
104 changes: 84 additions & 20 deletions base/threadingconstructs.jl
@@ -1,26 +1,62 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

export threadid, nthreads, @threads, @spawn
export threadid, nthreads, @threads, @spawn,
threadpool, nthreadpools

"""
Threads.threadid()
Threads.threadid() -> Int

Get the ID number of the current thread of execution. The master thread has ID `1`.
Get the ID number of the current thread of execution. The master thread has
ID `1`.
"""
threadid() = Int(ccall(:jl_threadid, Int16, ())+1)

# Inclusive upper bound on threadid()
"""
Threads.nthreads()
Threads.nthreads([:default|:interactive]) -> Int

Get the number of threads available to the Julia process. This is the inclusive upper bound
on [`threadid()`](@ref).
Get the number of threads (across all thread pools or within the specified
thread pool) available to Julia. The number of threads across all thread
pools is the inclusive upper bound on [`threadid()`](@ref).

See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
[`Distributed`](@ref man-distributed) standard library.
"""
function nthreads end

nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))
function nthreads(pool::Symbol)
if pool == :default
tpid = Int8(0)
elseif pool == :interactive
tpid = Int8(1)
else
error("invalid threadpool specified")
end
return _nthreads_in_pool(tpid)
end
function _nthreads_in_pool(tpid::Int8)
p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
return Int(unsafe_load(p, tpid + 1))
end

"""
Threads.threadpool(tid = threadid()) -> Symbol

Returns the specified thread's threadpool; either `:default` or `:interactive`.
"""
function threadpool(tid = threadid())
tpid = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1)
return tpid == 0 ? :default : :interactive
end

"""
Threads.nthreadpools() -> Int

Returns the number of threadpools currently configured.
"""
nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint)))
kpamnany marked this conversation as resolved.
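
The query functions documented above can be exercised as follows. This is a minimal sketch that assumes a released Julia version containing this API (1.9 or later); the reported counts depend on how the process was started, for example with `julia --threads 3,1`, and the meaning of the zero-argument `nthreads()` is left out on purpose because it is not pinned down here for every release.

```julia
# Assumes Julia 1.9 or later, where the thread pool API is available.
println("thread pools configured: ", Threads.nthreadpools())
println("threads in :default:     ", Threads.nthreads(:default))
println("threads in :interactive: ", Threads.nthreads(:interactive))
println("current thread's pool:   ", Threads.threadpool())

# An unknown pool name is rejected with an exception
# (the exact exception type is not relied on here).
rejected = try
    Threads.nthreads(:nosuchpool)
    false
catch
    true
end
println("unknown pool rejected:   ", rejected)
```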


function threading_run(fun, static)
ccall(:jl_enter_threaded_region, Cvoid, ())
@@ -48,7 +84,7 @@ function _threadsfor(iter, lbody, schedule)
quote
local threadsfor_fun
let range = $(esc(range))
function threadsfor_fun(tid=1; onethread=false)
function threadsfor_fun(tid = 1; onethread = false)
r = range # Load into local variable
lenr = length(r)
# divide loop iterations among threads
@@ -232,35 +268,63 @@ macro threads(args...)
end

"""
Threads.@spawn expr
Threads.@spawn [:default|:interactive] expr
Member

The name :interactive seems to be rather too specific. I think it's reasonable to put any latency-oriented tasks in the "secondary" task pool. Not sure what the right name is, though. Maybe something like :io, :shortlatency, :quick, :I_promise_that_I_yield_often, ...

Contributor

@kpamnany, Apr 14, 2022

I find :interactive more appropriate than any of those but :shortlatency (or maybe :lowlatency) is the best of those. I'm open to other suggestions.

The fact is that there is really no difference between the threadpools -- the term :interactive is actually a suggestion to the programmer to abide by a handshake agreement to avoid running long duration tasks in this pool, or to yield frequently.

Member

I personally prefer :io but :lowlatency also sounds good to me.

I don't think :interactive is good. For example, you might read from some sensor devices and do some quick processing before handing the data to the main pool.

Contributor

:io is nicely minimal, but incomplete? How about we get some more input? @vtjnash, @JeffBezanson, @jpsamaroo, thoughts?

Member Author

:interactive or :lowlatency are my votes. :io is an acronym which may not be universally understood, and :shortlatency is really better phrased as :lowlatency. :quick is a super overloaded word and not particularly useful.


Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available thread.
The task is allocated to a thread after it becomes available. To wait for the task
to finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref) to
wait and then obtain its return value.
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
thread in the specified threadpool (`:default` if unspecified). The task is
allocated to a thread once one becomes available. To wait for the task to
finish, call [`wait`](@ref) on the result of this macro, or call
[`fetch`](@ref) to wait and then obtain its return value.

Values can be interpolated into `@spawn` via `\$`, which copies the value directly into the
constructed underlying closure. This allows you to insert the _value_ of a variable,
isolating the asynchronous code from changes to the variable's value in the current task.
Values can be interpolated into `@spawn` via `\$`, which copies the value
directly into the constructed underlying closure. This allows you to insert
the _value_ of a variable, isolating the asynchronous code from changes to
the variable's value in the current task.

!!! note
See the manual chapter on threading for important caveats.
See the manual chapter on [multi-threading](@ref man-multithreading)
for important caveats. See also the chapter on [threadpools](@ref man-threadpools).

!!! compat "Julia 1.3"
This macro is available as of Julia 1.3.

!!! compat "Julia 1.4"
Interpolating values via `\$` is available as of Julia 1.4.

!!! compat "Julia 1.9"
A threadpool may be specified as of Julia 1.9.
"""
macro spawn(expr)
letargs = Base._lift_one_interp!(expr)
macro spawn(args...)
tpid = Int8(0)
na = length(args)
if na == 2
ttype, ex = args
if ttype isa QuoteNode
ttype = ttype.value
elseif ttype isa Symbol
# TODO: allow unquoted symbols
ttype = nothing
end
if ttype === :interactive
tpid = Int8(1)
elseif ttype !== :default
throw(ArgumentError("unsupported threadpool in @spawn: $ttype"))
end
elseif na == 1
ex = args[1]
else
throw(ArgumentError("wrong number of arguments in @spawn"))
end

letargs = Base._lift_one_interp!(ex)

thunk = esc(:(()->($expr)))
thunk = esc(:(()->($ex)))
var = esc(Base.sync_varname)
quote
let $(letargs...)
local task = Task($thunk)
task.sticky = false
ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid)
if $(Expr(:islocal, var))
put!($var, task)
end
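
A short usage sketch for the extended `@spawn` follows. It assumes a released Julia version with this feature (1.9 or later). The split between pools is a convention, as the review discussion notes, so the "stay short or yield" behaviour below is the caller's responsibility. To my understanding, later releases place `:interactive` tasks in the default pool when no interactive threads are configured; treat that as an assumption, since this diff does not show such a fallback.

```julia
# Assumes Julia 1.9 or later. Start Julia with an interactive thread
# (for example `julia --threads 3,1`) to populate the :interactive pool.

# Throughput-oriented work goes to the default pool.
heavy = Threads.@spawn :default sum(1:1000)

# Latency-sensitive work goes to the interactive pool; it should stay short
# or yield often, because nothing in the runtime enforces that.
quick = Threads.@spawn :interactive begin
    yield()
    :pong
end

# `$` copies the value of `x` at spawn time, so the later assignment is not seen.
x = 1
snap = Threads.@spawn :default begin
    $x + 1
end
x = 2

println(fetch(heavy))
println(fetch(quick))
println(fetch(snap))
println(Threads.threadpool(quick))  # pool the task was assigned to
```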
4 changes: 3 additions & 1 deletion doc/src/base/multi-threading.md
@@ -6,6 +6,8 @@ Base.Threads.foreach
Base.Threads.@spawn
Base.Threads.threadid
Base.Threads.nthreads
Base.Threads.threadpool
Base.Threads.nthreadpools
```

See also [Multi-Threading](@ref man-multithreading).
@@ -49,7 +51,7 @@ Base.Threads.atomic_min!
Base.Threads.atomic_fence
```

## ccall using a threadpool (Experimental)
## ccall using a libuv threadpool (Experimental)

```@docs
Base.@threadcall