Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 596 lines (484 sloc) 17.704 kb
fccc685 Initial open-source release
MLstate authored
1 (*
2 Copyright © 2011 MLstate
3
6fac5ce @Aqua-Ye [cleanup] ocamllib: typo on Opa
Aqua-Ye authored
4 This file is part of Opa.
fccc685 Initial open-source release
MLstate authored
5
6fac5ce @Aqua-Ye [cleanup] ocamllib: typo on Opa
Aqua-Ye authored
6 Opa is free software: you can redistribute it and/or modify it under the
fccc685 Initial open-source release
MLstate authored
7 terms of the GNU Affero General Public License, version 3, as published by
8 the Free Software Foundation.
9
6fac5ce @Aqua-Ye [cleanup] ocamllib: typo on Opa
Aqua-Ye authored
10 Opa is distributed in the hope that it will be useful, but WITHOUT ANY
fccc685 Initial open-source release
MLstate authored
11 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
13 more details.
14
15 You should have received a copy of the GNU Affero General Public License
6fac5ce @Aqua-Ye [cleanup] ocamllib: typo on Opa
Aqua-Ye authored
16 along with Opa. If not, see <http://www.gnu.org/licenses/>.
fccc685 Initial open-source release
MLstate authored
17 *)
18
19 #<Debugvar:RESOURCE_TRACKER_DEBUG>
20
(* Abstract placeholder types. They instantiate the state, message and
   result parameters of the "erased" resource types [t] and [b] declared
   further down; values are moved in and out of them with the "%identity"
   casts of this file. *)
21 type ste_private
22 type msg_private
23 type res_private
24
(* Identifier of a resource inside a manager. The code of this file uses
   [-1] for a resource that is not (or no longer) registered. *)
25 type key = int
26
(* Reason passed to callbacks and [on_delete] handlers when a resource is
   removed. *)
27 type signal =
28 [ `Expired
29 | `Killed
30 | `Closed
31 | `Collected]
32
(* A function given either in direct style ([Sync]) or in
   continuation-passing style ([ASync]), where the result is handed to the
   continuation received as second argument. *)
33 type ('args,'res) sync =
34 | Sync of ('args -> 'res)
35 | ASync of ('args -> ('res -> unit) -> unit)
36
(* [proj_sync f1 f2 s] wraps [s] so that its argument first goes through
   [f1] and its result through [f2]. The direct / continuation-passing
   flavour of [s] is preserved. *)
let proj_sync f1 f2 s =
  match s with
  | Sync g -> Sync (fun arg -> f2 (g (f1 arg)))
  | ASync g -> ASync (fun arg k -> g (f1 arg) (fun res -> k (f2 res)))
40
(* Callback cell stored in the [cbs] / [cbs_ref] lists of a resource.
   - [CbNormal (id, f)]: [f] is called with the resource currently bound to
     [!id] in the manager and the signal;
   - [CbLight f]: [f] is called with the signal only;
   - [Nop]: cancelled callback, ignored. *)
41 type cbs =
42 | CbNormal of (key ref * (b * signal ,unit) sync)
43 | CbLight of (signal,unit) sync
44 | Nop
45
(* Handler of a resource: its name (used in debug logs) and the functions
   that react to deletion, to messages, to expiration checks and to the
   removal of a resource it depends on. *)
46 and ('m,'r,'s) h = {
47 name:string;
48 on_delete: ('s*signal,unit) sync;
49 on_message: (('m,'r)t * 's * 'm, 's * 'r) sync;
50 expire: ('s,signal option) sync;
51 decide: (('m,'r)t * 's * signal, unit) sync;
52 }
53
(* Concrete representation of a resource.
   - [id]: key in the manager, [-1] once killed;
   - [rh]: handler;
   - [cbs]: callbacks run when this resource is killed;
   - [cbs_ref]: callbacks this resource installed on the resources it
     depends on; they are reset to [Nop] when this resource is killed;
   - [gp]: group computed from the dependencies at creation;
   - [state]: current handler state;
   - [msgs]: pending (message, continuation, optional error continuation)
     triples queued by the continuation-passing call;
   - [rsize]: incremented each time a callback is registered;
   - [active]: set back to [true] when the message queue has been drained. *)
54 and ('m,'r,'s) t_private = {
55 id : key ref;
56 rh : ('m,'r,'s) h;
57 mutable cbs : cbs ref list;
58 mutable cbs_ref : cbs ref list;
59 mutable gp : g;
60 mutable state : 's;
61 msgs : ('m*('r->unit)*((exn -> unit) option)) Queue.t;
62 mutable rsize : int;
63 mutable active: bool;
64 }
65
(* Resource whose state type is hidden. *)
66 and ('m,'r)t = ('m,'r,ste_private) t_private
67
(* Resource whose message, result and state types are all hidden ("black"
   resource); this is what the manager stores. *)
68 and b = (msg_private,res_private,ste_private) t_private
69
(* Contents of a group: identifiers of its members and their count. *)
70 and g' = {
71 mutable mem: key ref list;
72 mutable size: int;
73 }
74
(* A group is a mutable reference to its contents. *)
75 and g = g' ref
76
(* Raised when an operation is attempted on a resource that has already
   been killed. *)
77 exception Not_Alive
(* Raised by the direct-style entry points when a continuation-passing
   handler has not produced its result by the time it returns. *)
78 exception Async_Call
79
(* Unchecked casts between the typed and erased views of a resource. They
   are all "%identity": no conversion happens at run time, so the caller is
   responsible for using the value at a compatible type. *)
80 external black : (_,_) t -> b = "%identity"
81 external grey : ('a,'b,_) t_private -> ('a,'b) t= "%identity"
82 external unblack : b -> (_,_) t = "%identity"
83
(* Unchecked cast of an erased state back to an arbitrary type. *)
84 external unblack_state : ste_private -> _ = "%identity"
85
(* Registry of resources: associates integer keys, drawn from a counter,
   with erased resources stored in a [WeakResArray]. *)
module Manager : sig

  type m

  val make : unit -> m

  val default : m

  val add : m -> (_,_) t -> unit

  val get : m -> key -> b option

  val remove : m -> (_,_) t -> unit

  val fold : m -> ('a -> b -> 'a) -> 'a -> 'a

end
=
struct

  module W = WeakResArray
  module C = Counter

  type m = { table : b W.t; counter : C.t }

  (* Fresh manager; both the table and the key counter are created with
     1024 as their size argument. *)
  let make () = { table = W.create 1024; counter = C.make 1024 }

  (* Default Resource manager *)
  let default = make ()

  (* Takes a key from the counter, stores the erased resource under it and
     records the key in the resource itself. *)
  let add { table; counter } r =
    let fresh_key = C.get_key counter in
    W.set table fresh_key (Some (black r));
    r.id := fresh_key

  (* Resource stored under key [i], if any. *)
  let get { table; _ } i = W.get table i

  (* Drops the table entry of [r] and gives its key back to the counter.
     The [id] field of [r] is left untouched. *)
  let remove { table; counter } r =
    W.remove table !(r.id);
    C.release_key counter !(r.id)

  (* Left fold over the resources held in the table. *)
  let fold { table; _ } f init = W.fold_left f init table

end
135
(* Signature of a resource tracker instance, as returned (packed as a
   first-class module) by [make]. *)
136 module type M = sig
137
138 (* module Group : sig *)
139
140 (* val make : unit -> g *)
141
142 (* val empty : g *)
143
144 (* val merge : g -> g -> g *)
145
146 (* val register : (_,_) t -> g -> unit *)
147
148 (* val iter : (b -> unit) -> g -> unit *)
149
150 (* end *)
151
(* Expiration policies for timer-based resources. An ['a t] inspects the
   bookkeeping record ['a state] and returns an optional signal (the
   resource must be removed) together with an optional [Time.t] used to
   schedule the next timeout. *)
152 module Expire : sig
153 type 'a t = 'a state -> signal option * Time.t option
154 and context =
155 [ `Date of Time.t
156 | `Timeout of Time.t
157 | `Limit of int
158 | `And of context list
159 | `Or of context list ]
160 and 'a state =
161 {
162 mutable limit: int;
163 mutable last_use: Time.t;
164 mutable cancel: Scheduler.async_key option;
165 mutable state: 'a;
166 }
(* [init x] wraps the user state [x] in a fresh bookkeeping record. *)
167 val init : 'a -> 'a state
(* [make c] builds the policy described by the context [c]. *)
168 val make : context -> 'a t
169 end
170
171 (* val handler' : *)
172 (* string -> *)
173 (* ( 's * signal , unit ) sync -> *)
174 (* ( 's , signal option ) sync -> *)
175 (* (('m,'r)t*'s*'m , 's*'r ) sync -> *)
176 (* (('m,'r)t*'s*signal , unit) sync -> *)
177 (* ('m,'r,'s) h *)
178
(* [handler name on_delete expire on_message decide] builds a handler from
   direct-style functions. *)
179 val handler :
180 string ->
181 ( 's-> signal -> unit ) ->
182 ( 's -> signal option ) ->
183 (('m,'r)t -> 's -> 'm -> 's*'r ) ->
184 (('m,'r)t -> 's -> signal -> unit) ->
185 ('m,'r,'s) h
186
(* Same as [handler], with every function in continuation-passing style. *)
187 val handler_cps :
188 string ->
189 ( 's -> signal -> (unit -> unit) -> unit ) ->
190 ( 's -> (signal option -> unit ) -> unit ) ->
191 (('m,'r)t -> 's -> 'm -> ( ('s*'r) -> unit ) -> unit ) ->
192 (('m,'r)t -> 's -> signal -> (unit -> unit) -> unit) ->
193 ('m,'r,'s) h
194
(* Direct-style handler whose state is wrapped in an [Expire.state] and
   whose expiration is driven by the given [Expire.t] policy. *)
195 val handler_timer :
196 string ->
197 ('s -> signal -> unit) ->
198 's Expire.t ->
199 (('m,'r) t -> 's -> 'm -> 's * 'r) ->
200 (('m, 'r) t -> 's -> signal -> unit) ->
201 ('m, 'r, 's Expire.state) h
202
(* [resource h state ?depends ()] creates a resource, registers it in the
   manager and links it to the resources listed in [depends]. *)
203 val resource : ('m,'r,'s) h -> 's -> ?depends: b list -> unit -> ('m,'r) t
204
(* Same as [resource] for a handler built by [handler_timer]; the initial
   state is wrapped with [Expire.init]. *)
205 val resource_timer : ('m,'r,'s Expire.state) h -> 's -> ?depends: b list -> unit -> ('m,'r)t
206
(* Runs the expiration check of the handler. Raises [Not_Alive] on a killed
   resource and [Async_Call] when a continuation-passing check has not
   answered by the time it returns. *)
207 val expire : (_,_)t -> signal option
208
(* Continuation-passing version of [expire]; on a killed resource
   [err_cont], if provided, receives [Not_Alive]. *)
209 val expire_cps : (_,_)t -> ?err_cont:(exn -> unit) -> (signal option -> unit) -> unit
210
(* Sends a message and returns the result. Raises [Not_Alive] on a killed
   resource and [Async_Call] when a continuation-passing handler has not
   answered by the time it returns. *)
211 val call : ('m,'r)t -> 'm -> 'r
212
(* Continuation-passing version of [call]; on a killed resource [err_cont],
   if provided, receives [Not_Alive]. *)
213 val call_cps : ('m,'r)t -> 'm -> ?err_cont:(exn -> unit) -> ('r -> unit) -> unit
214
(* Removes the resource with the given signal. Raises [Not_Alive] if it was
   already killed. *)
215 val kill : (_,_)t -> signal -> unit
216
(* [true] until the resource has been killed. *)
217 val alive : (_,_)t -> bool
218
(* Runs the expiration check of every resource of the manager and kills
   those that return a signal. *)
219 val collect : unit -> unit
220
(* Registers a callback run when the resource is killed; if the resource is
   already dead the callback is called at once with [`Closed]. *)
221 val register : (_,_)t -> (signal -> unit) -> unit
222
(* Same as [register], but the callback is run through the scheduler. *)
223 val register_cps : (_,_)t -> (signal -> unit) -> unit
224
225 end
226
(* Printable name of a signal, as used in the debug logs. *)
let string_of_signal signal =
  match signal with
  | `Collected -> "Collected"
  | `Closed -> "Closed"
  | `Killed -> "Killed"
  | `Expired -> "Expired"
232
233 let make sched manager =
234 let module Implem =
235 struct
(* Groups of resource identifiers. A group is a reference to a record
   holding the member ids and their count. *)
module Group =
struct

  (* Fresh group without any member. *)
  let make () = ref { mem = []; size = 0 }

  (* Distinguished group, recognised by physical equality in [merge]. *)
  let empty = make ()

  (* [merge c1 c2]: when one side is [empty], it is made to point at the
     contents of the other side and the other side is returned. Otherwise
     the members of [c2] are appended to the record of [c1], [c2] is made
     to share that record, and [c1] is returned. *)
  let merge c1 c2 =
    if c1 == empty then begin
      c1 := !c2;
      c2
    end else if c2 == empty then begin
      c2 := !c1;
      c1
    end else begin
      let left = !c1 in
      let mem = left.mem @ (!c2).mem in
      let size = left.size + (!c2).size in
      left.mem <- mem;
      left.size <- size;
      c2 := left;
      c1
    end

  (* Adds the identifier of [r] to the group [c]. *)
  let register r c =
    let group = !c in
    group.mem <- r.id :: group.mem;
    group.size <- group.size + 1

  (* Applies [f] to every member of [g] that still has a non-negative id
     and is found in the manager. *)
  let iter f g =
    let visit id =
      if !id < 0 then ()
      else
        match Manager.get manager !id with
        | None -> ()
        | Some r -> f r
    in
    List.iter visit (!g).mem

end
269
(* Expiration policies. A policy receives the bookkeeping record of a
   resource and returns an optional signal (the resource has expired)
   together with an optional [Time.t] that [handler_timer] passes to
   [Scheduler.sleep] to arm the next timeout. *)
270 module Expire = struct
271 type 'a t = 'a state -> signal option * Time.t option
272 and context =
273 [ `Date of Time.t
274 | `Timeout of Time.t
275 | `Limit of int
276 | `And of context list
277 | `Or of context list ]
278 and 'a state =
279 {
280 mutable limit: int;
281 mutable last_use: Time.t;
282 mutable cancel:Scheduler.async_key option;
283 mutable state: 'a;
284 }
285
(* Fresh record around the user state [x]: counter at 0, [last_use] set to
   the current time, no pending timeout key. *)
286 let init x =
287 {limit=0;last_use=Time.now ();cancel=None;state=x}
288
(* Compiles a context into a policy.
   - [`Date d]: not expired while [Time.difference (Time.now ()) d] is
     positive, and that value is returned as the delay (presumably the time
     remaining until [d]; depends on [Time.difference], not shown here).
   - [`Timeout t]: same test, on [t] compared with the time elapsed since
     [last_use].
   - [`Limit l]: expired as soon as the [limit] counter reaches [l]; never
     yields a delay.
   - [`And l]: the next sub-policy is consulted only when the previous one
     reports a signal; the first one that does not decides the result.
   - [`Or l]: the first sub-policy reporting a signal wins; otherwise the
     delays are combined with [max].
   An empty [`And] or [`Or] never expires. *)
289 let rec make (c:context) : ('a state -> signal option * Time.t option) = match c with
290 | `Date d -> (fun _ ->
291 let t = Time.difference (Time.now()) d in
292 if Time.is_positive t
293 then None,Some t
294 else (Some `Expired),None)
295 | `Timeout t -> (fun i ->
296 let dur = Time.difference i.last_use (Time.now ()) in
297 let d = Time.difference dur t in
298 if Time.is_positive d
299 then None,Some d
300 else (Some `Expired),None)
301 | `Limit l -> (fun i -> if i.limit >= l then (Some `Expired),None else None,None)
302 | `And [] -> (fun _ -> None,None)
303 | `And (x::[]) -> make x
304 | `And (x::xs) ->
305 let f1 = make x in
306 let f2 = make (`And xs) in
307 (fun i -> match f1 i with
308 | Some _,_ -> f2 i
309 | None,t -> None,t)
310 | `Or [] -> (fun _ -> None,None)
311 | `Or (x::[]) -> make x
312 | `Or (x::xs) ->
313 let f1 = make x in
314 let f2 = make (`Or xs) in
315 (fun i -> match f1 i with
316 | Some x,t -> Some x,t
317 | None,None -> f2 i
(* NOTE(review): for an "or", waking up at the earliest delay ([min]) looks
   more natural than [max] - confirm the intent before changing it. *)
318 | None,Some t1 -> match f2 i with
319 | a,Some t2 -> a,Some(max t1 t2)
320 | a,None -> a,Some t1)
321 end
322
(* A resource is alive as long as its id differs from the [-1] marker that
   [kill] writes into it. *)
let alive r = not (!(r.id) = -1)
324
325
326
(* [kill r s] removes the resource [r] with signal [s].
   Steps, in order: unregister [r] from the manager and mark it dead; give
   [Not_Alive] to the error continuation of every queued message and empty
   the queue; neutralise the callbacks [r] had installed on other
   resources; run the callbacks installed on [r]; finally run the
   [on_delete] handler with the current state.
   Continuation-passing callbacks are pushed on the scheduler instead of
   being run immediately.
   Raises [Not_Alive] if [r] has already been killed. *)
327 let kill r s =
328 if alive r
329 then
330 begin
331 #<If> Logger.debug "[ResourceTracker] Remove '%s %d' with signal '%s'" r.rh.name !(r.id) (string_of_signal s) #<End>;
332 (* prevent from calling kill function twice *)
333 Manager.remove manager r;
334 r.id:=-1;
(* pending asynchronous calls will never be served: notify their error
   continuation, when there is one *)
335 Queue.iter (fun (_,_,k) -> Option.iter (fun f -> f Not_Alive) k) r.msgs;
336 Queue.clear r.msgs;
337 (* first cancel all registered callbacks *)
338 let _ = List.iter (fun cb -> cb:=Nop) r.cbs_ref in
339 (* then inform the connected resources *)
340 let _ = List.iter (fun r -> match !r with
341 | Nop -> ()
342 | CbLight f ->
343 begin
344 match f with
345 | Sync f -> f s
346 | ASync f -> Scheduler.push sched (fun () -> f s (fun _ -> ()))
347 end
348 | CbNormal(id,f) ->
(* a negative id means the target resource is itself dead; otherwise it is
   looked up in the manager and skipped when absent *)
349 if !id >= 0
350 then
351 let r = Manager.get manager !id in
352 begin
353 match r,f with
354 | None,_ -> ()
355 | Some r,Sync f -> f (r,s)
356 | Some r,ASync f ->
357 Scheduler.push sched (fun () -> f (r,s) (fun _->()) )
358 end
359 ) r.cbs in
360 (* kill the current resource *)
361 match r.rh.on_delete with
362 | Sync f -> f (r.state,s)
363 | ASync f -> Scheduler.push sched (fun () -> f (r.state,s) (fun _ -> ()))
364 end
365 else
366 raise Not_Alive
367
368
(* [call r m] sends the message [m] to [r], stores the new state returned
   by the handler and returns its result.
   Raises [Not_Alive] if [r] has been killed, and [Async_Call] if the
   handler is in continuation-passing style and has not called its
   continuation by the time it returns. *)
let call r m =
  if not (alive r) then raise Not_Alive
  else begin
    match r.rh.on_message with
    | Sync handle ->
      let (next_state, answer) = handle (r, r.state, m) in
      r.state <- next_state;
      answer
    | ASync handle ->
      (* the continuation drops the outcome in [slot]; it is only usable if
         the handler answered synchronously *)
      let slot = ref None in
      handle (r, r.state, m) (fun outcome -> slot := Some outcome);
      begin match !slot with
        | Some (next_state, answer) ->
          r.state <- next_state;
          answer
        | None -> raise Async_Call
      end
  end
385
(* [call_cps r m ?err_cont cont] sends the message [m] to [r] and passes
   the result to [cont] through the scheduler.

   - On a killed resource, [err_cont] (when provided) receives [Not_Alive]
     and [cont] is never called.
   - With a direct-style handler the message is handled immediately.
   - With a continuation-passing handler the message is queued in
     [r.msgs]. The queue is drained one message at a time: the next
     message is only handed to the handler once the previous one has
     delivered its new state. [r.active] is [true] when no drain is in
     progress.

   Fix: the original read
     [if r.active then r.active <- false; let rec process ... in process ()]
   which parses as [(if ...); process ()]. A new drain was therefore
   started on every call, even while a message was still in flight, so
   several messages could run concurrently against the same state. The
   drain is now started only when none is running. *)
let call_cps r m ?err_cont cont =
  if not (alive r) then Option.iter (fun f -> f Not_Alive) err_cont
  else
    match r.rh.on_message with
    | Sync f ->
      let state, result = f (r, r.state, m) in
      r.state <- state;
      Scheduler.push sched (fun () -> cont result)
    | ASync f ->
      (* Handles the queued messages in FIFO order, rescheduling itself
         after each answer; marks the resource idle once the queue is
         empty. *)
      let rec process () =
        if Queue.is_empty r.msgs
        then r.active <- true
        else
          let message, cont, _ = Queue.pop r.msgs in
          f (r, r.state, message) (fun (state, result) ->
            r.state <- state;
            Scheduler.push sched process;
            Scheduler.push sched (fun _ -> cont result))
      in
      Queue.add (m, cont, err_cont) r.msgs;
      (* when a drain is already running it will pick this message up *)
      if r.active then begin
        r.active <- false;
        process ()
      end
411
412 (* let project_handler (proj_msg:'a-> 'c) (proj_res:'b->'d) (h:('a,'b,'s)h) : ('c,'d,'s) h = *)
413 (* let on_message = *)
414 (* proj_sync *)
415 (* (fun (r,s,m) -> (r,s,proj_msg m)) *)
416 (* (fun (s,r) -> (s,proj_res r)) *)
417 (* (fun (r,s,m) -> call r (proj_msg m) h.on_message) *)
418 (* in { h with on_message} *)
419
420 (* let project_resource (proj_msg:'a-> 'c) (proj_res:'b->'d) (r:('a,'b,'s)t) : ('c,'d,'s) t = *)
421 (* let rh = project_handler proj_msg proj_res r.rh *)
422 (* in { r with rh } *)
423
424
(* [register r cb] arranges for [cb] to be called with the signal when [r]
   is killed. If [r] is already dead, [cb] is called immediately with
   [`Closed]. *)
let register r cb =
  if not (alive r) then cb `Closed
  else begin
    let cell = ref (CbLight (Sync cb)) in
    r.cbs <- cell :: r.cbs;
    r.rsize <- succ r.rsize
  end
432
(* Same as [register], except that the callback is stored as a
   continuation-passing one (so [kill] runs it through the scheduler). On a
   dead resource the call with [`Closed] is also pushed on the scheduler.
   The continuation given to the wrapped callback is ignored. *)
let register_cps r cb =
  let cps_cb signal _ = cb signal in
  if alive r then begin
    let cell = ref (CbLight (ASync cps_cb)) in
    r.cbs <- cell :: r.cbs;
    r.rsize <- succ r.rsize
  end else
    Scheduler.push sched (fun _ -> cps_cb `Closed (fun () -> ()))
441
(* [expire r] runs the expiration check of the handler on the current
   state and returns its verdict.
   Raises [Not_Alive] if [r] has been killed, and [Async_Call] if the check
   is in continuation-passing style and has not answered by the time it
   returns. *)
let expire r =
  if not (alive r) then raise Not_Alive
  else begin
    match r.rh.expire with
    | Sync check -> check r.state
    | ASync check ->
      let slot = ref None in
      check r.state (fun verdict -> slot := Some verdict);
      begin match !slot with
        | Some verdict -> verdict
        | None -> raise Async_Call
      end
  end
455
(* Continuation-passing version of [expire]. A direct-style check is run
   immediately and [cont] is scheduled with its verdict; a
   continuation-passing check receives [cont] directly. On a killed
   resource [err_cont], when provided, receives [Not_Alive]. *)
let expire_cps r ?err_cont cont =
  if not (alive r) then Option.iter (fun f -> f Not_Alive) err_cont
  else
    match r.rh.expire with
    | ASync check -> check r.state cont
    | Sync check ->
      let verdict = check r.state in
      Scheduler.push sched (fun () -> cont verdict)
465
(* Walks over every resource of the manager, runs its expiration check and
   kills it with the returned signal when there is one. The counters and
   the list of killed resources built along the way are discarded. *)
let collect () =
  let sweep (expired, kept, killed) r =
    match expire r with
    | Some signal ->
      ignore (kill r signal);
      (expired + 1, kept, r :: killed)
    | None -> (expired, kept + 1, killed)
  in
  let _expired, _kept, _killed = Manager.fold manager sweep (0, 0, []) in
  ()
474
475
476 (* Create a resource and add it to Manager *)
477 (* You must provide ... *)
(* [resource rh state ?depends ()] creates a resource with handler [rh] and
   initial state [state], and registers it in the manager.

   [depends] (default: none) lists the resources the new one depends on.
   A callback wrapping [rh.decide] is added to each of them, so that
   killing a dependency calls [decide] with the new resource, its current
   state and the signal. The same callback cell is recorded in [cbs_ref] of
   the new resource, which lets [kill] cancel it.

   A finaliser is attached through the scheduler to kill the resource with
   [`Collected].

   Fix: the finaliser used to call [kill] unconditionally, and [kill]
   raises [Not_Alive] on a resource that has already been killed, so the
   exception was raised from inside the finaliser. The finaliser now does
   nothing on a dead resource. *)
let resource rh state ?depends () : ('a,'b) t =
  let depends = Option.default [] depends in
  let gp = List.fold_left
    (fun group parent -> Group.merge group parent.gp) Group.empty depends in
  let resource = {
    id= ref (-1);
    rh;
    cbs=[];
    cbs_ref=[];
    gp;
    state;
    msgs=Queue.create();
    rsize=0;
    active=true;
  } in
  (* erase the state type; [Manager.add] fills in the id *)
  let blackresource = grey resource in
  (* if register then ignore(Group.register blackresource group); *)
  Manager.add manager blackresource;
  #<If> Logger.debug "[ResourceTracker] Create %s [%d]" rh.name
    !(resource.id) #<End>;
  let () =
    let f = match rh.decide with
      | Sync f ->
        (Sync(fun ((r:b),(s:signal)) ->
          #<If> Logger.debug "[ResourceTracker] Remove dependent resource [%d]" !(r.id) #<End>;
          f (unblack r,unblack_state r.state,s)))
      | ASync f ->
        (ASync(fun ((r:b),(s:signal)) _ ->
          Scheduler.push sched (fun _ -> f (unblack r,unblack_state r.state,s) (fun _ -> ()) )))
    in
    (* a single cell shared by all dependencies *)
    let cb = ref (CbNormal(resource.id,f)) in
    List.iter
      (fun (p) ->
        p.cbs <- cb::p.cbs;
        resource.cbs_ref <- cb::resource.cbs_ref)
      depends
  in
  Scheduler.finalise sched
    (fun r -> if alive r then kill r `Collected) blackresource;
  blackresource
518
(* Builds a handler record from already wrapped ([Sync] / [ASync])
   functions. *)
let handler' name on_delete expire on_message decide =
  { name = name;
    on_delete = on_delete;
    on_message = on_message;
    expire = expire;
    decide = decide }
526
(* Uncurrying helpers: turn a curried function of two (resp. three)
   arguments into a function taking a pair (resp. a triple). *)
let c2 f = fun (a, b) -> f a b
let c3 f = fun (a, b, c) -> f a b c
529
(* Handler made of direct-style functions; each curried function is turned
   into the tupled form expected by the handler record. *)
let handler name on_delete expire on_message decide =
  let on_delete = Sync (c2 on_delete) in
  let expire = Sync expire in
  let on_message = Sync (c3 on_message) in
  let decide = Sync (c3 decide) in
  handler' name on_delete expire on_message decide

(* Same as [handler] for functions written in continuation-passing
   style. *)
let handler_cps name on_delete expire on_message decide =
  let on_delete = ASync (c2 on_delete) in
  let expire = ASync expire in
  let on_message = ASync (c3 on_message) in
  let decide = ASync (c3 decide) in
  handler' name on_delete expire on_message decide
535
536
(* Creates a resource for a timer handler: the user state is first wrapped
   in a fresh expiration record. *)
let resource_timer rh state ?depends () =
  let timed_state = Expire.init state in
  resource rh timed_state ?depends ()
539
(* [handler_timer name kill_fun expire_fun on_message_fun decide_fun]
   builds a direct-style handler whose state is an [Expire.state] wrapping
   the user state.

   After each message the policy [expire_fun] is evaluated on the record:
   - if it returns a signal, the pending timeout is aborted and the
     resource is killed with that signal;
   - otherwise the pending timeout is aborted, a new one is armed when the
     policy gives a delay (it kills the resource with [`Expired] when it
     fires), and the usage bookkeeping is updated.
   The deletion handler aborts the pending timeout before calling
   [kill_fun]. *)
let handler_timer name kill_fun expire_fun on_message_fun decide_fun =

  (* Aborts the pending timeout, if any. *)
  let cancel_timeout st =
    match st.Expire.cancel with
    | None -> ()
    | Some key ->
      Scheduler.abort sched key;
      st.Expire.cancel <- None
  in

  (* Arms a timeout when a delay is given. *)
  let create_timeout st res delay =
    match delay with
    | None -> ()
    | Some next ->
      let key = Scheduler.sleep sched next (fun _ -> kill res `Expired) in
      st.Expire.cancel <- Some key
  in

  (* Records one more use, at the current time. *)
  let touch st =
    st.Expire.last_use <- Time.now ();
    st.Expire.limit <- st.Expire.limit + 1
  in

  (* Applies the verdict of the policy. *)
  let set_timeout res verdict st =
    match verdict with
    | None, next ->
      cancel_timeout st;
      create_timeout st res next;
      touch st
    | Some signal, _ ->
      cancel_timeout st;
      kill res signal
  in

  let timed_on_message res st msg =
    let user_state, answer = on_message_fun res st.Expire.state msg in
    st.Expire.state <- user_state;
    set_timeout res (expire_fun st) st;
    (st, answer)
  in
  let timed_on_delete st signal =
    cancel_timeout st;
    kill_fun st.Expire.state signal
  in
  let timed_expire st = fst (expire_fun st) in
  let timed_decide res st signal = decide_fun res st.Expire.state signal in
  handler name timed_on_delete timed_expire timed_on_message timed_decide
578
579 end
580 in (module Implem: M)
581
582
(* Tracker instance built on the default scheduler and the default
   manager. *)
583 module Default = (val make Scheduler.default Manager.default : M)
584
585
586 (* let ht = Default.handler_timer *)
587 (* "Test" *)
588 (* (fun () _ -> ()) *)
589 (* (Default.Expire.create (`Timeout 5.)) *)
590 (* (fun _ k () -> k,()) *)
591 (* (fun _ _ _ -> ()) *)
592
593 (* let r = Default.resource_timer ht () () *)
594
595 (* let _ = Default.call r () *)
Something went wrong with that request. Please try again.