Skip to content

Commit ccf03e0

Browse files
committed
Revert to using our own SPSC unlimited FIFO queue
1 parent 418af7c commit ccf03e0

File tree

4 files changed

+26
-21
lines changed

4 files changed

+26
-21
lines changed

arrayjit.opam

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ depends: [
2424
"ocannl_npy"
2525
"stdio"
2626
"num"
27-
"saturn_lockfree"
2827
"ppxlib"
2928
"ppx_jane"
3029
"ppx_expect"

arrayjit/lib/backends.ml

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -166,17 +166,13 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
166166
type task_list = Tnode.task Utils.mutable_list [@@deriving sexp_of]
167167

168168
module Mut = Stdlib.Mutex
169-
module Queue = Saturn_lockfree.Single_prod_single_cons_queue
170-
171-
type task_queue = Tnode.task Queue.t
172-
173-
let sexp_of_task_queue q =
174-
Sexp.(List [ Atom "task_queue_of_size"; Atom (Int.to_string @@ Queue.size q) ])
175169

176170
type device_state = {
177171
mutable keep_spinning : bool;
178172
mutable device_error : exn option;
179-
queue : task_queue;
173+
mutable host_pos : task_list;
174+
mutable dev_pos : task_list;
175+
mutable dev_previous_pos : task_list;
180176
mut : (Mut.t[@sexp.opaque]);
181177
host_wait_for_idle : (Stdlib.Condition.t[@sexp.opaque]);
182178
dev_wait_for_work : (Stdlib.Condition.t[@sexp.opaque]);
@@ -205,7 +201,7 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
205201

206202
let expected_merge_node (code : code) = Backend.expected_merge_node code
207203
let expected_merge_nodes (codes : code_batch) = Backend.expected_merge_nodes codes
208-
let is_dev_queue_empty state = Queue.size state.queue = 0
204+
let is_dev_queue_empty state = Utils.(is_empty @@ tl_exn state.dev_previous_pos)
209205
let is_idle device = is_dev_queue_empty device.state && device.state.is_ready
210206
let name = "multicore " ^ Backend.name
211207

@@ -231,9 +227,7 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
231227
Option.iter d.device_error ~f:(fun e ->
232228
Exn.reraise e @@ name ^ " device " ^ Int.to_string device.ordinal);
233229
if not d.keep_spinning then invalid_arg "Multicore_backend: device not available";
234-
if not @@ Queue.try_push d.queue task then (
235-
await device;
236-
Queue.push_exn d.queue task);
230+
d.host_pos <- Utils.insert ~next:task d.host_pos;
237231
if d.is_ready then (
238232
Mut.lock d.mut;
239233
Stdlib.Condition.broadcast d.dev_wait_for_work;
@@ -243,11 +237,22 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
243237

244238
let%track3_l_sexp spinup_device ~(ordinal : int) : device =
245239
Int.incr global_run_no;
240+
let init_pos =
241+
Utils.Cons
242+
{
243+
hd =
244+
Tnode.Task
245+
{ context_lifetime = (); description = "root of task queue"; work = (fun () -> ()) };
246+
tl = Empty;
247+
}
248+
in
246249
let state =
247250
{
248251
keep_spinning = true;
249252
device_error = None;
250-
queue = Queue.create ~size_exponent:12;
253+
host_pos = init_pos;
254+
dev_pos = Empty;
255+
dev_previous_pos = init_pos;
251256
mut = Mut.create ();
252257
is_ready = false;
253258
host_wait_for_idle = Stdlib.Condition.create ();
@@ -259,8 +264,8 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
259264
assert (not @@ Domain.is_main_domain ());
260265
try
261266
while state.keep_spinning do
262-
match Queue.pop_opt state.queue with
263-
| None ->
267+
match state.dev_pos with
268+
| Empty ->
264269
Mut.lock state.mut;
265270
if is_dev_queue_empty state && state.keep_spinning then (
266271
state.is_ready <- true;
@@ -269,8 +274,12 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
269274
Stdlib.Condition.wait state.dev_wait_for_work state.mut
270275
done;
271276
state.is_ready <- false);
277+
state.dev_pos <- Utils.tl_exn state.dev_previous_pos;
272278
Mut.unlock state.mut
273-
| Some task -> Tnode.run task
279+
| Cons { hd; tl } ->
280+
Tnode.run hd;
281+
state.dev_previous_pos <- state.dev_pos;
282+
state.dev_pos <- tl
274283
done
275284
with e ->
276285
state.device_error <- Some e;
@@ -594,12 +603,11 @@ module Pipes_multicore_backend (Backend : No_device_backend) : Backend = struct
594603
| Empty ->
595604
let _host_released : bool = host_wait_for_idle.release_if_waiting () in
596605
let _could_wait : bool = wait_by_dev () in
597-
(* not _host_released && not _could_wait: we busy-loop until host processes its release. *)
598-
(* [%log "WORK WHILE LOOP: EMPTY AFTER WAIT -- dev pos:", (state.dev_pos : task_list)]; *)
606+
(* not _host_released && not _could_wait: we busy-loop until host processes its
607+
release. *)
599608
state.dev_pos <- Utils.tl_exn state.dev_previous_pos
600609
| Cons { hd; tl } ->
601610
Tnode.run hd;
602-
(* [%log "WORK WHILE LOOP: AFTER WORK"]; *)
603611
state.dev_previous_pos <- state.dev_pos;
604612
state.dev_pos <- tl
605613
done

arrayjit/lib/dune

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
str
1212
ctypes
1313
ctypes.foreign
14-
saturn_lockfree
1514
(select
1615
gcc_backend.ml
1716
from

dune-project

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
ocannl_npy
6666
stdio
6767
num
68-
saturn_lockfree
6968
ppxlib
7069
ppx_jane
7170
ppx_expect

0 commit comments

Comments
 (0)