Skip to content

Commit

Permalink
introduce commons/parallel.ml, a poor's man job parallel runner
Browse files Browse the repository at this point in the history
  • Loading branch information
pad committed Mar 20, 2011
1 parent 654b13d commit 6fae8e9
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 1 deletion.
4 changes: 3 additions & 1 deletion commons/Makefile
Expand Up @@ -22,7 +22,9 @@ MYSRC=common.ml common_extra.ml \
math.ml \
unicode.ml \
simple_color.ml file_type.ml file_format.ml \
glimpse.ml parser_combinators.ml
glimpse.ml parser_combinators.ml \
concurrency.ml parallel.ml \


# src from other authors, got from the web or caml hump, too small to be
# in their own external/ocamlxxx/
Expand Down
9 changes: 9 additions & 0 deletions commons/common.ml
Expand Up @@ -4936,13 +4936,22 @@ let hashset_to_list h = hash_to_list h +> List.map fst
let hashset_of_list xs =
xs +> List.map (fun x -> x, true) +> hash_of_list

let hashset_union h1 h2 =
h2 +> Hashtbl.iter (fun k _bool ->
Hashtbl.replace h1 k true
)



let hkeys h =
let hkey = Hashtbl.create 101 in
h +> Hashtbl.iter (fun k v -> Hashtbl.replace hkey k true);
hashset_to_list hkey

let hunion h1 h2 =
h2 +> Hashtbl.iter (fun k v ->
Hashtbl.add h1 k v
)


let group_assoc_bykey_eff2 xs =
Expand Down
6 changes: 6 additions & 0 deletions commons/common.mli
Expand Up @@ -1757,6 +1757,9 @@ val hash_of_list : ('a * 'b) list -> ('a, 'b) Hashtbl.t


val hkeys : ('a, 'b) Hashtbl.t -> 'a list

(* hunion h1 h2 adds all binding in h2 into h1 *)
val hunion: ('a, 'b) Hashtbl.t -> ('a, 'b) Hashtbl.t -> unit
(*x: common.mli for collection types *)
(*****************************************************************************)
(* Hash sets *)
Expand All @@ -1768,6 +1771,9 @@ type 'a hashset = ('a, bool) Hashtbl.t
(* common use of hashset, in a hash of hash *)
val hash_hashset_add : 'a -> 'b -> ('a, 'b hashset) Hashtbl.t -> unit

(* hashset_union h1 h2 adds all elements in h2 into h1 *)
val hashset_union: 'a hashset -> 'a hashset -> unit

val hashset_to_set :
< fromlist : ('a ) list -> 'c; .. > -> ('a, 'b) Hashtbl.t -> 'c

Expand Down
61 changes: 61 additions & 0 deletions commons/parallel.ml
@@ -0,0 +1,61 @@
open Common

(*****************************************************************************)
(* Prelude *)
(*****************************************************************************)

(*****************************************************************************)
(* Building block *)
(*****************************************************************************)

(* src: harrop article on fork-based parallelism *)
let invoke (f : 'a -> 'b) x : unit -> 'b =
let input, output = Unix.pipe() in
match Unix.fork() with
(* pad: what is this ?? *)
| -1 -> (let v = f x in fun () -> v)
| 0 ->
Unix.close input;
let output = Unix.out_channel_of_descr output in
Marshal.to_channel output (try `Res(f x) with e -> `Exn e) [];
close_out output;
exit 0
| pid ->
Unix.close output;
let input = Unix.in_channel_of_descr input in
fun () ->
let v = Marshal.from_channel input in
ignore (Unix.waitpid [] pid);
close_in input;
match v with
| `Res x -> x
| `Exn e -> raise e;;

let parallel_map f xs =
let futures = List.map (invoke f) xs in
(* sync *)
List.map (fun futur -> futur ()) futures

(*****************************************************************************)
(* Poor's man job scheduler *)
(*****************************************************************************)

type 'a job = unit -> 'a
type 'a jobs = ('a job) list

(*
* This is a very naive job scheduler. One limitation is that before
* launching another run we must wait for the slowest process. A
* set of workers and a master model would be more efficient by always
* feeding processors. A partial fix is to give a tasks number that
* is quite superior to the actual number of processors.
*)
let map_jobs ~tasks xs =
if tasks = 1
then List.map (fun job -> job ()) xs
else
let xxs = Common.pack_safe tasks xs in
xxs +> List.map (fun xs ->
(* do in parallel a batch of job *)
parallel_map (fun job -> job ()) xs
) +> List.flatten
9 changes: 9 additions & 0 deletions commons/parallel.mli
@@ -0,0 +1,9 @@

val invoke : ('a -> 'b) -> 'a -> unit -> 'b

val parallel_map : ('a -> 'b) -> 'a list -> 'b list

type 'a job = unit -> 'a
type 'a jobs = ('a job) list

val map_jobs: tasks:int -> 'a jobs -> 'a list

0 comments on commit 6fae8e9

Please sign in to comment.