diff --git a/base/channels.jl b/base/channels.jl index 33365c03e5d3d..4ba95f8f91082 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -59,7 +59,7 @@ Channel(sz=0) = Channel{Any}(sz) # special constructors """ - Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false) + Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing) Create a new task from `func`, bind it to a new channel of type `T` and size `size`, and schedule the task, all in a single call. @@ -70,9 +70,14 @@ The channel is automatically closed when the task terminates. If you need a reference to the created task, pass a `Ref{Task}` object via the keyword argument `taskref`. -If `spawn = true`, the Task created for `func` may be scheduled on another thread +If `spawn=true`, the `Task` created for `func` may be scheduled on another thread in parallel, equivalent to creating a task via [`Threads.@spawn`](@ref). +If `spawn=true` and the `threadpool` argument is not set, it defaults to `:default`. + +If the `threadpool` argument is set (to `:default` or `:interactive`), this implies +that `spawn=true` and the new Task is spawned to the specified threadpool. + Return a `Channel`. # Examples @@ -117,6 +122,9 @@ true In earlier versions of Julia, Channel used keyword arguments to set `size` and `T`, but those constructors are deprecated. +!!! compat "Julia 1.9" + The `threadpool=` argument was added in Julia 1.9. + ```jldoctest julia> chnl = Channel{Char}(1, spawn=true) do ch for c in "hello world" @@ -129,12 +137,18 @@ julia> String(collect(chnl)) "hello world" ``` """ -function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false) where T +function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing) where T chnl = Channel{T}(size) task = Task(() -> func(chnl)) + if threadpool === nothing + threadpool = :default + else + spawn = true + end task.sticky = !spawn bind(chnl, task) if spawn + Threads._spawn_set_thrpool(task, threadpool) schedule(task) # start it on (potentially) another thread else yield(task) # immediately start it, yielding the current thread @@ -149,17 +163,17 @@ Channel(func::Function, args...; kwargs...) = Channel{Any}(func, args...; kwargs # of course not deprecated.) # We use `nothing` default values to check which arguments were set in order to throw the # deprecation warning if users try to use `spawn=` with `ctype=` or `csize=`. -function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing) +function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing, threadpool=nothing) # The spawn= keyword argument was added in Julia v1.3, and cannot be used with the # deprecated keyword arguments `ctype=` or `csize=`. - if (ctype !== nothing || csize !== nothing) && spawn !== nothing - throw(ArgumentError("Cannot set `spawn=` in the deprecated constructor `Channel(f; ctype=Any, csize=0)`. Please use `Channel{T=Any}(f, size=0; taskref=nothing, spawn=false)` instead!")) + if (ctype !== nothing || csize !== nothing) && (spawn !== nothing || threadpool !== nothing) + throw(ArgumentError("Cannot set `spawn=` or `threadpool=` in the deprecated constructor `Channel(f; ctype=Any, csize=0)`. Please use `Channel{T=Any}(f, size=0; taskref=nothing, spawn=false, threadpool=nothing)` instead!")) end # Set the actual default values for the arguments. ctype === nothing && (ctype = Any) csize === nothing && (csize = 0) spawn === nothing && (spawn = false) - return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn) + return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn, threadpool=threadpool) end closed_exception() = InvalidStateException("Channel is closed.", :closed) diff --git a/test/channel_threadpool.jl b/test/channel_threadpool.jl new file mode 100644 index 0000000000000..4509604087fa8 --- /dev/null +++ b/test/channel_threadpool.jl @@ -0,0 +1,14 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +using Test +using Base.Threads + +@testset "Task threadpools" begin + c = Channel{Symbol}() do c; put!(c, threadpool(current_task())); end + @test take!(c) === threadpool(current_task()) + c = Channel{Symbol}(spawn = true) do c; put!(c, threadpool(current_task())); end + @test take!(c) === :default + c = Channel{Symbol}(threadpool = :interactive) do c; put!(c, threadpool(current_task())); end + @test take!(c) === :interactive + @test_throws ArgumentError Channel{Symbol}(threadpool = :foo) do c; put!(c, :foo); end +end diff --git a/test/channels.jl b/test/channels.jl index e6b85e93b0226..09f973922235f 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -107,6 +107,11 @@ end @test taskref[].sticky == false @test collect(c) == [0] end +let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no channel_threadpool.jl` + new_env = copy(ENV) + new_env["JULIA_NUM_THREADS"] = "1,1" + run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr)) +end @testset "multiple concurrent put!/take! on a channel for different sizes" begin function testcpt(sz)