Skip to content

Commit

Permalink
+
Browse files Browse the repository at this point in the history
git-svn-id: http://caml.inria.fr/svn/ocaml/branches/jocamltrunk@10561 f963ae5c-01c2-4b8c-9fe0-0dff7051ff02
  • Loading branch information
maranget committed Jun 11, 2010
1 parent b850620 commit 123dda1
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 57 deletions.
58 changes: 30 additions & 28 deletions otherlibs/join/joinHelper.ml
Expand Up @@ -18,7 +18,7 @@
type fork_args =
| No_argument
| Same_arguments of string array
| Argument_generator of (unit -> string array)
| Argument_generator of (int -> string array)

let prepend s a =
let len = Array.length a in
Expand All @@ -28,10 +28,10 @@ let prepend s a =
| 0 -> String.copy s
| i -> a.(pred i))

let get_args = function
let get_args k = function
| No_argument -> [| |]
| Same_arguments a -> a
| Argument_generator f -> f ()
| Argument_generator f -> f k

let filter_clients a =
let a' = Array.copy a in
Expand All @@ -50,16 +50,15 @@ let filter_clients a =
Array.sub a' 0 !i'

let do_forks name args n =
let rec df = function
| 0 -> []
| n ->
match Unix.fork () with
| 0 ->
let args = prepend name (get_args args) in
Unix.handle_unix_error
(fun () -> Unix.execv name args) ()
| pid -> pid :: (df (pred n)) in
df (max 0 n)
let rec df k =
if k >= n then []
else match Unix.fork () with
| 0 ->
let args = prepend name (get_args k args) in
Unix.handle_unix_error
(fun () -> Unix.execv name args) ()
| pid -> pid :: (df (succ k)) in
df 0


(* Configuration *)
Expand Down Expand Up @@ -122,21 +121,24 @@ let split_addr s =
raise (Arg.Bad ("invalid port: " ^ port)))

let make_commandline cfg =
[ "-host",
Arg.String
(fun s ->
let h, p = split_addr s in
cfg.host <- h;
cfg.port <- p),
"<name:port> Set host name and port" ;

"-clients",
Arg.Int (fun i -> cfg.clients <- i),
"<n> Set number of clients to launch";

"-forked-program",
Arg.String (fun s -> cfg.forked_program <- String.copy s),
"<name> Set executable for clients" ]
("-host",
Arg.String
(fun s ->
let h, p = split_addr s in
cfg.host <- h;
cfg.port <- p),
"<name:port> Set host name and port")::
begin
if cfg.clients < 0 then []
else
["-clients",
Arg.Int (fun i -> cfg.clients <- i),
"<n> Set number of clients to launch";

"-forked-program",
Arg.String (fun s -> cfg.forked_program <- String.copy s),
"<name> Set executable for clients" ]
end


(* Client-related functions *)
Expand Down
27 changes: 18 additions & 9 deletions otherlibs/join/joinHelper.mli
Expand Up @@ -20,7 +20,7 @@
type fork_args =
| No_argument (** No argument is passed to client. *)
| Same_arguments of string array (** All clients will reveive the same arguments. *)
| Argument_generator of (unit -> string array) (** The function will be called repeatedly to compute the arguments passed to the various clients. *)
| Argument_generator of (int -> string array) (** The function will be called repeatedly to compute the arguments passed to the various clients. *)
(** Type of arguments passed to forked clients. *)

val filter_clients : string array -> string array
Expand All @@ -31,7 +31,10 @@ val filter_clients : string array -> string array
val do_forks : string -> fork_args -> int -> int list
(** [do_forks prog args n] does [n] forks (none if [n] is negative),
using [prog] as the program name, and passing [args] to the
forked programs. Returns the pid list of the forked programs. *)
forked programs. Returns the pid list of the forked programs.
In case [fork_args] is [Argument_generator g], the calls of [g] will
be [g 0], [g 1],..., [g (n-1)]. *)


(** {6 Configuration} *)
Expand Down Expand Up @@ -64,13 +67,19 @@ val default_configuration : unit -> configuration

val make_commandline : configuration -> (Arg.key * Arg.spec * Arg.doc) list
(** [make_configuration cfg] returns
- a list of argument descriptors that will update the
configuration [cfg]when parsed through [Arg.parse] (or equivalent).
a list of argument descriptors that will update the
configuration [cfg] when parsed through [Arg.parse] (or equivalent).
The current version defines the following arguments:
The current version defines the following command line options:
- {i -host} to set [host] and [port] using ["host:port"] notation;
- {i -clients} to set [clients];
- {i -forked-program} to set [forked_program]. *)
- {i -forked-program} to set [forked_program].
By convention, if [cfg.clients] is negative, command
line options {i -clients} and {i -forked-program} are omitted.
@see <http://caml.inria.fr/pub/docs/manual-ocaml/libref/Arg.html#VALparse>
[Arg.parse]. *)


(** {6 Client-related functions} *)
Expand All @@ -80,9 +89,9 @@ type 'a lookup_function = Join.Ns.t -> string -> 'a
with a name. *)

val lookup_once : 'a lookup_function
(* A lookup function that tries to retrieve the value only once,
raising [Not_found] if value is not present and [Join.Exit] if
remote name service is down. *)
(** A lookup function that tries to retrieve the value only once,
raising [Not_found] if value is not present and {!Join.Exit} if
remote name service is down (alias for {!Join.Ns.lookup}). *)

val lookup_times : int -> float -> 'a lookup_function
(** [lookup_times n w] builds a lookup function that tries up to [n] times to
Expand Down
19 changes: 10 additions & 9 deletions otherlibs/join/joinPool.ml
Expand Up @@ -110,7 +110,7 @@ end

(** Interuptible workers *)
type subtask_id = int (** Subtask identifier *)
type ('elt,'partial) interuptible_worker =
type ('elt,'partial) interruptible_worker =
subtask_id * 'elt -> 'partial option (** Worker proper *)
type kill = subtask_id Join.chan (** Abort given subtask *)

Expand All @@ -121,8 +121,8 @@ end

type ('partial, 'result) t = {
register : (elt,'partial) worker Join.chan;
register_interuptible :
((elt,'partial) interuptible_worker * kill) Join.chan;
register_interruptible :
((elt,'partial) interruptible_worker * kill) Join.chan;
fold : collection -> ('partial -> 'result -> 'result) -> 'result -> 'result;
}
val create : unit -> ('partial, 'result) t
Expand All @@ -138,13 +138,13 @@ end

(** Interuptible workers *)
type subtask_id = int (** Subtask identifier *)
type 'a interuptible_worker =
type 'a interruptible_worker =
subtask_id * elt -> 'a option (** Worker proper *)
type kill = subtask_id Join.chan (** Abort given subtask *)

type ('partial, 'result) t = {
register : 'partial worker Join.chan;
register_interuptible : ('partial interuptible_worker * kill) Join.chan;
register_interruptible : ('partial interruptible_worker * kill) Join.chan;
fold : collection -> ('partial -> 'result -> 'result) -> 'result -> 'result;
}

Expand Down Expand Up @@ -271,7 +271,7 @@ def st(next_id) & fresh_nounce() =
(* Pool proper *)
type 'a agent =
| W of 'a worker
| WK of 'a interuptible_worker * kill
| WK of 'a interruptible_worker * kill

let create () =

Expand Down Expand Up @@ -308,12 +308,13 @@ def st(next_id) & fresh_nounce() =
end
| [] ->
agent(worker) &
if n > 0 then begin
if n <> 0 then begin
let again = m.get_active () in
match again with
| [] ->
pool(E,low)
| _ ->
let n = if n = min_int then min_int else n-1 in
pool(E,put (again,n-1,m) low)
end else begin
pool(E,low)
Expand Down Expand Up @@ -371,9 +372,9 @@ def st(next_id) & fresh_nounce() =
monitor.wait () in

def register(worker) = agent(W worker)
and register_interuptible(w,k) = agent(WK (w,k)) in
and register_interruptible(w,k) = agent(WK (w,k)) in
{ fold = fold ; register = register ;
register_interuptible = register_interuptible ; }
register_interruptible = register_interruptible ; }

end

Expand Down
41 changes: 30 additions & 11 deletions otherlibs/join/joinPool.mli
Expand Up @@ -77,6 +77,7 @@ module Shared : sig
- The same pool can be shared by several computations.
- More efficient handling of task re-issuing: fresh tasks have priority
over re-issued tasks.
- Ability to abort duplicated tasks when outcome reaches the pool.
- A little control on pool behavior is offered by the means
of the {!Config} module
argument.
Expand All @@ -90,7 +91,8 @@ module type Config = sig
val debug : bool
(** If true, gives a few diagnostics on the standard error stream. *)
val nagain : int
(** A given task will be re-issued at most [nagain] times *)
(** A given task will be re-issued at most [nagain] times.
No limit is enforced when [nagain] is strictly less that zero *)
end

(** Functional enumerations *)
Expand Down Expand Up @@ -154,14 +156,14 @@ end

(** {6 Pools} *)

(** Standard workers *)
type ('elt,'partial) worker = 'elt -> 'partial
(** Standard workers *)

(** Interuptible workers *)
type subtask_id = int (** Subtask identifier *)
type ('elt,'partial) interuptible_worker =
subtask_id * 'elt -> 'partial option (** Worker proper *)
type kill = subtask_id Join.chan (** Abort given subtask *)

type ('elt,'partial) interruptible_worker =
subtask_id * 'elt -> 'partial option (** Workers that can be aborted asynchrnously *)
type kill = subtask_id Join.chan (** To abort given subtask *)


(** Output signature of the pool functor *)
Expand All @@ -171,25 +173,42 @@ module type S = sig

type ('partial, 'result) t = {
register : (elt,'partial) worker Join.chan;
register_interuptible :
((elt,'partial) interuptible_worker * kill) Join.chan;
register_interruptible :
((elt,'partial) interruptible_worker * kill) Join.chan;
fold :
collection -> ('partial -> 'result -> 'result) -> 'result -> 'result;
}
(** Pools dispatch computations among registered agents, re-issuing pending
tasks if agents do not send computation outcomes.
Given a pool [p], returned by [create ()]:
- [p.register f] is used by agents to indicate that they can perform
- [p.register w] is used by agents to indicate that they can perform
computations, mapping [xi] values to [yi] results, using the
synchronous channel [f].
synchronous channel [w].
- [p.fold c comb y0] returns the combined result
[comb y1 (comb y2 (... (comb yn y0)))],
where the [yi] values are the results of the [xi]
transformed by the functions
registered by the agents. The [xi] result from enumerating the collection
[c]. The enumeration technique is specified by the module argument [E]
to the functor {!Make}. *)
(signature {!Enumerable})
to the functor {!Make}.
- [p.register_interruptible (w,k)] is used by agents to indicate that
they can perform computations as above.
Additionally the pool logics will attempt to abort computations
found to be useless by issusing messages on channel [k].
More specifically, when
given an argument [(id,xi)] by the pool logics,
the synchronous channel [w] should return [Some yi],
where [xi] and [yi] are the same as in the description of [p.fold] above.
However, if the pool sends [id] on channel [k] before [yi] is
available, then the agent may abort the computation of [yi],
so as to spare computing power.
In that case, [w(xi)] must reply [None].
It is the agent responsability to check that subtask identifiers
sent on the [w] and [k] channels
are equal before aborting the subtask and having [w] to reply [None] *)


val create : unit -> ('partial, 'result) t
(** Pool creator *)
Expand Down

0 comments on commit 123dda1

Please sign in to comment.