diff --git a/Project.toml b/Project.toml index 2587dd3d..c58875cc 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "Dagger" uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54" -version = "0.9.2" +version = "0.10.0" [deps] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" diff --git a/src/chunks.jl b/src/chunks.jl index 795091ce..5246d739 100644 --- a/src/chunks.jl +++ b/src/chunks.jl @@ -66,7 +66,7 @@ end unrelease(c::Chunk) = c collect_remote(chunk::Chunk) = - move(Context(), chunk.processor, OSProc(), poolget(chunk.handle)) + move(chunk.processor, OSProc(), poolget(chunk.handle)) function collect(ctx::Context, chunk::Chunk; options=nothing) # delegate fetching to handle by default. if chunk.handle isa DRef && !(chunk.processor isa OSProc) @@ -74,14 +74,14 @@ function collect(ctx::Context, chunk::Chunk; options=nothing) elseif chunk.handle isa FileRef return poolget(chunk.handle) else - return move(ctx, chunk.processor, OSProc(), chunk.handle) + return move(chunk.processor, OSProc(), chunk.handle) end end collect(ctx::Context, ref::DRef; options=nothing) = - move(ctx, OSProc(ref.owner), OSProc(), ref) + move(OSProc(ref.owner), OSProc(), ref) collect(ctx::Context, ref::FileRef; options=nothing) = poolget(ref) -move(ctx, from_proc::OSProc, to_proc::OSProc, ref::Union{DRef, FileRef}) = +move(from_proc::OSProc, to_proc::OSProc, ref::Union{DRef, FileRef}) = poolget(ref) diff --git a/src/processor.jl b/src/processor.jl index c088a91a..77bab713 100644 --- a/src/processor.jl +++ b/src/processor.jl @@ -75,7 +75,7 @@ moving back to `from_proc` before being serialized over the wire as needed. The default implementation breaks a single `move` call down into a sequence of `move` calls, and is not intended to be maximally efficient. """ -function move(ctx, from_proc::Processor, to_proc::Processor, x) +function move(from_proc::Processor, to_proc::Processor, x) if from_proc == to_proc return x end @@ -89,7 +89,7 @@ function move(ctx, from_proc::Processor, to_proc::Processor, x) remote_proc = parent_proc local_proc = OSProc() @debug "(Network) moving $remote_proc to $local_proc" - x = move(ctx, remote_proc, local_proc, x) + x = move(remote_proc, local_proc, x) # Move to to_proc parent_proc = get_parent(to_proc) @@ -102,7 +102,7 @@ function move(ctx, from_proc::Processor, to_proc::Processor, x) while !isempty(path) next_proc = pop!(path) @debug "(Local) moving $last_proc to $next_proc" - x = move(ctx, last_proc, next_proc, x) + x = move(last_proc, next_proc, x) last_proc = next_proc end return x @@ -117,7 +117,7 @@ function move_to_osproc(parent_proc, x) ctx = Context() while !(parent_proc isa OSProc) grandparent_proc = get_parent(parent_proc) - x = move(ctx, parent_proc, grandparent_proc, x) + x = move(parent_proc, grandparent_proc, x) parent_proc = grandparent_proc end return x, parent_proc @@ -203,7 +203,7 @@ function Base.show(io::IO, pex::ProcessorSelectionException) print(io, " Arguments: $(pex.args)") end -move(ctx, from_proc::OSProc, to_proc::OSProc, x) = x +move(from_proc::OSProc, to_proc::OSProc, x) = x execute!(proc::OSProc, f, args...) = f(args...) default_enabled(proc::OSProc) = true @@ -219,8 +219,8 @@ end iscompatible(proc::ThreadProc, opts, f, args...) = true iscompatible_func(proc::ThreadProc, opts, f) = true iscompatible_arg(proc::ThreadProc, opts, x) = true -move(ctx, from_proc::OSProc, to_proc::ThreadProc, x) = x -move(ctx, from_proc::ThreadProc, to_proc::OSProc, x) = x +move(from_proc::OSProc, to_proc::ThreadProc, x) = x +move(from_proc::ThreadProc, to_proc::OSProc, x) = x @static if VERSION >= v"1.3.0-DEV.573" execute!(proc::ThreadProc, f, args...) = fetch(Threads.@spawn f(args...)) else diff --git a/src/scheduler.jl b/src/scheduler.jl index 5f2fccd8..b6c17c65 100644 --- a/src/scheduler.jl +++ b/src/scheduler.jl @@ -343,7 +343,7 @@ end to_proc = choose_processor(from_proc, options, f, fetched) fetched = map(Iterators.zip(fetched,ids)) do (x, id) @dbg timespan_start(ctx, :move, (thunk_id, id), (f, id)) - x = move(ctx, from_proc, to_proc, x) + x = move(from_proc, to_proc, x) @dbg timespan_end(ctx, :move, (thunk_id, id), (f, id)) return x end diff --git a/test/fakeproc.jl b/test/fakeproc.jl index adfe4009..0134428e 100644 --- a/test/fakeproc.jl +++ b/test/fakeproc.jl @@ -17,10 +17,10 @@ fakesum(xs...) = FakeVal(sum(map(y->y.x, xs))) Dagger.iscompatible_func(proc::FakeProc, opts, f) = true Dagger.iscompatible_arg(proc::FakeProc, opts, x::Integer) = true Dagger.iscompatible_arg(proc::FakeProc, opts, x::FakeVal) = true -Dagger.move(ctx, from_proc::OSProc, to_proc::FakeProc, x::Integer) = FakeVal(x) -Dagger.move(ctx, from_proc::FakeProc, to_proc::OSProc, x::Vector) = +Dagger.move(from_proc::OSProc, to_proc::FakeProc, x::Integer) = FakeVal(x) +Dagger.move(from_proc::FakeProc, to_proc::OSProc, x::Vector) = map(y->y.x, x) -Dagger.move(ctx, from_proc::FakeProc, to_proc::OSProc, x::FakeVal) = +Dagger.move(from_proc::FakeProc, to_proc::OSProc, x::FakeVal) = x.x Dagger.execute!(proc::FakeProc, func, args...) = FakeVal(42+func(args...).x) diff --git a/test/processors.jl b/test/processors.jl index 3d0c477b..93ba8054 100644 --- a/test/processors.jl +++ b/test/processors.jl @@ -12,8 +12,8 @@ struct PathProc <: Dagger.Processor owner::Int end Dagger.get_parent(proc::PathProc) = OSProc(proc.owner) -Dagger.move(ctx, ::PathProc, ::OSProc, x::Float64) = x+1 -Dagger.move(ctx, ::OSProc, ::PathProc, x::Float64) = x+2 +Dagger.move(::PathProc, ::OSProc, x::Float64) = x+1 +Dagger.move(::OSProc, ::PathProc, x::Float64) = x+2 Dagger.iscompatible(proc::PathProc, opts, f, args...) = true Dagger.execute!(proc::PathProc, func, args...) = func(args...) @@ -54,7 +54,7 @@ end tp = ThreadProc(1, 1) op = get_parent(tp) value = rand() - moved_value = Dagger.move(ctx, tp, op, Dagger.move(ctx, op, tp, value)) + moved_value = Dagger.move(tp, op, Dagger.move(op, tp, value)) @test value === moved_value end @testset "Generic path move()" begin @@ -63,7 +63,7 @@ end proc1 = first(filter(x->x isa PathProc, get_processors(OSProc(1)))) proc2 = first(filter(x->x isa PathProc, get_processors(OSProc(2)))) value = rand() - moved_value = Dagger.move(ctx, proc1, proc2, value) + moved_value = Dagger.move(proc1, proc2, value) @test moved_value == value+3 @everywhere pop!(Dagger.PROCESSOR_CALLBACKS) end diff --git a/test/scheduler.jl b/test/scheduler.jl index 13d6fad9..2641a40b 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -66,4 +66,31 @@ end end @everywhere (pop!(Dagger.PROCESSOR_CALLBACKS); empty!(Dagger.OSPROC_CACHE)) + @testset "Add new workers" begin + using Distributed + ps1 = addprocs(2, exeflags="--project"); + + @everywhere begin + using Dagger, Distributed + # Condition to guarantee that processing is not completed before we add new workers + c = Condition() + function testfun(i) + i < 2 && return myid() + wait(c) + return myid() + end + end + + ts = delayed(vcat)((delayed(testfun)(i) for i in 1:4)...); + job = @async collect(Context(ps1), ts); + + ps2 = addprocs(2, exeflags="--project"); + + while !istaskdone(job) + @everywhere ps1 notify(c) + end + @test fetch(job) |> unique |> sort == ps1 + + wait(rmprocs(vcat(ps1,ps2))) + end end