/
Timeout.jl
98 lines (80 loc) · 3.62 KB
/
Timeout.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
__precompile__()
module Timeout
using Compat.Distributed
using Compat.Dates
using Compat: @warn
export ptimeout, @ptimeout
# Compatibility shim: make the name `_UVError` available in this module regardless of
# which of the two names (`_UVError` or `UVError`) the running version of Base defines.
# It is used below to build the exception thrown when the `uv_kill` call fails.
if isdefined(Base, :_UVError)
using Base: _UVError
else
const _UVError = Base.UVError
end
# Compatibility shim: use Base's own `SIGTERM` constant when it is defined, otherwise
# fall back to a hard-coded signal number.
if isdefined(Base, :SIGTERM)
using Base: SIGTERM
else
const SIGTERM = 15 # just taking the value in 1.0
end
"""
    ptimeout(f, limit; worker=1, poll=0.5, verbose=true)

Run the given function `f` in a separate process on worker `worker` for a maximum time
of `limit`, which can be a number of seconds or a `Dates.Period`. If the time limit is
reached, the remote process will be interrupted, first by sending SIGINTs and then by
sending a SIGTERM to forcibly kill the process if the interrupts are ineffective. If `f`
finished (normally or by throwing) without timing out, `true` is returned, otherwise
`false`.

# Keywords
- `worker`: the worker, by process ID, from the pool of worker processes known to Julia,
  i.e. that which is returned by `workers()`. The chosen worker cannot be the current
  process and must run on the same host as the caller. The default of 1 is typically the
  ID of the master process, so callers will normally need to pass `worker` explicitly.
- `poll`: the number of seconds to wait before rechecking whether `f` has finished
  executing on the remote worker. For long running jobs, you may want to set this higher
  than the default 0.5, which will poll every half second. The same interval is used to
  give the worker time to react after each interrupt.
- `verbose`: if `true`, a warning will be logged by the calling process before attempting
  to interrupt.

# Throws
- `ArgumentError` if there are no worker processes, if `worker` is unknown, is the
  current process, or lives on a different host, or if `poll` is not positive.
- The error constructed by `_UVError` if sending SIGTERM to the worker fails.

!!! warning
    The return value refers ONLY to whether the computation timed out. Computation of
    `f` may have terminated due to an error that's unrelated to an interrupt sent by
    `ptimeout`, and this will NOT be reflected in the return value.
"""
function ptimeout(f::Function, secs::Real; worker=1, poll=0.5, verbose=true)
    nprocs() > 1 || throw(ArgumentError("No worker processes available"))
    worker in workers() || throw(ArgumentError("Unknown worker process ID: $worker"))
    worker == myid() && throw(ArgumentError("Can't run ptimeout on the current process"))
    poll > 0 || throw(ArgumentError("Can't poll every $poll seconds"))

    # We need the worker process to be on the same host as the calling process, otherwise
    # sending a SIGTERM to the result of getpid might kill off something local
    if gethostname() != remotecall_fetch(gethostname, worker)
        throw(ArgumentError("Can't run ptimeout with a worker on a different host"))
    end

    # Get the OS process ID for the worker up front so that we have something to forcibly
    # kill if need be
    ospid = remotecall_fetch(getpid, worker)

    # Run the function on the given worker in a background task. Completion is tracked
    # through the task itself rather than through a value it produces: the task is done
    # both when `f` returns and when `remotecall_fetch` throws (because `f` errored or was
    # interrupted), so neither case is mistaken for a computation that is still running.
    task = @async remotecall_fetch(f, worker)
    finished = () -> istaskdone(task)

    timedwait(finished, float(secs), pollint=float(poll))
    finished() && return true

    verbose && @warn "Time limit for computation exceeded. Interrupting..."

    # Send a bounded number of interrupts, giving the worker up to `poll` seconds to react
    # to each one before trying again
    patience = 10
    while !finished() && (patience -= 1) > 0
        interrupt(worker)
        timedwait(finished, float(poll), pollint=float(poll))
    end

    # If our interrupts didn't work, forcibly kill the process
    if !finished()
        rc = ccall(:uv_kill, Cint, (Cint, Cint), ospid, SIGTERM)
        rc == 0 || throw(_UVError("kill", rc))
    end

    return false
end
# Accept a `Dates.Period` as the time limit and forward to the method taking seconds.
# The period is converted through milliseconds rather than whole seconds so that
# sub-second or fractional limits such as `Millisecond(1500)` become 1.5 seconds instead
# of raising an `InexactError`. Periods that cannot be converted to `Millisecond` still
# throw from `convert`.
ptimeout(f::Function, time::Period; kwargs...) =
    ptimeout(f, Dates.value(convert(Millisecond, time)) / 1000; kwargs...)
"""
    @ptimeout worker limit expr

Wrap the given expression `expr` in a function and run it on a remote worker up to the
given amount of time, `limit`. This macro is a thin convenience wrapper around the
[`ptimeout`](@ref) function.

`worker`, `limit` and `expr` are evaluated in the caller's scope, so they may refer to the
caller's local and global variables.
"""
macro ptimeout(worker, limit, expr)
    # The user-supplied pieces are escaped so that they resolve in the caller's scope;
    # `ptimeout` itself is left unescaped so that it always refers to this module's
    # function, even when the caller has only imported the macro.
    return :(ptimeout(() -> $(esc(expr)), $(esc(limit)), worker=$(esc(worker))))
end
end # module