Skip to content

Commit

Permalink
Cleanup whitespace and method signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Oct 6, 2020
1 parent 1f0538e commit 928b7ad
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/fault-handler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ function handle_fault(ctx, state, thunk, oldproc, chan, node_order)

# Reschedule inputs from deadlist
ps = procs(ctx)
@assert !isempty(ps) "No processes left!"
@assert !isempty(ps) "No workers left for fault handling!"
newproc = rand(ps)

while length(deadlist) > 0
Expand Down
12 changes: 6 additions & 6 deletions src/processor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -286,27 +286,27 @@ Base.lock(f, ctx::Context) = lock(f, ctx.proc_lock)
"""
    addprocs!(ctx::Context, xs)

Add new workers `xs` to `ctx`.
Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.
Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
"""
# Accept raw process IDs: wrap each in an `OSProc` and delegate to the
# `Processor`-vector method.
addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, [OSProc(pid) for pid in xs])
# Register the given `Processor`s with `ctx`. The context lock serializes
# this against the scheduler reading/modifying `ctx.procs`.
function addprocs!(ctx::Context, xs::AbstractVector{<:Processor})
    return lock(ctx) do
        append!(ctx.procs, xs)
    end
end

"""
    rmprocs!(ctx::Context, xs)

Remove the specified workers `xs` from `ctx`.
Workers will typically finish all their assigned tasks if scheduling is ongoing, but will not be assigned new tasks after removal.
Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
"""
# Accept raw process IDs: wrap each in an `OSProc` and delegate to the
# `Processor`-vector method.
rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, [OSProc(pid) for pid in xs])
# Remove the given `Processor`s from `ctx`. The context lock serializes
# this against the scheduler reading/modifying `ctx.procs`.
rmprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
    # Keep only processors NOT slated for removal. The `∉` operator was
    # garbled to `p -> p xs` in this source; restored here.
    filter!(p -> p ∉ xs, ctx.procs)
end

38 changes: 19 additions & 19 deletions src/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,24 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
end
end

# This is a bit redundant, as the @async task below does basically the
# same job. Without it, though, testing of process modification becomes
# non-deterministic (due to sleep in a CI environment), which is why it is
# still here.
procs_state = assign_new_procs!(ctx, state, chan, node_order, procs_state)

if isempty(state.running)
# the block above fired only meta tasks
continue
end

check_integrity(ctx)
# Check periodically for new workers in a parallel task so that we
# don't accidentally end up having to wait for `take!(chan)` on some
# large task before new workers are put to work. Locking is used to
# stop this task as soon as something pops out from the channel, to
# minimize the risk that this task schedules thunks simultaneously with
# the main task (after future refactoring).
newtasks_lock = ReentrantLock()
@async while !isempty(state.ready) || !isempty(state.running)
sleep(1)
Expand Down Expand Up @@ -152,7 +155,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())

@dbg timespan_start(ctx, :scheduler, thunk_id, master)
immediate_next = finish_task!(state, node, node_order)
if !isempty(state.ready) && !shall_remove_proc(ctx, proc, immediate_next)
if !isempty(state.ready) && !shall_remove_proc(ctx, proc)
thunk = pop_with_affinity!(Context(procs_to_use(ctx)), state.ready, proc, immediate_next)
if thunk !== nothing
fire_task!(ctx, thunk, proc, state, chan, node_order)
Expand All @@ -163,18 +166,16 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
state.cache[d]
end

# Determine which processors the scheduler may dispatch work to. When the
# `single` scheduler option is set (non-zero), execution is pinned to that
# one worker; otherwise every processor in the context is eligible.
function procs_to_use(ctx, options=ctx.options)
    if options.single === 0
        return procs(ctx)
    end
    # `single` must name the master (1) or a currently-registered worker.
    @assert options.single in vcat(1, workers()) "Sch option `single` must specify an active worker ID."
    return OSProc[OSProc(options.single)]
end

# Sanity check: the context must still contain at least one usable worker,
# otherwise the scheduler cannot make progress.
function check_integrity(ctx)
    @assert !isempty(procs_to_use(ctx)) "No suitable workers available in context."
end

# Main responsibility of this function is to check if new procs have been pushed to the context
function assign_new_procs!(ctx, state, chan, node_order, assignedprocs=[])
Expand All @@ -191,11 +192,10 @@ function assign_new_procs!(ctx, state, chan, node_order, assignedprocs=[])
end

# Policy: release a processor once it is no longer part of the context's
# usable set. (Might be a good policy to NOT remove the proc when
# `immediate_next` work is pending — see callers.)
# The `∉` operator was garbled to `proc procs_to_use(ctx)` in this source;
# restored here.
shall_remove_proc(ctx, proc) = proc ∉ procs_to_use(ctx)

# Drop a processor that has died from the context. If scheduling was pinned
# to a single worker and that worker is the one that died, there is no way
# to continue.
function remove_dead_proc!(ctx, proc, options=ctx.options)
    failed_pid = proc.pid
    @assert options.single !== failed_pid "Single worker failed, cannot continue."
    return rmprocs!(ctx, [proc])
end

Expand Down
2 changes: 1 addition & 1 deletion test/processors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ end
@testset "Modify workers in Context" begin
ps = addprocs(4, exeflags="--project")
@everywhere ps using Dagger

ctx = Context(ps[1:2])

Dagger.addprocs!(ctx, ps[3:end])
Expand Down
51 changes: 25 additions & 26 deletions test/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -68,41 +68,41 @@ end

@testset "Modify workers in running job" begin
# Test that we can add/remove workers while scheduler is running.
# As this requires asynchrony, a flag is used to stall the tasks to
# ensure workers are actually modified while the scheduler is working.

setup = quote
using Dagger, Distributed
# blocked is to guarantee that processing is not completed before we add new workers
# Note: blocked is used in expressions below
blocked = true
function testfun(i)
using Dagger, Distributed
# blocked is to guarantee that processing is not completed before we add new workers
# Note: blocked is used in expressions below
blocked = true
function testfun(i)
i < 4 && return myid()
# Wait for test to do its thing before we proceed
while blocked
sleep(0.001)
end
return myid()
end
end
end

@testset "Add new workers" begin
ps = []
try
try
ps1 = addprocs(2, exeflags="--project")
append!(ps, ps1)

@everywhere vcat(ps1, myid()) $setup

ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...)

ctx = Context(ps1)
job = @async collect(ctx, ts)

while !istaskstarted(job)
while !istaskstarted(job)
sleep(0.001)
end

# Will not be added, so they should never appear in output
ps2 = addprocs(2, exeflags="--project")
append!(ps, ps2)
Expand All @@ -112,67 +112,66 @@ end
@everywhere ps3 $setup
addprocs!(ctx, ps3)
@test length(procs(ctx)) == 4

@everywhere vcat(ps1, ps3) blocked=false

@test fetch(job) isa Vector
@test fetch(job) |> unique |> sort == vcat(ps1, ps3)

finally
wait(rmprocs(ps))
end
end

@testset "Remove workers" begin
ps = []
try
try
ps1 = addprocs(4, exeflags="--project")
append!(ps, ps1)

@everywhere vcat(ps1, myid()) $setup

ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...)

ctx = Context(ps1)
job = @async collect(ctx, ts)

while !istaskstarted(job)
while !istaskstarted(job)
sleep(0.001)
end

rmprocs!(ctx, ps1[3:end])
@test length(procs(ctx)) == 2

@everywhere ps1 blocked=false

res = fetch(job)
@test res isa Vector
# First, all four workers will report their IDs without hassle.
# Then all four will be waiting for the Condition. While they
# are waiting, ps1[3:end] are removed, but when the Condition is
# notified they will finish their tasks before being removed.
# Will probably break if workers are assigned more than one Thunk.
@test res[1:8] |> unique |> sort == ps1
@test all(pid -> pid in ps1[1:2], res[9:end])

finally
wait(rmprocs(ps))
end
end

@testset "Remove all workers throws" begin
ps = []
try
try
ps1 = addprocs(2, exeflags="--project")
append!(ps, ps1)

@everywhere vcat(ps1, myid()) $setup

ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...)

ctx = Context(ps1)
job = @async collect(ctx, ts)

while !istaskstarted(job)
while !istaskstarted(job)
sleep(0.001)
end

Expand Down

0 comments on commit 928b7ad

Please sign in to comment.