From 2389de05714f16805c49fafcb8418d78c5e13923 Mon Sep 17 00:00:00 2001 From: Kiran Date: Fri, 11 Aug 2023 09:29:07 -0400 Subject: [PATCH] Add a `threadpool` parameter to `Channel` constructor (#50858) Without this, the task created by a `Channel` will run in the threadpool of the creating task; in the REPL, this could be the interactive threadpool. On 1.8, without threadpools: ```julia % julia +1.8 -t 8 _ _ _ _(_)_ | Documentation: https://docs.julialang.org (_) | (_) (_) | _ _ _| |_ __ _ | Type "?" for help, "]?" for Pkg help. | | | | | | |/ _` | | | | |_| | | | (_| | | Version 1.8.5 (2023-01-08) _/ |\__'_|_|_|\__'_| | Official https://julialang.org/ release |__/ | julia> for _ in 1:10 Channel{Int}(1; spawn=true) do _ Core.print("threadid=$(Threads.threadid())\n") end end threadid=2 threadid=5 threadid=2 threadid=2 threadid=1 threadid=6 threadid=7 threadid=8 threadid=3 threadid=4 ``` On 1.9, with no interactive threads: ```julia % julia +1.9 -t 8 _ _ _ _(_)_ | Documentation: https://docs.julialang.org (_) | (_) (_) | _ _ _| |_ __ _ | Type "?" for help, "]?" for Pkg help. | | | | | | |/ _` | | | | |_| | | | (_| | | Version 1.9.2 (2023-07-05) _/ |\__'_|_|_|\__'_| | Official https://julialang.org/ release |__/ | julia> for _ in 1:10 Channel{Int}(1; spawn=true) do _ Core.print("threadid=$(Threads.threadid())\n") end end threadid=4 threadid=4 threadid=4 threadid=2 threadid=3 threadid=1 threadid=7 threadid=5 threadid=8 threadid=6 ``` On 1.9, with an interactive thread: ```julia % julia +1.9 -t 7,1 _ _ _ _(_)_ | Documentation: https://docs.julialang.org (_) | (_) (_) | _ _ _| |_ __ _ | Type "?" for help, "]?" for Pkg help. | | | | | | |/ _` | | | | |_| | | | (_| | | Version 1.9.2 (2023-07-05) _/ |\__'_|_|_|\__'_| | Official https://julialang.org/ release |__/ | julia> for _ in 1:10 Channel{Int}(1; spawn=true) do _ Core.print("threadid=$(Threads.threadid())\n") end end threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 ``` With this PR, the `:default` threadpool is used instead. ```julia % julia +master -t7,1 _ _ _ _(_)_ | Documentation: https://docs.julialang.org (_) | (_) (_) | _ _ _| |_ __ _ | Type "?" for help, "]?" for Pkg help. | | | | | | |/ _` | | | | |_| | | | (_| | | Version 1.11.0-DEV.244 (2023-08-09) _/ |\__'_|_|_|\__'_| | Commit d99f2496ff* (0 days old master) |__/ | julia> for _ in 1:10 Channel{Int}(1; spawn=true) do _ Core.print("threadid=$(Threads.threadid())\n") end end threadid=7 threadid=6 threadid=7 threadid=7 threadid=6 threadid=3 threadid=5 threadid=2 threadid=4 threadid=8 ``` And, the behavior can be overridden. ```julia % julia +master -t7,1 _ _ _ _(_)_ | Documentation: https://docs.julialang.org (_) | (_) (_) | _ _ _| |_ __ _ | Type "?" for help, "]?" for Pkg help. | | | | | | |/ _` | | | | |_| | | | (_| | | Version 1.11.0-DEV.244 (2023-08-09) _/ |\__'_|_|_|\__'_| | Commit d99f2496ff* (0 days old master) |__/ | julia> for _ in 1:10 Channel{Int}(1; spawn=true, threadpool=:interactive) do _ Core.print("threadid=$(Threads.threadid())\n") end end threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 threadid=1 ``` --------- Co-authored-by: Nathan Daly (cherry picked from commit 555cd2304fb72669cc2cd92adb485306cc0caa7e) --- base/channels.jl | 28 +++++++++++++++++++++------- test/channel_threadpool.jl | 14 ++++++++++++++ test/channels.jl | 5 +++++ 3 files changed, 40 insertions(+), 7 deletions(-) create mode 100644 test/channel_threadpool.jl diff --git a/base/channels.jl b/base/channels.jl index 1b5b427f92671..90dac37f41cb6 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -59,7 +59,7 @@ Channel(sz=0) = Channel{Any}(sz) # special constructors """ - Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false) + Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing) Create a new task from `func`, bind it to a new channel of type `T` and size `size`, and schedule the task, all in a single call. @@ -70,9 +70,14 @@ The channel is automatically closed when the task terminates. If you need a reference to the created task, pass a `Ref{Task}` object via the keyword argument `taskref`. -If `spawn = true`, the Task created for `func` may be scheduled on another thread +If `spawn=true`, the `Task` created for `func` may be scheduled on another thread in parallel, equivalent to creating a task via [`Threads.@spawn`](@ref). +If `spawn=true` and the `threadpool` argument is not set, it defaults to `:default`. + +If the `threadpool` argument is set (to `:default` or `:interactive`), this implies +that `spawn=true` and the new Task is spawned to the specified threadpool. + Return a `Channel`. # Examples @@ -117,6 +122,9 @@ true In earlier versions of Julia, Channel used keyword arguments to set `size` and `T`, but those constructors are deprecated. +!!! compat "Julia 1.9" + The `threadpool=` argument was added in Julia 1.9. + ```jldoctest julia> chnl = Channel{Char}(1, spawn=true) do ch for c in "hello world" @@ -129,12 +137,18 @@ julia> String(collect(chnl)) "hello world" ``` """ -function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false) where T +function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing) where T chnl = Channel{T}(size) task = Task(() -> func(chnl)) + if threadpool === nothing + threadpool = :default + else + spawn = true + end task.sticky = !spawn bind(chnl, task) if spawn + Threads._spawn_set_thrpool(task, threadpool) schedule(task) # start it on (potentially) another thread else yield(task) # immediately start it, yielding the current thread @@ -149,17 +163,17 @@ Channel(func::Function, args...; kwargs...) = Channel{Any}(func, args...; kwargs # of course not deprecated.) # We use `nothing` default values to check which arguments were set in order to throw the # deprecation warning if users try to use `spawn=` with `ctype=` or `csize=`. -function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing) +function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing, threadpool=nothing) # The spawn= keyword argument was added in Julia v1.3, and cannot be used with the # deprecated keyword arguments `ctype=` or `csize=`. - if (ctype !== nothing || csize !== nothing) && spawn !== nothing - throw(ArgumentError("Cannot set `spawn=` in the deprecated constructor `Channel(f; ctype=Any, csize=0)`. Please use `Channel{T=Any}(f, size=0; taskref=nothing, spawn=false)` instead!")) + if (ctype !== nothing || csize !== nothing) && (spawn !== nothing || threadpool !== nothing) + throw(ArgumentError("Cannot set `spawn=` or `threadpool=` in the deprecated constructor `Channel(f; ctype=Any, csize=0)`. Please use `Channel{T=Any}(f, size=0; taskref=nothing, spawn=false, threadpool=nothing)` instead!")) end # Set the actual default values for the arguments. ctype === nothing && (ctype = Any) csize === nothing && (csize = 0) spawn === nothing && (spawn = false) - return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn) + return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn, threadpool=threadpool) end closed_exception() = InvalidStateException("Channel is closed.", :closed) diff --git a/test/channel_threadpool.jl b/test/channel_threadpool.jl new file mode 100644 index 0000000000000..4509604087fa8 --- /dev/null +++ b/test/channel_threadpool.jl @@ -0,0 +1,14 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +using Test +using Base.Threads + +@testset "Task threadpools" begin + c = Channel{Symbol}() do c; put!(c, threadpool(current_task())); end + @test take!(c) === threadpool(current_task()) + c = Channel{Symbol}(spawn = true) do c; put!(c, threadpool(current_task())); end + @test take!(c) === :default + c = Channel{Symbol}(threadpool = :interactive) do c; put!(c, threadpool(current_task())); end + @test take!(c) === :interactive + @test_throws ArgumentError Channel{Symbol}(threadpool = :foo) do c; put!(c, :foo); end +end diff --git a/test/channels.jl b/test/channels.jl index dbda5cf069081..36fec7b842de1 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -107,6 +107,11 @@ end @test taskref[].sticky == false @test collect(c) == [0] end +let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no channel_threadpool.jl` + new_env = copy(ENV) + new_env["JULIA_NUM_THREADS"] = "1,1" + run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr)) +end @testset "multiple concurrent put!/take! on a channel for different sizes" begin function testcpt(sz)