Skip to content

Commit

Permalink
implement a simple lock for multicore
Browse files Browse the repository at this point in the history
  • Loading branch information
Sudha247 committed Sep 24, 2020
1 parent ee4e122 commit 23b2bb2
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ let () = Jbuild_plugin.V1.send @@ {|
(synopsis "Monadic promises and concurrent I/O")
(wrapped false)
|} ^ preprocess ^ {|
(libraries bytes result seq)
(libraries bytes result seq domainslib)
(flags (:standard -w +A-29)))

(documentation
Expand Down
32 changes: 32 additions & 0 deletions src/core/lwt_lock.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
(** Locks for multicore*)
module C = Domainslib.Chan

type t = int C.t

let create_lock () =
let c = C.make_bounded 1 in
C.send c 0;
c

let acquire_lock lock =
C.recv lock |> ignore

let release_lock lock =
C.send lock 0

let is_locked lock =
let c = C.recv_poll lock in
match c with
| Some v ->
C.send lock v;
false
| None ->
true

let try_acquire lock =
match C.recv_poll lock with
| Some _ -> true
| None -> false

let try_release lock =
C.send_poll lock 0
9 changes: 9 additions & 0 deletions src/core/lwt_lock.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
(** Locks for m*)

type t
val create_lock : unit -> t
val acquire_lock : t -> unit
val release_lock : t -> unit
val is_locked : t -> bool
val try_acquire : t -> bool
val try_release : t -> bool
11 changes: 9 additions & 2 deletions src/core/lwt_mutex.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@ type t = { mutable locked : bool; mutable waiters : unit Lwt.u Lwt_sequence.t }

let create () = { locked = false; waiters = Lwt_sequence.create () }

let l = Lwt_lock.create_lock ()

let lock m =
if m.locked then
(Lwt.add_task_r [@ocaml.warning "-3"]) m.waiters
else begin
Lwt_lock.acquire_lock l;
m.locked <- true;
Lwt_lock.release_lock l;
Lwt.return_unit
end

let unlock m =
if m.locked then begin
if Lwt_sequence.is_empty m.waiters then
m.locked <- false
if Lwt_sequence.is_empty m.waiters then begin
Lwt_lock.acquire_lock l;
m.locked <- false;
Lwt_lock.release_lock l
end
else
(* We do not use [Lwt.wakeup] here to avoid a stack overflow
when unlocking a lot of threads. *)
Expand Down
7 changes: 7 additions & 0 deletions src/core/lwt_mvar.ml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type 'a t = {
(* Threads waiting for a value *)
}

let lock = Lwt_lock.create_lock ()

let create_empty () =
{ mvar_contents = None;
writers = Lwt_sequence.create ();
Expand All @@ -60,6 +62,7 @@ let create v =
readers = Lwt_sequence.create () }

let put mvar v =
Lwt_lock.acquire_lock lock;
match mvar.mvar_contents with
| None ->
begin match Lwt_sequence.take_opt_l mvar.readers with
Expand All @@ -68,11 +71,13 @@ let put mvar v =
| Some w ->
Lwt.wakeup_later w v
end;
Lwt_lock.release_lock lock;
Lwt.return_unit
| Some _ ->
let (res, w) = Lwt.task () in
let node = Lwt_sequence.add_r (v, w) mvar.writers in
Lwt.on_cancel res (fun _ -> Lwt_sequence.remove node);
Lwt_lock.release_lock lock;
res

let next_writer mvar =
Expand All @@ -86,7 +91,9 @@ let next_writer mvar =
let take_available mvar =
match mvar.mvar_contents with
| Some v ->
Lwt_lock.acquire_lock lock;
next_writer mvar;
Lwt_lock.release_lock lock;
Some v
| None ->
None
Expand Down
27 changes: 23 additions & 4 deletions src/core/lwt_sequence.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type 'a node = {
external seq_of_node : 'a node -> 'a t = "%identity"
external node_of_seq : 'a t -> 'a node = "%identity"

let lock = Lwt_lock.create_lock ()
(* +-----------------------------------------------------------------+
| Operations on nodes |
+-----------------------------------------------------------------+ *)
Expand All @@ -28,14 +29,20 @@ let get node =
node.node_data

let set node data =
node.node_data <- data
Lwt_lock.acquire_lock lock;
node.node_data <- data;
Lwt_lock.release_lock lock

let remove node =
if node.node_active then begin
let is_locked = Lwt_lock.try_acquire lock in
node.node_active <- false;
let seq = seq_of_node node in
seq.prev.next <- seq.next;
seq.next.prev <- seq.prev
seq.next.prev <- seq.prev;
match is_locked with
| true -> Lwt_lock.release_lock lock
| false -> ()
end

(* +-----------------------------------------------------------------+
Expand All @@ -62,32 +69,40 @@ let length seq =
loop seq.next 0

let add_l data seq =
Lwt_lock.acquire_lock lock;
let node = { node_prev = seq; node_next = seq.next; node_data = data; node_active = true } in
seq.next.prev <- seq_of_node node;
seq.next <- seq_of_node node;
Lwt_lock.release_lock lock;
node

let add_r data seq =
Lwt_lock.acquire_lock lock;
let node = { node_prev = seq.prev; node_next = seq; node_data = data; node_active = true } in
seq.prev.next <- seq_of_node node;
seq.prev <- seq_of_node node;
Lwt_lock.release_lock lock;
node

let take_l seq =
if is_empty seq then
raise Empty
else begin
Lwt_lock.acquire_lock lock;
let node = node_of_seq seq.next in
remove node;
Lwt_lock.release_lock lock;
node.node_data
end

let take_r seq =
if is_empty seq then
raise Empty
else begin
Lwt_lock.acquire_lock lock;
let node = node_of_seq seq.prev in
remove node;
Lwt_lock.release_lock lock;
node.node_data
end

Expand All @@ -110,20 +125,24 @@ let take_opt_r seq =
end

let transfer_l s1 s2 =
Lwt_lock.acquire_lock lock;
s2.next.prev <- s1.prev;
s1.prev.next <- s2.next;
s2.next <- s1.next;
s1.next.prev <- s2;
s1.prev <- s1;
s1.next <- s1
s1.next <- s1;
Lwt_lock.release_lock lock

let transfer_r s1 s2 =
Lwt_lock.acquire_lock lock;
s2.prev.next <- s1.next;
s1.next.prev <- s2.prev;
s2.prev <- s1.prev;
s1.prev.next <- s2;
s1.prev <- s1;
s1.next <- s1
s1.next <- s1;
Lwt_lock.release_lock lock

let iter_l f seq =
let rec loop curr =
Expand Down
6 changes: 3 additions & 3 deletions src/unix/lwt_preemptive.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ let max_domains : int ref = ref 0
(* Size of the waiting queue: *)
let max_domains_queued = ref 128

let get_max_number_of_threads_queued _ =
let get_max_number_of_domains_queued _ =
!max_domains_queued

let set_max_number_of_threads_queued n =
if n < 0 then invalid_arg "Lwt_preemptive.set_max_number_of_threads_queued";
let set_max_number_of_domains_queued n =
if n < 0 then invalid_arg "Lwt_preemptive.set_max_number_of_domains_queued";
max_domains_queued := n

let domains_count = ref 0
Expand Down
4 changes: 2 additions & 2 deletions src/unix/lwt_preemptive.mli
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ val set_bounds : int * int -> unit
(** [set_bounds (min, max)] set the minimum and the maximum number
of preemptive threads. *)

val set_max_number_of_threads_queued : int -> unit
val set_max_number_of_domains_queued : int -> unit
(** Sets the size of the waiting queue, if no more preemptive
threads are available. When the queue is full, {!detach} will
sleep until a thread is available. *)

val get_max_number_of_threads_queued : unit -> int
val get_max_number_of_domains_queued : unit -> int
(** Returns the size of the waiting queue, if no more threads are
available *)

Expand Down

0 comments on commit 23b2bb2

Please sign in to comment.