Skip to content

Commit

Permalink
Catch interrupts in phyperopt, return current data (#68)
Browse files Browse the repository at this point in the history
* Catch interrupts in phyperopt, return current data

* add a done_channel to keep track of when stopped

* try to fix test

* remove interrupt tests (not working)
  • Loading branch information
albheim committed Jan 31, 2022
1 parent c62e635 commit e207cec
Showing 1 changed file with 48 additions and 25 deletions.
73 changes: 48 additions & 25 deletions src/Hyperopt.jl
Expand Up @@ -214,31 +214,50 @@ function pmacrobody(ex, params, ho_, pmap=pmap)
# <https://docs.julialang.org/en/v1/manual/distributed-computing/#Local-invocations>
put!(ho_channel, deepcopy(ho))

# We don't care about the results of the `pmap` since we update the hyperoptimizer
# inside the loop.
$(esc(pmap))(1:ho.iterations) do i
# We take the hyperoptimizer out of the channel, and get our new parameter values
local_ho = take!(ho_channel)
# We use `update_history` because we want to only update the history once we've
# finished the iteration and have a result to report back as well. Otherwise,
# some processes may observe the Hyperoptimizer in an inconsistent state with
# `length(ho.history) > length(ho.results)`. Moreover, if one run is very quick
# the history and results could become out of order with respect to one another.
$(Expr(:tuple, esc.(params)...)), _ = iterate(local_ho, i; update_history = false)
# Now we put it back so another processor can use the hyperoptimizer
put!(ho_channel, local_ho)

# Now run the objective
res = $(esc(ex.args[2])) # ex.args[2] = Body of the For loop

# Now update the results; we again grab the one true hyperoptimizer,
# and populate it's history and result.
local_ho = take!(ho_channel)
push!(local_ho.history, $(Expr(:tuple, esc.(params[2:end])...)))
push!(local_ho.results, res)
put!(ho_channel, local_ho)

res
# pmap continues running a few lefterover jobs when interrupted, this is to stop that
done_channel = RemoteChannel(() -> Channel{Bool}(1), 1)
put!(done_channel, false)

try
# We don't care about the results of the `pmap` since we update the hyperoptimizer
# inside the loop.
$(esc(pmap))(1:ho.iterations) do i
isdone = take!(done_channel)
put!(done_channel, isdone)
isdone && return

# We take the hyperoptimizer out of the channel, and get our new parameter values
local_ho = take!(ho_channel)
# We use `update_history` because we want to only update the history once we've
# finished the iteration and have a result to report back as well. Otherwise,
# some processes may observe the Hyperoptimizer in an inconsistent state with
# `length(ho.history) > length(ho.results)`. Moreover, if one run is very quick
# the history and results could become out of order with respect to one another.
$(Expr(:tuple, esc.(params)...)), _ = iterate(local_ho, i; update_history = false)
# Now we put it back so another processor can use the hyperoptimizer
put!(ho_channel, local_ho)

# Now run the objective
res = $(esc(ex.args[2])) # ex.args[2] = Body of the For loop

# Now update the results; we again grab the one true hyperoptimizer,
# and populate it's history and result.
local_ho = take!(ho_channel)
push!(local_ho.history, $(Expr(:tuple, esc.(params[2:end])...)))
push!(local_ho.results, res)
put!(ho_channel, local_ho)

res
end
catch e
take!(done_channel)
put!(done_channel, true)
interrupt() # Stop all workers
if e isa InterruptException
@info "Aborting hyperoptimization"
else
rethrow()
end
end

# What we get out of the channel is an updated copy of our original hyperoptimizer.
Expand All @@ -255,6 +274,10 @@ function pmacrobody(ex, params, ho_, pmap=pmap)
empty!(ho.results)
append!(ho.results, updated_ho.results)

# Do this last in case remaining runs are slow at starting so they can take the channel and stop
# properly without erroring (happens if accessing channel after closing)
close(done_channel)

ho
end
workaround_function()
Expand Down

0 comments on commit e207cec

Please sign in to comment.