Skip to content

Commit

Permalink
Remove context from move (#144)
Browse files Browse the repository at this point in the history
Fixes #143 
Avoids creation of a new context
Test that no new Context is created
Remove ctx from move
Remove pids from collect_remote
  • Loading branch information
DrChainsaw committed Sep 30, 2020
1 parent 98caa9d commit c678533
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "Dagger"
uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
version = "0.9.2"
version = "0.10.0"

[deps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Expand Down
8 changes: 4 additions & 4 deletions src/chunks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,22 @@ end
unrelease(c::Chunk) = c

collect_remote(chunk::Chunk) =
move(Context(), chunk.processor, OSProc(), poolget(chunk.handle))
move(chunk.processor, OSProc(), poolget(chunk.handle))
function collect(ctx::Context, chunk::Chunk; options=nothing)
# delegate fetching to handle by default.
if chunk.handle isa DRef && !(chunk.processor isa OSProc)
return remotecall_fetch(collect_remote, chunk.handle.owner, chunk)
elseif chunk.handle isa FileRef
return poolget(chunk.handle)
else
return move(ctx, chunk.processor, OSProc(), chunk.handle)
return move(chunk.processor, OSProc(), chunk.handle)
end
end
collect(ctx::Context, ref::DRef; options=nothing) =
move(ctx, OSProc(ref.owner), OSProc(), ref)
move(OSProc(ref.owner), OSProc(), ref)
collect(ctx::Context, ref::FileRef; options=nothing) =
poolget(ref)
move(ctx, from_proc::OSProc, to_proc::OSProc, ref::Union{DRef, FileRef}) =
move(from_proc::OSProc, to_proc::OSProc, ref::Union{DRef, FileRef}) =
poolget(ref)


Expand Down
14 changes: 7 additions & 7 deletions src/processor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ moving back to `from_proc` before being serialized over the wire as needed.
The default implementation breaks a single `move` call down into a sequence of
`move` calls, and is not intended to be maximally efficient.
"""
function move(ctx, from_proc::Processor, to_proc::Processor, x)
function move(from_proc::Processor, to_proc::Processor, x)
if from_proc == to_proc
return x
end
Expand All @@ -89,7 +89,7 @@ function move(ctx, from_proc::Processor, to_proc::Processor, x)
remote_proc = parent_proc
local_proc = OSProc()
@debug "(Network) moving $remote_proc to $local_proc"
x = move(ctx, remote_proc, local_proc, x)
x = move(remote_proc, local_proc, x)

# Move to to_proc
parent_proc = get_parent(to_proc)
Expand All @@ -102,7 +102,7 @@ function move(ctx, from_proc::Processor, to_proc::Processor, x)
while !isempty(path)
next_proc = pop!(path)
@debug "(Local) moving $last_proc to $next_proc"
x = move(ctx, last_proc, next_proc, x)
x = move(last_proc, next_proc, x)
last_proc = next_proc
end
return x
Expand All @@ -117,7 +117,7 @@ function move_to_osproc(parent_proc, x)
ctx = Context()
while !(parent_proc isa OSProc)
grandparent_proc = get_parent(parent_proc)
x = move(ctx, parent_proc, grandparent_proc, x)
x = move(parent_proc, grandparent_proc, x)
parent_proc = grandparent_proc
end
return x, parent_proc
Expand Down Expand Up @@ -203,7 +203,7 @@ function Base.show(io::IO, pex::ProcessorSelectionException)
print(io, " Arguments: $(pex.args)")
end

move(ctx, from_proc::OSProc, to_proc::OSProc, x) = x
move(from_proc::OSProc, to_proc::OSProc, x) = x
execute!(proc::OSProc, f, args...) = f(args...)
default_enabled(proc::OSProc) = true

Expand All @@ -219,8 +219,8 @@ end
iscompatible(proc::ThreadProc, opts, f, args...) = true
iscompatible_func(proc::ThreadProc, opts, f) = true
iscompatible_arg(proc::ThreadProc, opts, x) = true
move(ctx, from_proc::OSProc, to_proc::ThreadProc, x) = x
move(ctx, from_proc::ThreadProc, to_proc::OSProc, x) = x
move(from_proc::OSProc, to_proc::ThreadProc, x) = x
move(from_proc::ThreadProc, to_proc::OSProc, x) = x
@static if VERSION >= v"1.3.0-DEV.573"
execute!(proc::ThreadProc, f, args...) = fetch(Threads.@spawn f(args...))
else
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ end
to_proc = choose_processor(from_proc, options, f, fetched)
fetched = map(Iterators.zip(fetched,ids)) do (x, id)
@dbg timespan_start(ctx, :move, (thunk_id, id), (f, id))
x = move(ctx, from_proc, to_proc, x)
x = move(from_proc, to_proc, x)
@dbg timespan_end(ctx, :move, (thunk_id, id), (f, id))
return x
end
Expand Down
6 changes: 3 additions & 3 deletions test/fakeproc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ fakesum(xs...) = FakeVal(sum(map(y->y.x, xs)))
Dagger.iscompatible_func(proc::FakeProc, opts, f) = true
Dagger.iscompatible_arg(proc::FakeProc, opts, x::Integer) = true
Dagger.iscompatible_arg(proc::FakeProc, opts, x::FakeVal) = true
Dagger.move(ctx, from_proc::OSProc, to_proc::FakeProc, x::Integer) = FakeVal(x)
Dagger.move(ctx, from_proc::FakeProc, to_proc::OSProc, x::Vector) =
Dagger.move(from_proc::OSProc, to_proc::FakeProc, x::Integer) = FakeVal(x)
Dagger.move(from_proc::FakeProc, to_proc::OSProc, x::Vector) =
map(y->y.x, x)
Dagger.move(ctx, from_proc::FakeProc, to_proc::OSProc, x::FakeVal) =
Dagger.move(from_proc::FakeProc, to_proc::OSProc, x::FakeVal) =
x.x
Dagger.execute!(proc::FakeProc, func, args...) = FakeVal(42+func(args...).x)

Expand Down
8 changes: 4 additions & 4 deletions test/processors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ struct PathProc <: Dagger.Processor
owner::Int
end
Dagger.get_parent(proc::PathProc) = OSProc(proc.owner)
Dagger.move(ctx, ::PathProc, ::OSProc, x::Float64) = x+1
Dagger.move(ctx, ::OSProc, ::PathProc, x::Float64) = x+2
Dagger.move(::PathProc, ::OSProc, x::Float64) = x+1
Dagger.move(::OSProc, ::PathProc, x::Float64) = x+2
Dagger.iscompatible(proc::PathProc, opts, f, args...) = true
Dagger.execute!(proc::PathProc, func, args...) = func(args...)

Expand Down Expand Up @@ -54,7 +54,7 @@ end
tp = ThreadProc(1, 1)
op = get_parent(tp)
value = rand()
moved_value = Dagger.move(ctx, tp, op, Dagger.move(ctx, op, tp, value))
moved_value = Dagger.move(tp, op, Dagger.move(op, tp, value))
@test value === moved_value
end
@testset "Generic path move()" begin
Expand All @@ -63,7 +63,7 @@ end
proc1 = first(filter(x->x isa PathProc, get_processors(OSProc(1))))
proc2 = first(filter(x->x isa PathProc, get_processors(OSProc(2))))
value = rand()
moved_value = Dagger.move(ctx, proc1, proc2, value)
moved_value = Dagger.move(proc1, proc2, value)
@test moved_value == value+3
@everywhere pop!(Dagger.PROCESSOR_CALLBACKS)
end
Expand Down
27 changes: 27 additions & 0 deletions test/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,31 @@ end
end
@everywhere (pop!(Dagger.PROCESSOR_CALLBACKS); empty!(Dagger.OSPROC_CACHE))

@testset "Add new workers" begin
using Distributed
ps1 = addprocs(2, exeflags="--project");

@everywhere begin
using Dagger, Distributed
# Condition to guarantee that processing is not completed before we add new workers
c = Condition()
function testfun(i)
i < 2 && return myid()
wait(c)
return myid()
end
end

ts = delayed(vcat)((delayed(testfun)(i) for i in 1:4)...);
job = @async collect(Context(ps1), ts);

ps2 = addprocs(2, exeflags="--project");

while !istaskdone(job)
@everywhere ps1 notify(c)
end
@test fetch(job) |> unique |> sort == ps1

wait(rmprocs(vcat(ps1,ps2)))
end
end

2 comments on commit c678533

@jpsamaroo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator register()

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/22216

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.10.0 -m "<description of version>" c6785336d5d0fbcea1f96cc17e2a12cac2ebcf45
git push origin v0.10.0

Please sign in to comment.