Skip to content

Commit

Permalink
Allow multiple executables in master_slaves
Browse files Browse the repository at this point in the history
  • Loading branch information
andrenth committed Mar 13, 2012
1 parent 38d0694 commit c3810e3
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 42 deletions.
38 changes: 19 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,23 @@ exploited with root escalation is considerably decreased.
The simple case of a master process and one slave process is implemented in
the function `Release.master_slave`.

val master_slave : ?background:bool
type ipc_handler = (Lwt_unix.file_descr -> unit Lwt.t)

val master_slave : slave:(string * ipc_handler)
-> ?background:bool
-> ?syslog:bool
-> ?privileged:bool
-> ?control:(string * (Lwt_unix.file_descr -> unit Lwt.t))
-> ?control:(string * ipc_handler)
-> lock_file:string
-> slave_ipc_handler:(Lwt_unix.file_descr -> unit Lwt.t)
-> exec:string
-> unit -> unit

The `slave` argument is a tuple whose first argument is the path to the slave
process executable. The second argument is a callback function that is used by
the master to handle inter-process communication with the slave. This function
receives the file descriptor to be used as a communication channel with the
slave process. More details about slave processes and IPC in Release will be
described in more details below.

The `background` argument indicates whether `Release.daemon` will be called.
The `syslog` argument indicates if the syslog facilities from the `Lwt_log`
module will be enabled. If the master process is supposed to run as root, then
Expand All @@ -80,30 +88,22 @@ and a callback function. The master process will create and listen on this
socket on startup. This is useful for the implementation of control programs
that communicate with the master process.

Inter-process communication with the slave process is handled in the master
by a callback function given in the `slave_ipc_handler` argument. This function
receives a file descriptor that is used for communication with the slave
process. IPC in Release will be described in more details below.

The `exec` argument must be a path to an executable file that corresponds to
a program that will be run as the slave process. More about the slave process
will be described below.

The general case of _n_ slave processes is handled by the function
`Release.master_slaves`.

val master_slaves : ?background:bool
-> ?syslog:bool
-> ?privileged:bool
-> ?control:(string * (Lwt_unix.file_descr -> unit Lwt.t))
-> num_slaves:int
-> ?control:(string * ipc_handler)
-> lock_file:string
-> ipc_handler:(Lwt_unix.file_descr -> unit Lwt.t)
-> exec:string
-> slaves:(string * ipc_handler * int) list
-> unit -> unit

This function works exactly like `Release.master_slave`, but it creates
`num_slaves` slave processes.
This function generalizes `Release.master_slave`, allowing the creation of
heterogeneous groups of slave process via the `slaves` argument. This argument
is a list of 3-element tuples containing the path to the slave executables,
the IPC callback function and the number of processes that will be created for
the given executable.

### Slaves

Expand Down
10 changes: 9 additions & 1 deletion lib/option.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@ let some = function
| None -> failwith "Option.some: None value"
| Some x -> x

let with_default z = function
let default z = function
| None -> z
| Some x -> x

let may f = function
| None -> ()
| Some x -> f x

let may_default z f = function
| None -> z
| Some x -> f x

let map f = function
| None -> None
| Some x -> Some (f x)
27 changes: 17 additions & 10 deletions lib/release.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
open Lwt
open Printf

type ipc_handler = (Lwt_unix.file_descr -> unit Lwt.t)

let fork () =
lwt () = Lwt_io.flush_all () in
match Lwt_unix.fork () with
Expand Down Expand Up @@ -132,18 +134,21 @@ let handle_proc_death reexec proc =
| Lwt_unix.WSTOPPED s -> log "process %d stopped by signal %d" pid s in
reexec ()

let rec exec_process path ipc_handler check_death_rate =
lwt () = check_death_rate () in
let link_processes ipc_handler =
let master_fd, slave_fd =
Lwt_unix.socketpair Lwt_unix.PF_UNIX Lwt_unix.SOCK_STREAM 0 in
Lwt_unix.set_close_on_exec master_fd;
let _ipc_t = ipc_handler master_fd in
Lwt_unix.unix_file_descr slave_fd

let rec exec_process path ipc_handler check_death_rate =
lwt () = check_death_rate () in
let slave_fd = link_processes ipc_handler in
let run_proc path =
let fd = Lwt_unix.unix_file_descr slave_fd in
let reexec () =
exec_process path ipc_handler check_death_rate in
Lwt_process.with_process_none
~stdin:(`FD_move fd)
~stdin:(`FD_move slave_fd)
(path, [| path |])
(handle_proc_death reexec) in
let _slave_t =
Expand Down Expand Up @@ -204,12 +209,12 @@ let handle_control_connections (sock_path, handler) =
exit 1

let master_slaves ?(background = true) ?(syslog = true) ?(privileged = true)
?control ~num_slaves ~lock_file ~slave_ipc_handler ~exec () =
?control ~lock_file ~slaves () =
if syslog then Lwt_log.default := Lwt_log.syslog ~facility:`Daemon ();
let create_procs () =
for_lwt i = 1 to num_slaves do
let create_slaves (path, ipc_handler, n) =
for_lwt i = 1 to n do
let exec_slave = init_exec_slave num_exec_tries in
exec_slave exec slave_ipc_handler
exec_slave path ipc_handler
done in
let work () =
ignore (Lwt_unix.on_signal Sys.sigterm handle_sigterm);
Expand All @@ -222,14 +227,16 @@ let master_slaves ?(background = true) ?(syslog = true) ?(privileged = true)
let idle_t, idle_w = Lwt.wait () in
let control_t =
Option.either return handle_control_connections control in
lwt () = create_procs () in
lwt () = Lwt_list.iter_p create_slaves slaves in
control_t <&> idle_t in
let main_t =
lwt () = if privileged then check_root () else check_nonroot () in
if background then daemon work else work () in
Lwt_main.run main_t

let master_slave = master_slaves ~num_slaves:1
let master_slave ~slave =
let (path, ipc_handler) = slave in
master_slaves ~slaves:[path, ipc_handler, 1]

let lose_privileges user =
lwt () = check_root () in
Expand Down
15 changes: 7 additions & 8 deletions lib/release.mli
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
type ipc_handler = (Lwt_unix.file_descr -> unit Lwt.t)

val daemon : (unit -> unit Lwt.t) -> unit Lwt.t

val master_slave : ?background:bool
val master_slave : slave:(string * ipc_handler)
-> ?background:bool
-> ?syslog:bool
-> ?privileged:bool
-> ?control:(string * (Lwt_unix.file_descr -> unit Lwt.t))
-> ?control:(string * ipc_handler)
-> lock_file:string
-> slave_ipc_handler:(Lwt_unix.file_descr -> unit Lwt.t)
-> exec:string
-> unit -> unit

val master_slaves : ?background:bool
-> ?syslog:bool
-> ?privileged:bool
-> ?control:(string * (Lwt_unix.file_descr -> unit Lwt.t))
-> num_slaves:int
-> ?control:(string * ipc_handler)
-> lock_file:string
-> slave_ipc_handler:(Lwt_unix.file_descr -> unit Lwt.t)
-> exec:string
-> slaves:(string * ipc_handler * int) list
-> unit -> unit

val me : ?syslog:bool
Expand Down
6 changes: 2 additions & 4 deletions lib_test/master.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ let control_connection_handler fd =

let () =
let slave_exec = sprintf "%s/_build/lib_test/test_slave" (Unix.getcwd ()) in
Release.master_slaves
~num_slaves:1
Release.master_slave
~background:false
~syslog:false
~lock_file:(sprintf "/var/run/%s.pid" (Filename.basename Sys.argv.(0)))
~slave_ipc_handler:ipc_handler
~control:("/tmp/master.socket", control_connection_handler)
~exec:slave_exec
~slave:(slave_exec, ipc_handler)
()

0 comments on commit c3810e3

Please sign in to comment.