-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
lwtUtils.ml
60 lines (55 loc) · 2.29 KB
/
lwtUtils.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
(*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*)
(* Lwt.join is a great way to run multiple threads in parallel. However it has this really annoying
* property where it won't exit early if one of the threads fails. It's not a big deal if you
* expect this behavior, but it can be dangerous if you expect the same behavior as Promise.all or
* Hack's await.
*
* We can instead simulate how Lwt.join should work by calling Lwt.nchoose_split repeatedly until
* one thread fails with an exception or until all threads have finished.
*
* In the exceptional case, we won't cancel the still-sleeping threads. I (glevi) tried to get this
* to work, but it wouldn't preserve stack traces. Anyway, Promise.all doesn't cancel running
* promises either :P
*)
(** [iter_all threads] waits for every promise in [threads], re-waiting on the promises that
    are still pending after each round. The returned promise resolves to [()] once the list
    of pending promises is empty. *)
let rec iter_all threads =
  match threads with
  | [] -> Lwt.return_unit
  | _ :: _ ->
    (* If any of [threads] fails during this wait, the bind below fails and so does the
     * promise returned by [iter_all]. *)
    let%lwt (_resolved, still_pending) = Lwt.nchoose_split threads in
    iter_all still_pending
(** [get_value_unsafe thread] returns the value of [thread] when its state is [Lwt.Return].
    @raise Failure with the message ["Not yet completed"] for any other state, i.e. when
    [thread] is still sleeping or has failed. *)
let get_value_unsafe thread =
  match Lwt.state thread with
  | Lwt.Return value -> value
  | Lwt.Fail _
  | Lwt.Sleep ->
    failwith "Not yet completed"
(** [all threads] waits for every promise in [threads] via [iter_all], then returns their
    values in the same order as [threads]. If [iter_all] fails, the returned promise fails
    as well. *)
let all threads =
  let%lwt () = iter_all threads in
  (* Every promise has resolved at this point, so reading the values directly is safe. *)
  let values = Base.List.map threads ~f:get_value_unsafe in
  Lwt.return values
(** [output_graph out strip_root graph] writes [graph] to the channel [out] in a
    [digraph { ... }] form. [graph] is a list of [(file, dependencies)] pairs; one
    ["a" -> "b"] line is written per dependency, with [strip_root] applied to both names.
    All writes are performed sequentially. *)
let output_graph out strip_root graph =
  (* Writes a single edge line. *)
  let write_edge src dst =
    Lwt_io.fprintf out " \"%s\" -> \"%s\"\n" (strip_root src) (strip_root dst)
  in
  (* Writes, in order, every edge leaving one node. *)
  let write_edges (src, dsts) = Lwt_list.iter_s (write_edge src) dsts in
  let%lwt () = Lwt_io.fprint out "digraph {\n" in
  let%lwt () = Lwt_list.iter_s write_edges graph in
  Lwt_io.fprint out "}"
(** [fold_result_s ~f ~init l] folds [f] over [l] from left to right. [f acc x] is called
    for each [x] in [l], starting with [acc = init], and returns an ['acc result Lwt.t];
    each [Ok acc'] becomes the accumulator for the next element.

    The fold short circuits on the first [Error], which is returned as-is without calling
    [f] on the remaining elements. Each promise returned by [f] is resolved sequentially
    (hence the [_s]), and if any promise rejects, the entire fold rejects. This is like a
    combination of [Base.List.fold_result] and [Lwt_list.fold_left_s].

    Returns [Ok init] when [l] is empty. *)
let rec fold_result_s ~f ~init l =
  match l with
  | [] -> Lwt.return (Ok init)
  | x :: rest ->
    (match%lwt f init x with
    (* Pass the labels explicitly: relying on label omission in a full application
     * triggers warning 6 and silently misbinds if the signature ever changes. *)
    | Ok acc -> (fold_result_s [@ocaml.tailcall]) ~f ~init:acc rest
    | Error _ as err -> Lwt.return err)