forked from ocaml/ocaml
-
Notifications
You must be signed in to change notification settings - Fork 1
/
join.ml
614 lines (514 loc) · 16.6 KB
/
join.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
(***********************************************************************)
(* *)
(* Objective Caml *)
(* *)
(* Luc Maranget, projet Moscova, INRIA Rocquencourt *)
(* *)
(* Copyright 2004 Institut National de Recherche en Informatique et *)
(* en Automatique. All rights reserved. This file is distributed *)
(* under the terms of the Q Public License version 1.0. *)
(* *)
(***********************************************************************)
(* $Id$ *)
open Printf
(*DEBUG*)(* Debug verbosity, read once from the VERBOSE environment variable. *)
(*DEBUG*)(* Any failure (variable unset, value not an integer) yields 0. *)
(*DEBUG*)let verbose =
(*DEBUG*)  let from_env () = int_of_string (Sys.getenv "VERBOSE") in
(*DEBUG*)  try from_env () with _ -> 0
(*DEBUG*)
(*DEBUG*)(* Serializes debug output written by concurrent threads. *)
(*DEBUG*)let debug_mutex = Mutex.create ()
(*DEBUG*)
(*DEBUG*)(* [debug lvl source msg] prints "source[thread id]: msg" on stderr *)
(*DEBUG*)(* when [verbose] is at least [lvl]; otherwise it does nothing. *)
(*DEBUG*)let debug lvl source msg =
(*DEBUG*)  if verbose < lvl then ()
(*DEBUG*)  else begin
(*DEBUG*)    let tid = Thread.id (Thread.self ()) in
(*DEBUG*)    Mutex.lock debug_mutex ;
(*DEBUG*)    eprintf "%s[%i]: %s\n" source tid msg ;
(*DEBUG*)    flush stderr ;
(*DEBUG*)    Mutex.unlock debug_mutex
(*DEBUG*)  end
(*DEBUG*)
(*DEBUG*)(* Shorthands for the three verbosity levels used in this file. *)
(*DEBUG*)let debug1 source msg = debug 1 source msg
(*DEBUG*)and debug2 source msg = debug 2 source msg
(*DEBUG*)and debug3 source msg = debug 3 source msg
(*
Active tasks.
A task is active when :
* running compiled code
* or in lib code, doomed to become inactive and not to create
other tasks.
A task is inactive when :
* suspended, awaiting synchronous reply
* simply finished (ie exit_thread is performed)
*)
(* Shared bookkeeping for tasks and threads. *)
(* Lock taken around every update of [active]. *)
let active_mutex = Mutex.create ()
(* Signalled by check_active once [active] is no longer positive;
   exit_hook waits on it. *)
and active_condition = Condition.create ()
(* Number of active tasks; starts at 1 (presumably counting the initial
   thread -- confirm). *)
and active = ref 1
(* Number of threads currently sleeping inside do_pool. *)
and in_pool = ref 0
(* Counter updated together with every push/pop on the pool_kont list. *)
and pool_konts = ref 0
(* Number of threads devoted to join *)
(*DEBUG*)and nthreads = ref 1
(*DEBUG*)(* Number of threads currently blocked in a Condition.wait. *)
(*DEBUG*)and suspended = ref 0
(*DEBUG*)(* Lock used by incr_locked / decr_locked. *)
(*DEBUG*)and nthreads_mutex = Mutex.create()
(* [incr_locked r] adds one to the counter [r] while holding
   [nthreads_mutex]. *)
let incr_locked r =
  Mutex.lock nthreads_mutex ;
  r := succ !r ;
  Mutex.unlock nthreads_mutex

(* [decr_locked r] subtracts one from the counter [r] while holding
   [nthreads_mutex]. *)
and decr_locked r =
  Mutex.lock nthreads_mutex ;
  r := pred !r ;
  Mutex.unlock nthreads_mutex
(* Signal [active_condition] when the number of active tasks is no longer
   positive. *)
let check_active () =
(*DEBUG*)debug2 "CHECK"
(*DEBUG*)  (sprintf "active=%i, nthreads=%i, suspended=%i[%i,%i]"
(*DEBUG*)     !active !nthreads !suspended !in_pool !pool_konts) ;
  if !active > 0 then () else Condition.signal active_condition
(* Record that the calling task is no longer active, then signal
   [active_condition] if no active task remains. *)
let become_inactive () =
  Mutex.lock active_mutex ;
  active := pred !active ;
  Mutex.unlock active_mutex ;
  (* if active reaches 0, this cannot change, so we unlock now *)
  check_active ()

(* Record one more active task.
   incr_active is performed by task creator or awaker *)
and incr_active () =
  Mutex.lock active_mutex ;
  active := succ !active ;
  Mutex.unlock active_mutex
(*************************************)
(* Real threads creation/destruction *)
(*************************************)
(* Bound to the C primitive "caml_thread_new": takes the closure to run and
   returns the new thread.  Used by really_create_process. *)
external thread_new : (unit -> unit) -> Thread.t = "caml_thread_new"
(* Bound to the C primitive "caml_thread_uncaught_exception"; called by the
   wrapper built in create_process when the task body raises.
   Presumably reports the exception -- confirm against the runtime. *)
external thread_uncaught_exception : exn -> unit = "caml_thread_uncaught_exception"
(* Threshold on sleeping pool threads, compared against [in_pool] by
   exit_thread.  Read from the POOLSIZE environment variable; 10 when the
   variable is unset or unreadable. *)
let pool_size =
  let from_env () = int_of_string (Sys.getenv "POOLSIZE") in
  try from_env () with _ -> 10

(* Optional limit read from the RUNMAX environment variable; None when the
   variable is unset or unreadable. *)
and runmax =
  let from_env () = int_of_string (Sys.getenv "RUNMAX") in
  try Some (from_env ()) with _ -> None
(*DEBUG*)(* One-line summary of the task and thread counters, for debug traces. *)
(*DEBUG*)let tasks_status () =
(*DEBUG*)  Printf.sprintf "active=%i, nthread=%i suspended=%i[%i, %i]"
(*DEBUG*)    !active !nthreads !suspended !in_pool !pool_konts
(* Terminate the calling thread for good: decrement the [nthreads] counter,
   then call Thread.exit. *)
let really_exit_thread () =
decr_locked nthreads ;
(*DEBUG*)debug1 "REAL EXIT" (sprintf "nthreads=%i" !nthreads);
Thread.exit ()
(* Raised (and caught) inside really_create_process when the RUNMAX limit
   is exceeded. *)
exception MaxRun

(* [really_create_process f] tries to start a real thread running [f].
   The thread counter is incremented first; if the RUNMAX limit is exceeded
   or anything raises while the thread is created, the counter is restored
   and [false] is returned.  Returns [true] on success.
   Note: really_create_process uses thread_new, to short-circuit handling
   of exceptions by Thread *)
let really_create_process f =
  incr_locked nthreads ;
  try
    let over_limit =
      match runmax with
      | None -> false
      | Some k -> !nthreads - !suspended > k in
    if over_limit then raise MaxRun ;
    let t = Thread.id (thread_new f) in
(*DEBUG*)debug1 "REAL FORK"
(*DEBUG*)  (sprintf "%i nthread=%i suspended=%i[%i]"
(*DEBUG*)     t !nthreads !suspended !in_pool) ;
    ignore t ;
    true
  with failure ->
(*DEBUG*)debug2 "REAL FORK FAILED"
(*DEBUG*)  (sprintf "%s, %s" (tasks_status ()) (Printexc.to_string failure)) ;
    decr_locked nthreads ;
    false
(****************)
(* Thread cache *)
(****************)
(* Condition on which pooled threads sleep (see do_pool); signalled by
   put_pool and grab_from_pool. *)
let pool_condition = Condition.create ()
(* Lock held while [pool_kont] is read or updated. *)
and pool_mutex = Mutex.create ()
(* Pending task bodies, most recently added first. *)
and pool_kont = ref []
(* Sleep in the thread pool until a pending task body is available, then
   run it.  Called by pool_enter with [pool_mutex] locked; Condition.wait
   releases that mutex while sleeping and holds it again on return. *)
let rec do_pool () =
incr in_pool ;
(*DEBUG*)incr_locked suspended ;
(*DEBUG*)debug2 "POOL SLEEP" (tasks_status ()) ;
Condition.wait pool_condition pool_mutex ;
(*DEBUG*)decr_locked suspended ;
(*DEBUG*)debug2 "POOL AWAKE" (tasks_status ()) ;
decr in_pool ;
match !pool_kont with
| f::rem ->
(* take the first pending body, release the pool lock, then run it *)
pool_kont := rem ; decr pool_konts ;
(*DEBUG*)debug2 "POOL RUN" (sprintf "%i" (Thread.id (Thread.self()))) ;
Mutex.unlock pool_mutex ;
f ()
| [] ->
(* woken up with nothing to run: go back to sleep, lock still held *)
do_pool ()
(* Enter the thread pool.
   Get a chance to avoid suspending: when a task body is already pending it
   is run at once; otherwise the thread goes to sleep in do_pool (with
   [pool_mutex] still locked, as do_pool's Condition.wait requires). *)
let pool_enter () =
  Mutex.lock pool_mutex ;
  match !pool_kont with
  | [] -> do_pool ()
  | next :: others ->
      pool_kont := others ;
      decr pool_konts ;
      Mutex.unlock pool_mutex ;
(*DEBUG*)debug2 "POOL FIRST RUN" (sprintf "%i" (Thread.id (Thread.self()))) ;
      next ()
(* Make sure some thread picks up a pending task body.
   If a thread is sleeping in the pool it is signalled.  Otherwise the first
   pending body, if any, is started on a new real thread; when that fails
   the body is put back, a message is printed, and the attempt is repeated
   after sleeping [delay] seconds, with the delay increased by one second
   each time. *)
let rec grab_from_pool delay =
Mutex.lock pool_mutex ;
if !in_pool > 0 then begin
Condition.signal pool_condition ;
Mutex.unlock pool_mutex
end else match !pool_kont with
| f::rem ->
pool_kont := rem ; decr pool_konts ;
(* the pool lock is released before trying to create the thread *)
Mutex.unlock pool_mutex ;
if not (really_create_process f) then begin
Mutex.lock pool_mutex ;
pool_kont := f :: !pool_kont ; incr pool_konts ;
Mutex.unlock pool_mutex ;
prerr_endline "Threads exhausted" ;
Thread.delay delay ;
grab_from_pool (1.0 +. delay)
end
| [] ->
Mutex.unlock pool_mutex
(* End of a task: mark it inactive, then either terminate the thread (when
   at least [pool_size] threads already sleep in the pool and there are
   more active tasks than pending bodies) or recycle it through the pool. *)
let exit_thread () =
(*DEBUG*)debug2 "EXIT THREAD" (tasks_status ()) ;
  become_inactive () ;
  let pool_is_full = !in_pool >= pool_size in
  if pool_is_full && !active > !pool_konts then really_exit_thread ()
  else pool_enter ()
(* [put_pool f] adds the task body [f] to the pending list and signals
   [pool_condition], all under [pool_mutex]. *)
let put_pool f =
  Mutex.lock pool_mutex ;
  let queued = f :: !pool_kont in
  pool_kont := queued ;
  incr pool_konts ;
  Condition.signal pool_condition ;
(*DEBUG*)debug2 "PUT POOL" (tasks_status ()) ;
  Mutex.unlock pool_mutex
(* [create_process f] starts [f] as a new task.  The task is counted as
   active at once.  When no thread sleeps in the pool a real thread is
   tried first; in every other case (or when thread creation fails) the
   task body is handed to the pool. *)
let create_process f =
(*DEBUG*)debug2 "CREATE_PROCESS" (tasks_status ()) ;
  incr_active () ;
  (* Wrapper around f, to be sure to call my exit_thread *)
  let wrapped () =
    (try f ()
     with e ->
       flush stdout ;
       flush stderr ;
       thread_uncaught_exception e) ;
    exit_thread () in
  if !in_pool <> 0 || not (really_create_process wrapped) then
    put_pool wrapped
(* Pending messages of one channel, stored untyped; put_queue conses at the
   head and get_queue removes from the head. *)
type queue = Obj.t list
(* Operations on the status of an automaton: one bit per channel.
   The parameter 'a is the type of the masks accepted by [includes]. *)
(* set idx sets status bit idx, it answers true if
that bit status changes (ie it was unset) *)
type 'a status =
{ set : int -> bool ;
(* erase idx clears status bit idx *)
erase : int -> unit ;
(* includes mask is true when every bit of mask is set in the status *)
includes : 'a -> bool ;
(* hexadecimal rendering of the status, used in debug traces *)
to_string : unit -> string ; }
(* A join automaton. *)
type 'a automaton = {
(* which channels currently hold at least one message *)
status : 'a status ;
(* lock taken by the send functions before queues and status are touched *)
mutex : Mutex.t ;
(* one message queue per channel, indexed by channel number *)
queues : queue array ;
(* reaction table, installed by patch_table *)
mutable matches : ('a reaction) array ;
(* channel names, read by get_name for debug traces *)
names : Obj.t ;
}
(* A reaction is (pattern mask, integer tag, guarded code).  The tag is
   tested against 0 and against the sending channel index by attempt_match
   and attempt_match_sync; it is named iprim/ipri there, presumably the
   principal channel -- confirm against the compiler. *)
and 'a reaction = 'a * int * (Obj.t -> Obj.t)
(* [put_queue auto idx a] adds message [a] at the head of the queue of
   channel [idx]. *)
let put_queue auto idx a =
  let pending = auto.queues.(idx) in
  auto.queues.(idx) <- a :: pending
(* [get_queue auto idx] removes and returns the head message of channel
   [idx]; the channel's status bit is erased when the queue becomes empty.
   The queue must not be empty (assertion failure otherwise). *)
let get_queue auto idx =
  match auto.queues.(idx) with
  | [] -> assert false
  | [last] ->
      auto.queues.(idx) <- [] ;
      auto.status.erase idx ;
      last
  | first :: others ->
      auto.queues.(idx) <- others ;
      first
(* Status operations backed by a single integer used as a bit set; masks
   given to [includes] are integers as well. *)
let int_ops () =
  let bits = ref 0 in
  let set i =
    let before = !bits in
    bits := before lor (1 lsl i) ;
    !bits <> before
  and erase i = bits := !bits land lnot (1 lsl i)
  and includes mask = !bits land mask = mask
  and to_string () = sprintf "%08x" !bits in
  { set = set ; erase = erase ; includes = includes ; to_string = to_string }
(* Bit-vector addressing with 31 bits per slot:
   [major i] is the slot holding bit [i]. *)
let major i = i / 31

(* [minor i] is the position of bit [i] inside its slot. *)
let minor i = i mod 31
(* Status operations backed by a bit vector: an int array where each slot
   carries 31 status bits.  [nchans] is the number of channels; masks given
   to [includes] are int arrays indexed by slot. *)
let bv_ops nchans =
  (* number of 31-bit slots needed for nchans bits, rounded up *)
  let nslots = (nchans + 30) / 31 in (* eh oui *)
  let me = Array.make nslots 0 in
  (* set bit i; answers true when the bit was previously unset *)
  let set i =
    let slot = major i and idx = minor i in
    let old_me = me.(slot) in
    let new_me = old_me lor (1 lsl idx) in
    me.(slot) <- new_me ;
    old_me <> new_me in
  let erase i =
    let slot = major i and idx = minor i in
    me.(slot) <- me.(slot) land (lnot (1 lsl idx)) in
  (* every slot of the status must contain the matching slot of the mask *)
  let rec do_includes mask slot =
    if slot >= nslots then true
    else
      let m = mask.(slot) in
      me.(slot) land m = m && do_includes mask (slot+1) in
  let includes mask = do_includes mask 0 in
  let rec do_to_string slot =
    if slot >= nslots then []
    else
      (* zero-padded 8-digit hexadecimal per slot, as in int_ops *)
      Printf.sprintf "%08x" me.(slot) ::
      do_to_string (slot+1) in
  let to_string () = String.concat "" (do_to_string 0) in
  {
    set = set ;
    erase = erase ;
    includes = includes ;
    to_string = to_string ;
  }
(* Build a fresh, empty status for an automaton with [nchans] channels.
   The result is cast with Obj.magic because the two implementations take
   different mask types (int for int_ops, int array for bv_ops).
   NOTE(review): the test reads as inverted -- int_ops keeps all bits in one
   integer while bv_ops spreads them over 31-bit slots, so one would expect
   int_ops for nchans <= 31.  The mask representation handed to [includes]
   is chosen outside this file, so confirm before changing the condition. *)
let empty_status nchans =
  let ops =
    if nchans > 31 then Obj.repr (int_ops ())
    else Obj.repr (bv_ops nchans) in
  Obj.magic ops
(* [create_automaton_debug nchans names] builds an automaton with [nchans]
   channels: empty status, fresh mutex, one empty queue per channel and an
   empty reaction table (to be installed later with patch_table).
   [names] is stored as is and read by get_name. *)
let create_automaton_debug nchans names =
  let status = empty_status nchans in
  {
    status = status ;
    mutex = Mutex.create () ;
    (* Array.make replaces the deprecated Array.create alias *)
    queues = Array.make nchans [] ;
    matches = [| |] ;
    names = names ;
  }
(* Same as create_automaton_debug, but without channel names: the names
   field receives the immediate value 0. *)
let create_automaton nchans =
  let no_names = Obj.magic 0 in
  create_automaton_debug nchans no_names
(* [get_name auto idx] reads field [idx] of the automaton's names block and
   returns it unchecked (callers in this file print it with %s).
   NOTE(review): automata built by create_automaton store the immediate 0 as
   names, on which Obj.field is not meaningful -- presumably this is only
   reached for automata built by create_automaton_debug; confirm. *)
let get_name auto idx =
  let entry = Obj.field auto.names idx in
  Obj.magic entry
(* [patch_table auto t] installs [t] as the reaction table of [auto]. *)
let patch_table auto table =
  auto.matches <- table
(* State of a continuation:
   Start  - initial state set by kont_create;
   Go f   - the suspended thread must resume by running f;
   Ret v  - a reply v has been delivered by reply_to. *)
type kval = Start | Go of (unit -> Obj.t) | Ret of Obj.t
(* Rendez-vous point for a thread suspended on a synchronous send. *)
type continuation =
(* lock paired with kcondition in every Condition.wait on it *)
{ kmutex : Mutex.t ;
(* condition the suspended thread waits on *)
kcondition : Condition.t ;
(* set before kcondition is signalled, read by the thread once awake *)
mutable kval : kval }
(* [kont_create auto] builds a continuation in the Start state with a fresh
   condition variable.
   Continuation mutex is automaton mutex *)
let kont_create auto =
  let cond = Condition.create () in
  { kval = Start ; kcondition = cond ; kmutex = auto.mutex }
(**********************)
(* Asynchronous sends *)
(**********************)
(* An asynchronous channel.
   Async (auto, idx): channel number idx of automaton auto.
   Alone (auto, g): the send is dispatched straight to entry g of the
   automaton's reaction table (see direct_send_async_alone). *)
type 'a async =
Async of ('a automaton) * int
| Alone of ('a automaton) * int
(* Channel number [i] of automaton [auto]. *)
let create_async auto i = Async (auto, i)

(* Channel bound directly to entry [g] of the reaction table of [auto]. *)
let create_async_alone auto g = Alone (auto, g)
(* Callbacks from compiled code *)
(* Transfer control to frozen principal thread:
   the thread suspended on [k] is counted as active again, told to run [f]
   (kval <- Go f), signalled, and the mutex of [k] is released.
   The caller is expected to hold k.kmutex -- it is only unlocked here. *)
let kont_go k f =
incr_active () ;
(*DEBUG*)debug2 "KONT_GO" "" ;
k.kval <- Go f ;
Condition.signal k.kcondition ;
Mutex.unlock k.kmutex
(* Spawn new process: release the automaton mutex, then start [f] as a new
   task with create_process. *)
let fire_go auto f =
(*DEBUG*)debug3 "FIRE_GO" "" ;
Mutex.unlock auto.mutex ;
create_process f
(* Transfer control to current thread
can be called when send triggers a match in the async case
in thread-tail position *)
(* The automaton mutex is released, then [f] runs in the calling thread. *)
let just_go_async auto f =
(*DEBUG*)debug3 "JUST_GO_ASYNC" "" ;
Mutex.unlock auto.mutex ;
f ()
(* Scan the reaction table [reactions] from entry [i] for the first reaction
   whose pattern is included in the current status, and run its guarded
   code.  Called with auto.mutex locked (see direct_send_async); the mutex
   is released here when nothing matches.
   [tail] selects how a reaction with a negative tag is started: in the
   calling thread (just_go_async) or as a new task (fire_go).
   [idx] is the channel just sent on; it is only used in the debug trace. *)
let rec attempt_match tail auto reactions idx i =
if i >= Obj.size reactions then begin
(*DEBUG*)debug3 "ATTEMPT FAILED" (sprintf "%s %s"
(*DEBUG*) (get_name auto idx) (auto.status.to_string ())) ;
Mutex.unlock auto.mutex
end else begin
let (ipat, iprim, f) = Obj.magic (Obj.field reactions i) in
if auto.status.includes ipat then
if iprim < 0 then begin
f (if tail then just_go_async else fire_go) (* f will unlock auto's mutex *)
end else begin
(* non-negative tag: the guarded code is given kont_go, which wakes a
   suspended thread *)
f kont_go
end
else
attempt_match tail auto reactions idx (i+1)
end
(* [direct_send_async auto idx a] sends message [a] on channel [idx].
   The message is queued under the automaton mutex.  If the channel already
   held messages the mutex is released and nothing else happens; otherwise
   a match is attempted (non-tail mode), which takes care of the mutex. *)
let direct_send_async auto idx a =
(*DEBUG*)debug3 "SEND_ASYNC" (sprintf "channel=%s, status=%s"
(*DEBUG*)  (get_name auto idx) (auto.status.to_string ())) ;
  (* Acknowledge new message by altering queue and status *)
  Mutex.lock auto.mutex ;
  put_queue auto idx a ;
  if auto.status.set idx then
    attempt_match false auto (Obj.magic auto.matches) idx 0
  else begin
(*DEBUG*)debug3 "SEND_ASYNC" (sprintf "Return: %s"
(*DEBUG*)  (auto.status.to_string ())) ;
    Mutex.unlock auto.mutex
  end

(* Optimize forwarders: the code of reaction [g] is applied to [a] in a new
   task, without going through queue or status. *)
let direct_send_async_alone auto g a =
(*DEBUG*) debug3 "SEND_ASYNC_ALONE" (sprintf "match %i" g) ;
  let reaction = Obj.field (Obj.magic auto.matches) g in
  let (_, _, f) = Obj.magic reaction in
  create_process (fun () -> f a)
(* [send_async chan a] sends [a] on the asynchronous channel [chan],
   dispatching on the kind of channel. *)
let send_async chan a =
  match chan with
  | Alone (auto, g) -> direct_send_async_alone auto g a
  | Async (auto, idx) -> direct_send_async auto idx a
(* Tail-position variant of direct_send_async: identical, except that the
   match is attempted in tail mode, so that a matching reaction may run in
   the calling thread. *)
let tail_direct_send_async auto idx a =
(*DEBUG*)debug3 "TAIL_ASYNC" (sprintf "channel %s, status=%s"
(*DEBUG*)  (get_name auto idx) (auto.status.to_string ())) ;
  (* Acknowledge new message by altering queue and status *)
  Mutex.lock auto.mutex ;
  put_queue auto idx a ;
  if auto.status.set idx then
    attempt_match true auto (Obj.magic auto.matches) idx 0
  else begin
(*DEBUG*)debug3 "TAIL_ASYNC" (sprintf "Return: %s"
(*DEBUG*)  (auto.status.to_string ())) ;
    Mutex.unlock auto.mutex
  end

(* Optimize forwarders: the code of reaction [g] is applied to [a] directly
   in the calling thread. *)
let tail_direct_send_async_alone auto g a =
(*DEBUG*) debug3 "TAIL_ASYNC_ALONE" (sprintf "match %i" g) ;
  let reaction = Obj.field (Obj.magic auto.matches) g in
  let (_, _, f) = Obj.magic reaction in
  f a
(* [tail_send_async chan a] is send_async for sends in tail position,
   dispatching on the kind of channel. *)
let tail_send_async chan a =
  match chan with
  | Alone (auto, g) -> tail_direct_send_async_alone auto g a
  | Async (auto, idx) -> tail_direct_send_async auto idx a
(*********************)
(* Synchronous sends *)
(*********************)
(* No match was found *)
(* Suspend the calling thread on continuation [k] until it is either told
   to run some code (Go f, whose result is returned) or given a reply
   (Ret v, returned as is).  Called with k.kmutex locked (send_sync locks
   the automaton mutex, which is also the continuation mutex); the mutex is
   released once the thread is awake again.
   NOTE(review): Condition.wait is called once, without re-checking kval in
   a loop; a wake-up that leaves kval at Start ends in assert false. *)
let kont_suspend k =
(*DEBUG*)debug3 "KONT_SUSPEND" (tasks_status ()) ;
(*DEBUG*)incr_locked suspended ;
become_inactive () ;
(* every remaining active task is a pending pool body: make sure one runs *)
if !active = !pool_konts then grab_from_pool 0.1 ;
Condition.wait k.kcondition k.kmutex ;
(*DEBUG*)decr_locked suspended ;
Mutex.unlock k.kmutex ;
match k.kval with
| Go f ->
(*DEBUG*)debug3 "REACTIVATED" (tasks_status ()) ;
f ()
| Ret v ->
(*DEBUG*)debug3 "REPLIED" (tasks_status ()) ;
v
| Start -> assert false
(* Suspend current thread when some match was found *)
(* The thread becomes inactive and waits on [k] for a reply, which is
   returned; only Ret is accepted as the wake-up state.  The mutex of [k]
   is passed to Condition.wait and released afterwards.
   NOTE(review): as in kont_suspend, the wait is not re-checked in a loop. *)
let suspend_for_reply k =
(*DEBUG*)incr_locked suspended ;
become_inactive () ;
Condition.wait k.kcondition k.kmutex ;
(*DEBUG*)decr_locked suspended ;
Mutex.unlock k.kmutex ;
match k.kval with
| Ret v ->
(*DEBUG*)debug3 "REPLIED" (tasks_status ()) ;
v
| Start|Go _ -> assert false
(* Transfer control to frozen principal thread and suspend current thread *)
(* [kpri] is the continuation of the thread to wake (it will run [f]);
   [kme] is the continuation on which the calling thread then waits for
   its reply. *)
let kont_go_suspend kme kpri f =
(*DEBUG*)debug2 "KONT_GO_SUSPEND" "" ;
(* awake principal *)
incr_active () ;
kpri.kval <- Go f ;
Condition.signal kpri.kcondition ;
suspend_for_reply kme
(* Release the mutex of continuation [k], then run [f] in the calling
   thread and return its result. *)
let just_go k f =
(*DEBUG*)debug3 "JUST_GO" "" ;
Mutex.unlock k.kmutex ;
f ()
(* Fire process and suspend : no principal name *)
(* [f] is started as a new task, then the calling thread waits on [k] for
   its reply.  The second argument is ignored. *)
let fire_suspend k _ f =
(*DEBUG*) debug2 "FIRE_SUSPEND" "" ;
create_process f ;
suspend_for_reply k
(* Synchronous counterpart of attempt_match: scan [reactions] from entry [i]
   for the first reaction whose pattern is included in the current status.
   [idx] is the channel just sent on and [kont] the sender's continuation.
   When nothing matches the sender suspends on [kont].  Otherwise the
   reaction's tag decides who runs the guarded process: a new task
   (negative tag), the sender itself (tag equal to idx), or another
   suspended thread (any other tag). *)
let rec attempt_match_sync idx kont auto reactions i =
if i >= Obj.size reactions then begin
(*DEBUG*)debug3 "SYNC ATTEMPT FAILED" (sprintf "%s %s"
(*DEBUG*) (get_name auto idx) (auto.status.to_string ())) ;
kont_suspend kont
end else begin
let (ipat, ipri, f) = Obj.magic (Obj.field reactions i) in
if auto.status.includes ipat then begin
if ipri < 0 then
f (fire_suspend kont) (* will create other thread *)
else if ipri = idx then begin
f just_go (* will continue evaluation *)
end else begin
f (kont_go_suspend kont) (* will awake principal thread *)
end
end else attempt_match_sync idx kont auto reactions (i+1)
end
(* [send_sync auto idx a] sends [a] on the synchronous channel [idx] and
   returns the reply.  A fresh continuation is queued together with the
   message.  If the channel already held messages the sender suspends at
   once; otherwise a match is attempted first. *)
let send_sync auto idx a =
(*DEBUG*) debug3 "SEND_SYNC" (sprintf "channel %s" (get_name auto idx)) ;
  (* Acknowledge new message by altering queue and status *)
  Mutex.lock auto.mutex ;
  let kont = kont_create auto in
  put_queue auto idx (Obj.magic (kont, a)) ;
  if auto.status.set idx then
    attempt_match_sync idx kont auto (Obj.magic auto.matches) 0
  else begin
(*DEBUG*)debug3 "SEND_SYNC" (sprintf "Return: %s"
(*DEBUG*)  (auto.status.to_string ())) ;
    kont_suspend kont
  end

(* Optimize forwarders: reaction [g] is used directly.  With a non-negative
   tag its code is applied to [a] in the calling thread; otherwise it runs
   as a new task on (continuation, a) while the caller waits for the
   reply. *)
let send_sync_alone auto g a =
(*DEBUG*) debug3 "SEND_SYNC_ALONE" (sprintf "match %i" g) ;
  let _,ipri,f = Obj.magic (Obj.field (Obj.magic auto.matches) g) in
  if ipri < 0 then begin
(*DEBUG*) debug3 "SEND_SYNC_ALONE" "fire" ;
    (* suspend_for_reply needs the continuation mutex to be held *)
    Mutex.lock auto.mutex ;
    let k = kont_create auto in
    fire_suspend k auto (fun () -> f (k,a))
  end else begin
(*DEBUG*) debug3 "SEND_SYNC_ALONE" "direct" ;
    f a
  end
(* [reply_to v k] delivers the reply [v] to the thread suspended on [k]:
   under the mutex of [k], kval becomes Ret v and the condition is
   signalled.  The debug trace prints [v] as if it were an integer. *)
let reply_to v k =
(*DEBUG*) debug3 "REPLY" (sprintf "%i" (Obj.magic v)) ;
Mutex.lock k.kmutex ;
k.kval <- Ret v ;
Condition.signal k.kcondition ;
Mutex.unlock k.kmutex
(********************************)
(* Management of initial thread *)
(********************************)
(* Called when all active tasks are waiting in thread pool.
   If some thread already sleeps in the pool nothing is done; otherwise a
   new real thread is created whose body is exit_thread.  A message is
   printed when that thread cannot be created. *)
let from_pool () =
  if !in_pool <= 0 then begin
    (* Create a new thread to enter pool *)
(*DEBUG*)debug1 "HOOK" "CREATE" ;
    incr_active () ;
    let created = really_create_process exit_thread in
(*DEBUG*)debug1 "HOOK" (if created then "PROCESS CREATED" else "FAILED");
    if created then ()
    else prerr_endline "Threads are exhausted, good bye !"
  end else begin
(*DEBUG*)debug1 "HOOK" "SHOULD PERPHAPS SIGNAL" ;
    (* Condition.signal pool_condition *) ()
  end
(* Exit hook, registered with at_exit: the exiting thread stops counting as
   an active task and, as long as other tasks are active, blocks until
   [active_condition] reports that none is left.

   [active_mutex] is held whenever [active] is tested and whenever
   Condition.wait is called.  It is released around from_pool (which takes
   the same mutex through incr_active) and taken again before waiting.
   The wait is a loop on [!active > 0], so a signal sent while the mutex
   was released is not lost.  The mutex is released before returning. *)
let exit_hook () =
(*DEBUG*)debug1 "HOOK" "enter" ;
(*DEBUG*)decr_locked nthreads ;
  Mutex.lock active_mutex ;
  decr active ;
  if !active > 0 then begin
    if !pool_konts = !active then begin
      (* every remaining task is a pending pool body: get a thread for it *)
      Mutex.unlock active_mutex ;
      from_pool () ;
      Mutex.lock active_mutex
    end ;
(*DEBUG*)debug1 "HOOK" "suspend" ;
    while !active > 0 do
      Condition.wait active_condition active_mutex
    done
  end ;
  Mutex.unlock active_mutex ;
(*DEBUG*)debug1 "HOOK" "over" ;
  ()
(* Register exit_hook to run at program exit. *)
let () = at_exit exit_hook