Skip to content

Conversation

@MasonProtter
Copy link
Contributor

@MasonProtter MasonProtter commented Mar 4, 2020

Hello, I've occassionally encountered confusion online from people who think that Threads.@threads is using the new depth first scheduling routine and run into problems when trying to nest Threads.@threads loops within other multi-threaded code.

I wrote up a quick macro that roughly mimics the behaviour of Threads.@threads but instead uses Threads.@spawn inside, but will only spawn at most Threads.nthreads() + 1 tasks.

Here's an implementation of the macro that should work on Julia 1.3+ (functionally identical to the one implemented in this PR):

macro mythreads(expr::Expr)
    @assert expr.head == :for
    @assert length(expr.args) == 2
    @assert length(expr.args[1].args) == 2
    loopvar   = expr.args[1].args[1]
    iter      = expr.args[1].args[2]
    loop_body = expr.args[2]

    rng = gensym(:rng)
    quote
        @sync for $rng in $(Iterators.partition)($iter, length($iter) ÷ Threads.nthreads())
            Threads.@spawn begin
                for $loopvar in $rng
                    $loop_body
                end
            end
        end
    end |> esc
end

So that

@mythreads for i in eachindex(xs)
    out[i] = sin(xs[i])
end

will get turned into

@sync for rng in partition(eachindex(xs), length(eachindex(xs)) ÷ Threads.nthreads())
    Threads.@spawn begin
        for i in rng
            out[i] = sin(xs[i])
        end
    end
end

This makes it so that a for loop only spawns a number of tasks equal to Threads.nthreads() (or Threads.nthreads()+1 if the division has a remainder).

Here's a performance comparison on Julia 1.4.0-rc2 with 2 threads:

using BenchmarkTools

f1!(xs, out) = Threads.@threads for i in eachindex(xs)
    out[i] = sin(xs[i])
end

f2!(xs, out) = @mythreads for i in eachindex(xs)
    out[i] = sin(xs[i])
end

julia> let
           xs = randn(10_000)
           out1 = similar(xs)
           out2 = similar(xs)
           @btime f1!($xs, $out1)
           @btime f2!($xs, $out2)
           out1  out2
       end
  75.158 μs (13 allocations: 1.39 KiB)
  56.488 μs (15 allocations: 1.55 KiB)
true

I'm not sure if it is intentional or not that the current implementation of Threads.@threads does not use PARTR and I also suspect there are things about my implementation of this macro that are not up the Base Julia snuff, hence the RFC.

Thoughts, comments and code review much appreciated.

@DilumAluthge
Copy link
Member

Should we just deprecate Threads.@threads in favor of Threads.@spawn?

@MasonProtter
Copy link
Contributor Author

Should we just deprecate Threads.@threads in favor of Threads.@Spawn?

I'd strongly argue no. They do different things.

@DilumAluthge
Copy link
Member

Ah. Never mind then.

@JeffBezanson
Copy link
Member

#32477 also does this.

@MasonProtter
Copy link
Contributor Author

MasonProtter commented Mar 4, 2020

@DilumAluthge Threads.@threads is really handy when you have a very quick function being iterated over a very long iterator because it only spawns a number of tasks equal to Threads.@threads. If you naively did

@sync for i in 1:10_000
    Threads.@spawn out[i] = sin(x[i])
end 

you'd find that incredibly slow because spawning each task takes on the order of 1µs.

@MasonProtter
Copy link
Contributor Author

#32477 also does this.

Ah I see, I wasn't able to decode that title when I saw it originally. If this is redundant feel free to close it.

@MasonProtter
Copy link
Contributor Author

MasonProtter commented Mar 4, 2020

Is there an argument to be made that this is desirable independent of #32477? I found the old version of the _threadfor function a little verbose and hard to understand.

I think it's nice that this PR takes advantage of our already provided Iterators.partition and I think it results in more readable / maintainable code, seemingly without performance cost.

Comment on lines 26 to 28
@sync for $rng in $(Iterators.partition)($iter, $(length)($iter) ÷ $(nthreads)())
Base.Threads.@spawn begin
for $loopvar in $rng
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strictly speaking, I think this is a breaking change as the following code will behave differently before and after this PR:

@threads for ...
    @spawn ...
end

Before this PR, @spawn is not synchronized at the end of @threads block. After this PR, it is not thread-safe: #34666. You can make it thread-safe by adding @sync before for $loopvar in $rng although the behavior of @spawn is still different (it's synchronized by @threads).

Copy link
Contributor Author

@MasonProtter MasonProtter Mar 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mhm, this could conceivably change behaviour (though threading is experimental, so breaking changes are allowed), but before if you did

@threads for ...
    @spawn ...
end

then the @spawn would destructively interfere with the tasks spawned by @threads so arguably, there's not really any reason to have done that.

Are you able to cook up a valid test case that would have passed on the old version of @threads but would fail on this version?

Copy link
Contributor Author

@MasonProtter MasonProtter Mar 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see an error in the test suite provides an example of exactly what you referenced. Trying to fix now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(though threading is experimental, so breaking changes are allowed)

Yeah, I think I agree. Though it could be surprising if @threads acts implicitly as @sync.

FWIW, I think Threads.foreach would be a much better API as it doesn't hide the fact that you'll have a closure. It's also easy to add keyword options, e.g., for controlling the base case size for better load balancing. Something like this:

function Threads.foreach(f, xs::AbstractArray; basesize = length(xs) ÷ Threads.nthreads())
    @sync for p in Iterators.partition(xs, basesize)
        @spawn foreach(f, p)
    end
    return
end

(I'm experimenting this API here: https://github.com/tkf/ThreadsX.jl/blob/1b2f2a42094986361785b617c064e5fe8cda93af/src/map.jl#L26)

@threads can exist as a simple wrapper around Threads.foreach for backward compatibility. This trivially avoids the @sync scoping problem.

@quinnj
Copy link
Member

quinnj commented Apr 23, 2020

I think it's been mentioned elsewhere, but just wanted to add a voice that this would indeed be breaking; a few of us rely on the current behavior in order to start worker tasks on specific threads, such as the code here. I'd say as long as we have either: 1) a way to reserve a main thread where tasks couldn't be spawned, or 2) a way to still spawn a threaded task on a specific thread (e.g. Threads.@spawn 2 expr), then we could still get the behavior we're after.

@baggepinnen
Copy link
Contributor

Could the implementation in this pr simply live alongside the old one, under a different macro name? @partr_threads or similar

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants