RFC : Added interrupt_workers #3819

Merged
merged 1 commit into JuliaLang:master

3 participants

@amitmurthy
Collaborator

The patch provides a means to support Ctrl-C for both remote and local workers, as outlined in #3805.

Base.interrupt_workers() should be called from the SIGINT handler as needed.

Interrupting workers is implemented via callbacks to the ClusterManager. The system pid of the worker process is sent back as a response to the initial :join_pgrp message sent to the workers.
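The handshake behind this, pieced together from the multi.jl hunks further down (these are fragments of the patched message loop, not a runnable unit on their own):

```julia
# On the worker, right after it processes :join_pgrp — report this
# process's OS pid back to node 1, the one extra piece of state that
# interrupt support needs:
send_msg(sock, :worker_info, getpid())

# On node 1, the message-handler loop stores it on the Worker object:
elseif is(msg, :worker_info)
    w = map_sock_wrkr[sock]
    w.ospid = deserialize(sock)
end
```

Until the :worker_info reply arrives, w.ospid is undefined, which is why interrupt() below has to handle the "freshly added worker" case.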

@JeffBezanson

Great functionality to have.

I'm a little concerned about splitting up information about a Worker so that some of it is in the Worker object, and some of it is in dictionaries in the ClusterManager. And a function accepting a symbol to tell it what to do is a little strange; seems like it should be 3 methods instead.

I think the API should be interrupt(workers()) and interrupt(n).

@amitmurthy
Collaborator

I put it in the RegularCluster in order to keep it similar to Scyld, SGE and other Cluster Managers. Since the typical use cases would be ssh or local procs, I could move the required information into the Worker object and only call the ClusterManager callback if it is not of type RegularCluster. Will update the patch.

The function accepting the symbol is a callback function provided by the ClusterManager interface. Currently it is called only on 3 events - register, interrupt and deregister. There may be more events in the future - I just wanted to avoid a series of differently named callbacks. For SGE/Scyld the interrupt functionality will have to be provided by the respective ClusterManagers.

You mean interrupt(; pids = workers()) and interrupt(pid::Integer), right? I won't export these if this functionality will only be exposed via Ctrl-C.
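For reference, the two methods eventually added at the bottom of base/multi.jl support usage along these lines (the worker pids here are illustrative):

```julia
interrupt(2)            # SIGINT a single worker, addressed by its Julia pid
interrupt([2, 3, 4])    # a specific subset, fanned out with @sync/@async
interrupt()             # the vector method defaults to workers(), i.e. all workers
```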

@JeffBezanson

Putting in a special case for RegularCluster instead of using the normal callback sounds worse to me.

@amitmurthy
Collaborator

It does keep the regular code cleaner and simpler. Let me quickly try it out and then we can decide.

@amitmurthy
Collaborator

Removed the dictionaries in the cluster manager that kept per-worker context. Instead, at worker creation time, the cluster manager can provide an opaque "context" object linked with each worker, which is passed back in the callbacks.

This simplifies the code a bit and also keeps the same model for RegularCluster as well as other ClusterManagers.
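A minimal sketch of the resulting contract, in the 2013-era style this patch uses. MyCluster, my_launch and my_worker_cb are illustrative names, not part of the patch; the tuple shapes and callback signature follow the :io_only branch of start_cluster_workers below:

```julia
# Launch callback: the second tuple element is the opaque per-worker
# context; the framework never inspects it, it only hands it back.
function my_launch(n, config)
    instances = cell(n)
    for i in 1:n
        io, pobj = readsfrom(detach(`$(config[:exename]) --worker`))
        instances[i] = (io, {:pobj => pobj})
    end
    (:io_only, instances)
end

# Event callback: the same context comes back on every event
# (:register, :interrupt, :deregister) for that worker.
function my_worker_cb(op, cman, ctxt, pid, args...)
    if op == :interrupt
        kill(ctxt[:pobj], 2)    # deliver SIGINT via the stored process object
    end
end
```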

@amitmurthy
Collaborator

Have exported interrupt.

interrupt(workers()) uses @sync and @async. I don't know whether this may cause problems when called from within a signal handler, as may be required for closing #3805.

Do let me know if I should change it.

@amitmurthy
Collaborator

@JeffBezanson - can I merge this?

@JeffBezanson
Owner

OK, let's rebase and merge this.
I'm not sure pids needs to be a keyword argument, but that's a small detail.

@amitmurthy amitmurthy merged commit 5d05a48 into JuliaLang:master
@ViralBShah
Owner

Now that this is in, it would be nice to have docs too.

Commits on Aug 2, 2013
  1. @amitmurthy
Showing with 104 additions and 31 deletions.
  1. +1 −0  base/exports.jl
  2. +103 −31 base/multi.jl
1  base/exports.jl
@@ -1105,6 +1105,7 @@ export
addprocs,
ClusterManager,
fetch,
+ interrupt,
isready,
myid,
nprocs,
134 base/multi.jl
@@ -85,6 +85,8 @@ function send_msg_now(s::IO, kind, args...)
send_msg_unknown(s, kind, args)
end
+abstract ClusterManager
+
type Worker
host::ByteString
port::Uint16
@@ -94,6 +96,9 @@ type Worker
add_msgs::Array{Any,1}
id::Int
gcflag::Bool
+ cman::ClusterManager
+ cman_ctxt
+ ospid
Worker(host::String, port::Integer, sock::TcpSocket, id::Int) =
new(bytestring(host), uint16(port), sock, IOBuffer(), {}, {}, id, false)
@@ -200,15 +205,22 @@ function add_workers(pg::ProcessGroup, w::Array{Any,1})
# NOTE: currently only node 1 can add new nodes, since nobody else
# has the full list of address:port
assert(LPROC.id == 1)
- for i=1:length(w)
+ numw = length(w)
+ for i=1:numw
w[i].id = get_next_pid()
register_worker(w[i])
create_message_handler_loop(w[i].socket)
+
+ cman = w[i].cman
+ if contains(names(cman), :worker_cb) && isdefined(cman, :worker_cb)
+ cman.worker_cb(:register, cman, w[i].cman_ctxt, w[i].id, i)
+ end
end
all_locs = map(x -> isa(x, Worker) ? (x.host,x.port, x.id) : ("", 0, x.id), pg.workers)
- for i=1:length(w)
+ for i=1:numw
send_msg_now(w[i], :join_pgrp, w[i].id, all_locs)
end
+
[w[i].id for i in 1:length(w)]
end
@@ -306,7 +318,14 @@ deregister_worker(pid) = deregister_worker(PGRP, pid)
function deregister_worker(pg, pid)
pg.workers = filter(x -> !(x.id == pid), pg.workers)
w = delete!(map_pid_wrkr, pid, nothing)
- if isa(w, Worker) delete!(map_sock_wrkr, w.socket) end
+ if isa(w, Worker)
+ delete!(map_sock_wrkr, w.socket)
+
+ # Notify the cluster manager of this worker's death
+ if (myid() == 1) && contains(names(w.cman), :worker_cb) && isdefined(w.cman, :worker_cb)
+ w.cman.worker_cb(:deregister, w.cman, w.cman_ctxt, w.id)
+ end
+ end
add!(map_del_wrkr, pid)
# delete this worker from our RemoteRef client sets
@@ -778,7 +797,7 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
try
while true
msg = deserialize(sock)
- #println("got msg: ",msg)
+ # println("got msg: ",msg)
# handle message
if is(msg, :call)
id = deserialize(sock)
@@ -834,6 +853,9 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
register_worker(Worker("", 0, sock, 1))
register_worker(LPROC)
+ # send back env info to pid 1. Currently only ospid
+ send_msg(sock, :worker_info, getpid())
+
for (rhost, rport, rpid) in locs
if (rpid < self_pid) && (!(rpid == 1))
# Connect to them
@@ -847,7 +869,12 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
continue
end
end
+ elseif is(msg, :worker_info)
+ w = map_sock_wrkr[sock]
+ ospid = deserialize(sock)
+ w.ospid = ospid
end
+
end # end of while
catch e
iderr = worker_id_from_socket(sock)
@@ -928,39 +955,39 @@ function start_cluster_workers(n, config)
# Get the cluster manager to launch the instance
(insttype, instances) = cman.launch_cb(n, config)
-
+
+ safe_tupidx(tup, pos) = length(tup) >= pos ? tup[pos] : nothing
if insttype == :io_only
read_cb_response(inst) =
begin
- (host, port) = read_worker_host_port(inst)
- inst, host, port, host
+ (host, port) = read_worker_host_port(inst[1])
+ inst[1], host, port, host, safe_tupidx(inst,2)
end
elseif insttype == :io_host
read_cb_response(inst) =
begin
- io = inst[1]
- (priv_hostname, port) = read_worker_host_port(io)
- io, priv_hostname, port, inst[2]
+ (priv_hostname, port) = read_worker_host_port(inst[1])
+ inst[1], priv_hostname, port, inst[2], safe_tupidx(inst,3)
end
elseif insttype == :io_host_port
- read_cb_response(inst) = (inst[1], inst[2], inst[3], inst[2])
+ read_cb_response(inst) = (inst[1], inst[2], inst[3], inst[2], safe_tupidx(inst,4))
elseif insttype == :host_port
- read_cb_response(inst) = (nothing, inst[1], inst[2], inst[1])
+ read_cb_response(inst) = (nothing, inst[1], inst[2], inst[1], safe_tupidx(inst,3))
elseif insttype == :cmd
read_cb_response(inst) =
begin
io,_ = readsfrom(detach(inst))
(host, port) = read_worker_host_port(io)
- io, host, port, host
+ io, host, port, host, safe_tupidx(inst,2)
end
else
error("Unsupported format from Cluster Manager callback")
end
for i=1:n
- (io, privhost, port, pubhost) = read_cb_response(instances[i])
- w[i] = create_worker(privhost, port, pubhost, io, config)
+ (io, privhost, port, pubhost, cman_ctxt) = read_cb_response(instances[i])
+ w[i] = create_worker(privhost, port, pubhost, io, config, cman_ctxt)
end
w
end
@@ -976,10 +1003,11 @@ function read_worker_host_port (io::IO)
end
end
-function create_worker(privhost, port, pubhost, stream, config)
+function create_worker(privhost, port, pubhost, stream, config, cman_ctxt)
tunnel = config[:tunnel]
s = split(pubhost,'@')
+ user = ""
if length(s) > 1
user = s[1]
pubhost = s[2]
@@ -997,6 +1025,8 @@ function create_worker(privhost, port, pubhost, stream, config)
else
w = Worker(pubhost, port)
end
+ w.cman = config[:cman]
+ w.cman_ctxt = cman_ctxt
if isa(stream, AsyncStream)
stream.line_buffered = true
@@ -1052,15 +1082,16 @@ function ssh_tunnel(user, host, privhost, port, sshflags)
end
-abstract ClusterManager
-
function launch_procs(n::Integer, config::Dict)
dir = config[:dir]
exename = config[:exename]
exeflags = config[:exeflags]
cman = config[:cman]
- outs=cell(n)
+ cman.config = config # Will need the various startup env later
+
+ io_objs = cell(n)
+ contexts = cell(n)
# start the processes first...
if (cman.ssh)
@@ -1071,36 +1102,52 @@ function launch_procs(n::Integer, config::Dict)
end
for i in 1:n
- io,_ = readsfrom(detach(lcmd(i)))
- outs[i] = io
+ io, pobj = readsfrom(detach(lcmd(i)))
+ io_objs[i] = io
+ contexts[i] = cman.ssh ? {:machine => cman.machines[i]} : {:pobj => pobj}
end
# ...and then read the host:port info. This optimizes overall start times.
# For ssh, the tunnel connection, if any, has to be with the specified machine name.
# but the port needs to be forwarded to the private hostname/ip-address
- if cman.ssh
- for i in 1:n
- io = outs[i]
- outs[i] = (io, cman.machines[i])
- end
- end
if cman.ssh
- return (:io_host, outs)
+ return (:io_host, map(i -> (io_objs[i], cman.machines[i], contexts[i]), 1:n))
else
- return (:io_only, outs)
+ return (:io_only, map(i -> (io_objs[i], contexts[i]), 1:n))
end
end
-immutable RegularCluster <: ClusterManager
+function cman_worker_cb(op, cman, ctxt, pid, args...)
+ if op == :interrupt
+ (ospid,) = args
+ if cman.ssh
+ machine = ctxt[:machine]
+ sshflags = cman.config[:sshflags]
+ if !success(`ssh -n $sshflags $machine "kill -2 $ospid"`)
+ println("Error sending a Ctrl-C to julia worker $pid on $machine")
+ end
+ else
+ pobj = ctxt[:pobj]
+ kill(pobj, 2)
+ end
+ end
+end
+
+type RegularCluster <: ClusterManager
launch_cb::Function
ssh::Bool
machines
- RegularCluster(; ssh=false, machines=[]) = new(launch_procs, ssh, machines)
+ worker_cb::Function
+ config::Dict
+
+ RegularCluster(; ssh=false, machines=[]) = new(launch_procs, ssh, machines, cman_worker_cb)
end
+show(io::IO, cman::RegularCluster) = println("RegularCluster :: ssh: ", cman.ssh, ", machines : ", cman.machines)
+
# start and connect to processes via SSH.
# optionally through an SSH tunnel.
# the tunnel is only used from the head (process 1); the nodes are assumed
@@ -1535,3 +1582,28 @@ function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
ret
end
+function interrupt(pid::Integer)
+ assert(myid() == 1)
+ w = map_pid_wrkr[pid]
+ if isa(w, Worker)
+ if contains(names(w.cman), :worker_cb) && isdefined(w.cman, :worker_cb)
+ if isdefined(w, :ospid)
+ w.cman.worker_cb(:interrupt, w.cman, w.cman_ctxt, w.id, w.ospid)
+ else
+ # This state can happen immediately after an addprocs
+ println("Worker $(w.id) cannot be presently interrupted.")
+ end
+ else
+ println("This worker's ClusterManager does not support interrupts.")
+ end
+ end
+end
+
+function interrupt(pids::AbstractVector=workers())
+ assert(myid() == 1)
+ @sync begin
+ for pid in pids
+ @async interrupt(pid)
+ end
+ end
+end