diff --git a/src/core/dune b/src/core/dune index 5e779d7d..0f77b53e 100644 --- a/src/core/dune +++ b/src/core/dune @@ -26,7 +26,7 @@ let () = Jbuild_plugin.V1.send @@ {| (synopsis "Monadic promises and concurrent I/O") (wrapped false) |} ^ preprocess ^ {| - (libraries bytes result seq) + (libraries bytes result seq domainslib) (flags (:standard -w +A-29))) (documentation diff --git a/src/core/lwt_lock.ml b/src/core/lwt_lock.ml new file mode 100644 index 00000000..89b0b661 --- /dev/null +++ b/src/core/lwt_lock.ml @@ -0,0 +1,32 @@ +(** Locks for multicore*) +module C = Domainslib.Chan + +type t = int C.t + +let create_lock () = + let c = C.make_bounded 1 in + C.send c 0; + c + +let acquire_lock lock = + C.recv lock |> ignore + +let release_lock lock = + C.send lock 0 + +let is_locked lock = + let c = C.recv_poll lock in + match c with + | Some v -> + C.send lock v; + false + | None -> + true + +let try_acquire lock = + match C.recv_poll lock with + | Some _ -> true + | None -> false + +let try_release lock = + C.send_poll lock 0 \ No newline at end of file diff --git a/src/core/lwt_mutex.ml b/src/core/lwt_mutex.ml index 32dff587..09387ea7 100644 --- a/src/core/lwt_mutex.ml +++ b/src/core/lwt_mutex.ml @@ -18,18 +18,25 @@ type t = { mutable locked : bool; mutable waiters : unit Lwt.u Lwt_sequence.t } let create () = { locked = false; waiters = Lwt_sequence.create () } +let l = Lwt_lock.create_lock () + let lock m = if m.locked then (Lwt.add_task_r [@ocaml.warning "-3"]) m.waiters else begin + Lwt_lock.acquire_lock l; m.locked <- true; + Lwt_lock.release_lock l; Lwt.return_unit end let unlock m = if m.locked then begin - if Lwt_sequence.is_empty m.waiters then - m.locked <- false + if Lwt_sequence.is_empty m.waiters then begin + Lwt_lock.acquire_lock l; + m.locked <- false; + Lwt_lock.release_lock l + end else (* We do not use [Lwt.wakeup] here to avoid a stack overflow when unlocking a lot of threads. *) diff --git a/src/core/lwt_mvar.ml b/src/core/lwt_mvar.ml index d7a94915..8bcc4bd0 100644 --- a/src/core/lwt_mvar.ml +++ b/src/core/lwt_mvar.ml @@ -49,6 +49,8 @@ type 'a t = { (* Threads waiting for a value *) } +let lock = Lwt_lock.create_lock () + let create_empty () = { mvar_contents = None; writers = Lwt_sequence.create (); @@ -60,6 +62,7 @@ let create v = readers = Lwt_sequence.create () } let put mvar v = + Lwt_lock.acquire_lock lock; match mvar.mvar_contents with | None -> begin match Lwt_sequence.take_opt_l mvar.readers with @@ -68,11 +71,13 @@ let put mvar v = | Some w -> Lwt.wakeup_later w v end; + Lwt_lock.release_lock lock; Lwt.return_unit | Some _ -> let (res, w) = Lwt.task () in let node = Lwt_sequence.add_r (v, w) mvar.writers in Lwt.on_cancel res (fun _ -> Lwt_sequence.remove node); + Lwt_lock.release_lock lock; res let next_writer mvar = @@ -86,7 +91,9 @@ let next_writer mvar = let take_available mvar = match mvar.mvar_contents with | Some v -> + Lwt_lock.acquire_lock lock; next_writer mvar; + Lwt_lock.release_lock lock; Some v | None -> None diff --git a/src/core/lwt_sequence.ml b/src/core/lwt_sequence.ml index 34f89723..6a0bff7f 100644 --- a/src/core/lwt_sequence.ml +++ b/src/core/lwt_sequence.ml @@ -20,6 +20,7 @@ type 'a node = { external seq_of_node : 'a node -> 'a t = "%identity" external node_of_seq : 'a t -> 'a node = "%identity" +let lock = Lwt_lock.create_lock () (* +-----------------------------------------------------------------+ | Operations on nodes | +-----------------------------------------------------------------+ *) @@ -28,14 +29,20 @@ let get node = node.node_data let set node data = - node.node_data <- data + Lwt_lock.acquire_lock lock; + node.node_data <- data; + Lwt_lock.release_lock lock let remove node = if node.node_active then begin + let is_locked = Lwt_lock.try_acquire lock in node.node_active <- false; let seq = seq_of_node node in seq.prev.next <- seq.next; - seq.next.prev <- seq.prev + seq.next.prev <- seq.prev; + match is_locked with + | true -> Lwt_lock.release_lock lock + | false -> () end (* +-----------------------------------------------------------------+ @@ -62,23 +69,29 @@ let length seq = loop seq.next 0 let add_l data seq = + Lwt_lock.acquire_lock lock; let node = { node_prev = seq; node_next = seq.next; node_data = data; node_active = true } in seq.next.prev <- seq_of_node node; seq.next <- seq_of_node node; + Lwt_lock.release_lock lock; node let add_r data seq = + Lwt_lock.acquire_lock lock; let node = { node_prev = seq.prev; node_next = seq; node_data = data; node_active = true } in seq.prev.next <- seq_of_node node; seq.prev <- seq_of_node node; + Lwt_lock.release_lock lock; node let take_l seq = if is_empty seq then raise Empty else begin + Lwt_lock.acquire_lock lock; let node = node_of_seq seq.next in remove node; + Lwt_lock.release_lock lock; node.node_data end @@ -86,8 +99,10 @@ let take_r seq = if is_empty seq then raise Empty else begin + Lwt_lock.acquire_lock lock; let node = node_of_seq seq.prev in remove node; + Lwt_lock.release_lock lock; node.node_data end @@ -110,20 +125,24 @@ let take_opt_r seq = end let transfer_l s1 s2 = + Lwt_lock.acquire_lock lock; s2.next.prev <- s1.prev; s1.prev.next <- s2.next; s2.next <- s1.next; s1.next.prev <- s2; s1.prev <- s1; - s1.next <- s1 + s1.next <- s1; + Lwt_lock.release_lock lock let transfer_r s1 s2 = + Lwt_lock.acquire_lock lock; s2.prev.next <- s1.next; s1.next.prev <- s2.prev; s2.prev <- s1.prev; s1.prev.next <- s2; s1.prev <- s1; - s1.next <- s1 + s1.next <- s1; + Lwt_lock.release_lock lock let iter_l f seq = let rec loop curr = diff --git a/src/unix/dune b/src/unix/dune index de71bb0e..6bf4b37a 100644 --- a/src/unix/dune +++ b/src/unix/dune @@ -42,7 +42,7 @@ let () = Jbuild_plugin.V1.send @@ {| (synopsis "Unix support for Lwt") (optional) (wrapped false) - (libraries bigarray lwt mmap ocplib-endian.bigstring threads unix) + (libraries bigarray lwt mmap ocplib-endian.bigstring threads unix domainslib) |} ^ preprocess ^ {| (c_names lwt_unix_stubs diff --git a/src/unix/lwt_preemptive.ml b/src/unix/lwt_preemptive.ml index c26ec596..7eb92eb3 100644 --- a/src/unix/lwt_preemptive.ml +++ b/src/unix/lwt_preemptive.ml @@ -1,7 +1,6 @@ -(* This file is part of Lwt, released under the MIT license. See LICENSE.md for - details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *) - +open Lwt.Infix +module C = Domainslib.Chan (* [Lwt_sequence] is deprecated – we don't want users outside Lwt using it. However, it is still used internally by Lwt. So, briefly disable warning 3 @@ -12,115 +11,54 @@ module Lwt_sequence = Lwt_sequence [@@@ocaml.warning "+3"] -open Lwt.Infix - -(* +-----------------------------------------------------------------+ - | Parameters | - +-----------------------------------------------------------------+ *) - -(* Minimum number of preemptive threads: *) -let min_threads : int ref = ref 0 +(* Minimum number of domains: *) +let min_domains : int ref = ref 0 -(* Maximum number of preemptive threads: *) -let max_threads : int ref = ref 0 +(* Maximum number of domains: *) +let max_domains : int ref = ref 0 (* Size of the waiting queue: *) -let max_thread_queued = ref 1000 - -let get_max_number_of_threads_queued _ = - !max_thread_queued - -let set_max_number_of_threads_queued n = - if n < 0 then invalid_arg "Lwt_preemptive.set_max_number_of_threads_queued"; - max_thread_queued := n - -(* The total number of preemptive threads currently running: *) -let threads_count = ref 0 - -(* +-----------------------------------------------------------------+ - | Preemptive threads management | - +-----------------------------------------------------------------+ *) - -module CELL : -sig - type 'a t +let max_domains_queued = ref 128 - val make : unit -> 'a t - val get : 'a t -> 'a - val set : 'a t -> 'a -> unit -end = -struct - type 'a t = { - m : Mutex.t; - cv : Condition.t; - mutable cell : 'a option; - } +let get_max_number_of_domains_queued _ = + !max_domains_queued - let make () = { m = Mutex.create (); cv = Condition.create (); cell = None } +let set_max_number_of_domains_queued n = + if n < 0 then invalid_arg "Lwt_preemptive.set_max_number_of_domains_queued"; + max_domains_queued := n - let get t = - let rec await_value t = - match t.cell with - | None -> - Condition.wait t.cv t.m; - await_value t - | Some v -> - t.cell <- None; - Mutex.unlock t.m; - v - in - Mutex.lock t.m; - await_value t +let domains_count = ref 0 - let set t v = - Mutex.lock t.m; - t.cell <- Some v; - Mutex.unlock t.m; - Condition.signal t.cv -end +type task = + Task of (int * (unit -> unit)) + | Quit -type thread = { - task_cell: (int * (unit -> unit)) CELL.t; - (* Channel used to communicate notification id and tasks to the - worker thread. *) - - mutable thread : Thread.t; - (* The worker thread. *) - - mutable reuse : bool; - (* Whether the thread must be re-added to the pool when the work is - done. *) +type dom = { + task_chan : task C.t; + mutable domain: unit Domain.t } -(* Pool of worker threads: *) -let workers : thread Queue.t = Queue.create () +let workers : dom Queue.t = Queue.create () -(* Queue of clients waiting for a worker to be available: *) -let waiters : thread Lwt.u Lwt_sequence.t = Lwt_sequence.create () +let waiters : dom Lwt.u Lwt_sequence.t = Lwt_sequence.create () -(* Code executed by a worker: *) let rec worker_loop worker = - let id, task = CELL.get worker.task_cell in - task (); - (* If there is too much threads, exit. This can happen if the user - decreased the maximum: *) - if !threads_count > !max_threads then worker.reuse <- false; - (* Tell the main thread that work is done: *) - Lwt_unix.send_notification id; - if worker.reuse then worker_loop worker + match C.recv worker.task_chan with + | Task (id, task) -> + task (); + Lwt_unix.send_notification id; + worker_loop worker + | Quit -> () -(* create a new worker: *) let make_worker () = - incr threads_count; + incr domains_count; let worker = { - task_cell = CELL.make (); - thread = Thread.self (); - reuse = true; + task_chan = C.make_bounded 0; + domain = Domain.spawn (fun _ -> ()) } in - worker.thread <- Thread.create worker_loop worker; + worker.domain <- Domain.spawn(fun _ -> worker_loop worker); worker -(* Add a worker to the pool: *) let add_worker worker = match Lwt_sequence.take_opt_l waiters with | None -> @@ -128,57 +66,50 @@ let add_worker worker = | Some w -> Lwt.wakeup w worker -(* Wait for worker to be available, then return it: *) let get_worker () = if not (Queue.is_empty workers) then Lwt.return (Queue.take workers) - else if !threads_count < !max_threads then + else if !domains_count < !max_domains then Lwt.return (make_worker ()) else (Lwt.add_task_r [@ocaml.warning "-3"]) waiters -(* +-----------------------------------------------------------------+ - | Initialisation, and dynamic parameters reset | - +-----------------------------------------------------------------+ *) +let get_bounds () = (!min_domains, !max_domains) -let get_bounds () = (!min_threads, !max_threads) - -let set_bounds (min, max) = +let set_bounds (min, max) = if min < 0 || max < min then invalid_arg "Lwt_preemptive.set_bounds"; - let diff = min - !threads_count in - min_threads := min; - max_threads := max; - (* Launch new workers: *) - for _i = 1 to diff do + if (max < !domains_count) then begin + for _i = 1 to (!domains_count - max) do + let worker = Queue.take workers in + C.send worker.task_chan Quit + done; + end; + let diff = min - !domains_count in + min_domains := min; + max_domains := max; + + for _i = 1 to diff do add_worker (make_worker ()) done -let initialized = ref false +let initialized = ref false -let init min max _errlog = +let init min max _errlog = initialized := true; - set_bounds (min, max) + set_bounds (min, max) -let simple_init () = +let simple_init () = if not !initialized then begin initialized := true; set_bounds (0, 4) - end - -let nbthreads () = !threads_count -let nbthreadsqueued () = Lwt_sequence.fold_l (fun _ x -> x + 1) waiters 0 -let nbthreadsbusy () = !threads_count - Queue.length workers - -(* +-----------------------------------------------------------------+ - | Detaching | - +-----------------------------------------------------------------+ *) + end let init_result = Result.Error (Failure "Lwt_preemptive.detach") let detach f args = - simple_init (); + simple_init (); (*Initialise 4 domains*) let result = ref init_result in - (* The task for the worker thread: *) + (*Instead of notify, we can have a channel to send the result*) let task () = try result := Result.Ok (f args) @@ -192,66 +123,42 @@ let detach f args = (fun () -> Lwt.wakeup_result wakener !result) in Lwt.finalize - (fun () -> - (* Send the id and the task to the worker: *) - CELL.set worker.task_cell (id, task); - waiter) - (fun () -> - if worker.reuse then - (* Put back the worker to the pool: *) - add_worker worker - else begin - decr threads_count; - (* Or wait for the thread to terminates, to free its associated - resources: *) - Thread.join worker.thread - end; - Lwt.return_unit) - + (fun () -> + C.send worker.task_chan (Task (id, task)); + waiter) + (fun () -> + add_worker worker; + Lwt.return_unit) (*add worker*) + +let nbdomains () = !domains_count +let nbdomainsqueued () = Lwt_sequence.fold_l (fun _ x -> x + 1) waiters 0 +let nbdomainsbusy () = !domains_count - Queue.length workers (* +-----------------------------------------------------------------+ - | Running Lwt threads in the main thread | + | Running Lwt threads in the main domain | +-----------------------------------------------------------------+ *) -(* Queue of [unit -> unit Lwt.t] functions. *) -let jobs = Queue.create () - -(* Mutex to protect access to [jobs]. *) -let jobs_mutex = Mutex.create () +(* Jobs to be run in the main domain*) +let jobs = C.make_unbounded () -let job_notification = +let job_notification = Lwt_unix.make_notification (fun () -> - (* Take the first job. The queue is never empty at this - point. *) - Mutex.lock jobs_mutex; - let thunk = Queue.take jobs in - Mutex.unlock jobs_mutex; - ignore (thunk ())) + let thunk = C.recv jobs in + ignore (thunk ())) -(* There is a potential performance issue from creating a cell every time this - function is called. See: - https://github.com/ocsigen/lwt/issues/218 - https://github.com/ocsigen/lwt/pull/219 - http://caml.inria.fr/mantis/view.php?id=7158 *) let run_in_main f = - let cell = CELL.make () in - (* Create the job. *) - let job () = - (* Execute [f] and wait for its result. *) - Lwt.try_bind f + let res = ref init_result in + let job () = + Lwt.try_bind f (fun ret -> Lwt.return (Result.Ok ret)) (fun exn -> Lwt.return (Result.Error exn)) >>= fun result -> - (* Send the result. *) - CELL.set cell result; + res := result; Lwt.return_unit in - (* Add the job to the queue. *) - Mutex.lock jobs_mutex; - Queue.add job jobs; - Mutex.unlock jobs_mutex; - (* Notify the main thread. *) + C.send jobs job; + Lwt_unix.send_notification job_notification; - (* Wait for the result. *) - match CELL.get cell with - | Result.Ok ret -> ret + match !res with + | Result.Ok ret -> ret | Result.Error exn -> raise exn + \ No newline at end of file diff --git a/src/unix/lwt_preemptive.mli b/src/unix/lwt_preemptive.mli index 9f06f9b6..c5610819 100644 --- a/src/unix/lwt_preemptive.mli +++ b/src/unix/lwt_preemptive.mli @@ -3,20 +3,17 @@ -(** This module allows to mix preemptive threads with [Lwt] - cooperative threads. It maintains an extensible pool of preemptive - threads to which you can detach computations. - - See {{:https://github.com/hcarty/mwt} Mwt} for a more modern - implementation. *) +(** This module allows to mix multicore parallelism with [Lwt] + cooperative threads. It maintains an extensible pool of domains + to which you can detach computations. *) val detach : ('a -> 'b) -> 'a -> 'b Lwt.t - (** [detach f x] runs the computation [f x] in a separate preemptive thread. + (** [detach f x] runs the computation [f x] in a separate domain in parallel. [detach] evaluates to an Lwt promise, which is pending until the - preemptive thread completes. + domain completes execution. [detach] calls {!simple_init} internally, which means that the number of - preemptive threads is capped by default at four. If you would like a + domains is capped by default at four. If you would like a higher limit, call {!init} or {!set_bounds} directly. Note that Lwt thread-local storage (i.e., {!Lwt.with_value}) cannot be @@ -25,7 +22,7 @@ val detach : ('a -> 'b) -> 'a -> 'b Lwt.t val run_in_main : (unit -> 'a Lwt.t) -> 'a (** [run_in_main f] can be called from a detached computation to execute - [f ()] in the main preemptive thread, i.e. the one executing + [f ()] in the parent domain, i.e. the one executing {!Lwt_main.run}. [run_in_main f] blocks until [f ()] completes, then returns its result. If [f ()] raises an exception, [run_in_main f] raises the same exception. @@ -36,11 +33,11 @@ val run_in_main : (unit -> 'a Lwt.t) -> 'a val init : int -> int -> (string -> unit) -> unit (** [init min max log] initialises this module. i.e. it launches the - minimum number of preemptive threads and starts the {b + minimum number of domains and starts the {b dispatcher}. - @param min is the minimum number of threads - @param max is the maximum number of threads + @param min is the minimum number of domains + @param max is the maximum number of domains @param log is used to log error messages If {!Lwt_preemptive} has already been initialised, this call @@ -48,7 +45,7 @@ val init : int -> int -> (string -> unit) -> unit val simple_init : unit -> unit (** [simple_init ()] checks if the library is not yet initialized, and if not, - does a {i simple initialization}. The minimum number of threads is set to + does a {i simple initialization}. The minimum number of domains is set to zero, maximum to four, and the log function is left unchanged, i.e. the default built-in logging function is used. See {!Lwt_preemptive.init}. @@ -56,22 +53,22 @@ val simple_init : unit -> unit val get_bounds : unit -> int * int (** [get_bounds ()] returns the minimum and the maximum number of - preemptive threads. *) + domains. *) val set_bounds : int * int -> unit (** [set_bounds (min, max)] set the minimum and the maximum number - of preemptive threads. *) + of domains. *) -val set_max_number_of_threads_queued : int -> unit - (** Sets the size of the waiting queue, if no more preemptive - threads are available. When the queue is full, {!detach} will +val set_max_number_of_domains_queued : int -> unit + (** Sets the size of the waiting queue, if no more domains are available. + When the queue is full, {!detach} will sleep until a thread is available. *) -val get_max_number_of_threads_queued : unit -> int - (** Returns the size of the waiting queue, if no more threads are +val get_max_number_of_domains_queued : unit -> int + (** Returns the size of the waiting queue, if no more domains are available *) (**/**) -val nbthreads : unit -> int -val nbthreadsbusy : unit -> int -val nbthreadsqueued : unit -> int +val nbdomains : unit -> int +val nbdomainsbusy : unit -> int +val nbdomainsqueued : unit -> int diff --git a/src/unix/lwt_unix.cppo.ml b/src/unix/lwt_unix.cppo.ml index 89bb8422..cf0643ec 100644 --- a/src/unix/lwt_unix.cppo.ml +++ b/src/unix/lwt_unix.cppo.ml @@ -816,11 +816,11 @@ struct in loop io_vectors.prefix - external stub_iov_max : unit -> int = "lwt_unix_iov_max" + external stub_iov_max : unit -> int option = "lwt_unix_iov_max" let system_limit = if Sys.win32 then None - else Some (stub_iov_max ()) + else stub_iov_max () let check tag io_vector = let buffer_length = diff --git a/src/unix/unix_c/unix_iov_max.c b/src/unix/unix_c/unix_iov_max.c index 27ee4529..dfe7ca61 100644 --- a/src/unix/unix_c/unix_iov_max.c +++ b/src/unix/unix_c/unix_iov_max.c @@ -9,8 +9,24 @@ #define _GNU_SOURCE +#include +#include #include #include -CAMLprim value lwt_unix_iov_max(value unit) { return Val_int(IOV_MAX); } +CAMLprim value lwt_unix_iov_max(value unit) +{ + CAMLparam1(unit); + CAMLlocal1(res); + +#ifdef IOV_MAX + res = caml_alloc(1, 0); + Store_field(res, 0, Val_int(IOV_MAX)); +#else + res = Val_int(0); +#endif + + CAMLreturn(res); +} + #endif