@@ -166,13 +166,17 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
166166 type task_list = Tnode .task Utils .mutable_list [@@ deriving sexp_of ]
167167
168168 module Mut = Stdlib. Mutex
169+ module Queue = Saturn_lockfree. Single_prod_single_cons_queue
170+
171+ type task_queue = Tnode .task Queue .t
172+
173+ let sexp_of_task_queue q =
174+ Sexp. (List [ Atom " task_queue_of_size" ; Atom (Int. to_string @@ Queue. size q) ])
169175
170176 type device_state = {
171177 mutable keep_spinning : bool ;
172178 mutable device_error : exn option ;
173- mutable host_pos : task_list ;
174- mutable dev_pos : task_list ;
175- mutable dev_previous_pos : task_list ;
179+ queue : task_queue ;
176180 mut : (Mut .t [@ sexp.opaque]);
177181 host_wait_for_idle : (Stdlib.Condition .t [@ sexp.opaque]);
178182 dev_wait_for_work : (Stdlib.Condition .t [@ sexp.opaque]);
@@ -201,7 +205,7 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
201205
202206 let expected_merge_node (code : code ) = Backend. expected_merge_node code
203207 let expected_merge_nodes (codes : code_batch ) = Backend. expected_merge_nodes codes
204- let is_dev_queue_empty state = Utils. (is_empty @@ tl_exn state.dev_previous_pos)
208+ let is_dev_queue_empty state = Queue. size state.queue = 0
205209 let is_idle device = is_dev_queue_empty device.state && device.state.is_ready
206210 let name = " multicore " ^ Backend. name
207211
@@ -227,7 +231,9 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
227231 Option. iter d.device_error ~f: (fun e ->
228232 Exn. reraise e @@ name ^ " device " ^ Int. to_string device.ordinal);
229233 if not d.keep_spinning then invalid_arg " Multicore_backend: device not available" ;
230- d.host_pos < - Utils. insert ~next: task d.host_pos;
234+ if not @@ Queue. try_push d.queue task then (
235+ await device;
236+ Queue. push_exn d.queue task);
231237 if d.is_ready then (
232238 Mut. lock d.mut;
233239 Stdlib.Condition. broadcast d.dev_wait_for_work;
@@ -237,22 +243,11 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
237243
238244 let % track3_l_sexp spinup_device ~(ordinal : int ) : device =
239245 Int. incr global_run_no;
240- let init_pos =
241- Utils. Cons
242- {
243- hd =
244- Tnode. Task
245- { context_lifetime = () ; description = " root of task queue" ; work = (fun () -> () ) };
246- tl = Empty ;
247- }
248- in
249246 let state =
250247 {
251248 keep_spinning = true ;
252249 device_error = None ;
253- host_pos = init_pos;
254- dev_pos = Empty ;
255- dev_previous_pos = init_pos;
250+ queue = Queue. create ~size_exponent: 12 ;
256251 mut = Mut. create () ;
257252 is_ready = false ;
258253 host_wait_for_idle = Stdlib.Condition. create () ;
@@ -264,8 +259,8 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
264259 assert (not @@ Domain. is_main_domain () );
265260 try
266261 while state.keep_spinning do
267- match state.dev_pos with
268- | Empty ->
262+ match Queue. pop_opt state.queue with
263+ | None ->
269264 Mut. lock state.mut;
270265 if is_dev_queue_empty state && state.keep_spinning then (
271266 state.is_ready < - true ;
@@ -274,12 +269,8 @@ module Multicore_backend (Backend : No_device_backend) : Backend = struct
274269 Stdlib.Condition. wait state.dev_wait_for_work state.mut
275270 done ;
276271 state.is_ready < - false );
277- state.dev_pos < - Utils. tl_exn state.dev_previous_pos;
278272 Mut. unlock state.mut
279- | Cons { hd; tl } ->
280- Tnode. run hd;
281- state.dev_previous_pos < - state.dev_pos;
282- state.dev_pos < - tl
273+ | Some task -> Tnode. run task
283274 done
284275 with e ->
285276 state.device_error < - Some e;
0 commit comments