From 6fae8e9a14e4ebe7bcee40258dc7772f7f57bf75 Mon Sep 17 00:00:00 2001 From: pad Date: Sun, 20 Mar 2011 16:21:41 -0700 Subject: [PATCH] introduce commons/parallel.ml, a poor's man job parallel runner --- commons/Makefile | 4 ++- commons/common.ml | 9 +++++++ commons/common.mli | 6 +++++ commons/parallel.ml | 61 ++++++++++++++++++++++++++++++++++++++++++++ commons/parallel.mli | 9 +++++++ 5 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 commons/parallel.ml create mode 100644 commons/parallel.mli diff --git a/commons/Makefile b/commons/Makefile index df9b824dc..479208528 100644 --- a/commons/Makefile +++ b/commons/Makefile @@ -22,7 +22,9 @@ MYSRC=common.ml common_extra.ml \ math.ml \ unicode.ml \ simple_color.ml file_type.ml file_format.ml \ - glimpse.ml parser_combinators.ml + glimpse.ml parser_combinators.ml \ + concurrency.ml parallel.ml \ + # src from other authors, got from the web or caml hump, too small to be # in their own external/ocamlxxx/ diff --git a/commons/common.ml b/commons/common.ml index 3deb138f7..90e4a6762 100644 --- a/commons/common.ml +++ b/commons/common.ml @@ -4936,6 +4936,11 @@ let hashset_to_list h = hash_to_list h +> List.map fst let hashset_of_list xs = xs +> List.map (fun x -> x, true) +> hash_of_list +let hashset_union h1 h2 = + h2 +> Hashtbl.iter (fun k _bool -> + Hashtbl.replace h1 k true + ) + let hkeys h = @@ -4943,6 +4948,10 @@ let hkeys h = h +> Hashtbl.iter (fun k v -> Hashtbl.replace hkey k true); hashset_to_list hkey +let hunion h1 h2 = + h2 +> Hashtbl.iter (fun k v -> + Hashtbl.add h1 k v + ) let group_assoc_bykey_eff2 xs = diff --git a/commons/common.mli b/commons/common.mli index ea512c61d..52075a0a9 100644 --- a/commons/common.mli +++ b/commons/common.mli @@ -1757,6 +1757,9 @@ val hash_of_list : ('a * 'b) list -> ('a, 'b) Hashtbl.t val hkeys : ('a, 'b) Hashtbl.t -> 'a list + +(* hunion h1 h2 adds all binding in h2 into h1 *) +val hunion: ('a, 'b) Hashtbl.t -> ('a, 'b) Hashtbl.t -> unit (*x: common.mli for collection types *) (*****************************************************************************) (* Hash sets *) @@ -1768,6 +1771,9 @@ type 'a hashset = ('a, bool) Hashtbl.t (* common use of hashset, in a hash of hash *) val hash_hashset_add : 'a -> 'b -> ('a, 'b hashset) Hashtbl.t -> unit +(* hashset_union h1 h2 adds all elements in h2 into h1 *) +val hashset_union: 'a hashset -> 'a hashset -> unit + val hashset_to_set : < fromlist : ('a ) list -> 'c; .. > -> ('a, 'b) Hashtbl.t -> 'c diff --git a/commons/parallel.ml b/commons/parallel.ml new file mode 100644 index 000000000..ba3880ae6 --- /dev/null +++ b/commons/parallel.ml @@ -0,0 +1,61 @@ +open Common + +(*****************************************************************************) +(* Prelude *) +(*****************************************************************************) + +(*****************************************************************************) +(* Building block *) +(*****************************************************************************) + +(* src: harrop article on fork-based parallelism *) +let invoke (f : 'a -> 'b) x : unit -> 'b = + let input, output = Unix.pipe() in + match Unix.fork() with + (* pad: what is this ?? *) + | -1 -> (let v = f x in fun () -> v) + | 0 -> + Unix.close input; + let output = Unix.out_channel_of_descr output in + Marshal.to_channel output (try `Res(f x) with e -> `Exn e) []; + close_out output; + exit 0 + | pid -> + Unix.close output; + let input = Unix.in_channel_of_descr input in + fun () -> + let v = Marshal.from_channel input in + ignore (Unix.waitpid [] pid); + close_in input; + match v with + | `Res x -> x + | `Exn e -> raise e;; + +let parallel_map f xs = + let futures = List.map (invoke f) xs in + (* sync *) + List.map (fun futur -> futur ()) futures + +(*****************************************************************************) +(* Poor's man job scheduler *) +(*****************************************************************************) + +type 'a job = unit -> 'a +type 'a jobs = ('a job) list + +(* + * This is a very naive job scheduler. One limitation is that before + * launching another run we must wait for the slowest process. A + * set of workers and a master model would be more efficient by always + * feeding processors. A partial fix is to give a tasks number that + * is quite superior to the actual number of processors. + *) +let map_jobs ~tasks xs = + if tasks = 1 + then List.map (fun job -> job ()) xs + else + let xxs = Common.pack_safe tasks xs in + xxs +> List.map (fun xs -> + (* do in parallel a batch of job *) + parallel_map (fun job -> job ()) xs + ) +> List.flatten diff --git a/commons/parallel.mli b/commons/parallel.mli new file mode 100644 index 000000000..9552fd88c --- /dev/null +++ b/commons/parallel.mli @@ -0,0 +1,9 @@ + +val invoke : ('a -> 'b) -> 'a -> unit -> 'b + +val parallel_map : ('a -> 'b) -> 'a list -> 'b list + +type 'a job = unit -> 'a +type 'a jobs = ('a job) list + +val map_jobs: tasks:int -> 'a jobs -> 'a list