/
om_reactor.ml
79 lines (64 loc) · 2.4 KB
/
om_reactor.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
module Timers = Map.Make(struct type t = float let compare = compare end);;
class reactor () =
object(self)
val mutable running = false
val mutable should_stop = false
val mutable conns = Hashtbl.create 100
val mutable conns_to_delete = []
val mutable sighandler = (fun _ -> ())
val mutable timers = Timers.empty
initializer
sighandler <- (fun _ -> should_stop <- true);
Sys.set_signal Sys.sigint (Sys.Signal_handle sighandler);
method run ((on_start : reactor -> unit), (on_stop : reactor -> unit)) =
should_stop <- false;
running <- true;
on_start(self :> reactor);
while should_stop == false do
self#tick();
done;
running <- false;
Hashtbl.iter (fun fd conn -> conn#close()) conns;
on_stop(self :> reactor);
();
method tick () =
let current_time = Unix.gettimeofday() in
(* This is naive and needs to be fixed so we don't iterate over the entire bunch every tick *)
let timers_to_run = Timers.fold
(fun time fn l -> if time <= current_time then l @ [time] else l)
timers
[]
in
List.iter
(fun time ->
let fn = Timers.find time timers in
timers <- Timers.remove time timers;
fn();
)
timers_to_run;
let read_fds = Hashtbl.fold
(fun fd conn l -> if conn#select_for_read() then fd :: l else l)
conns
[]
in
let write_fds = Hashtbl.fold
(fun fd conn l -> if conn#select_for_write() then fd :: l else l)
conns
[]
in
let readable, writeable, _ = Om_misc.restart (Unix.select read_fds write_fds []) 0.01 in
List.iter (fun fd -> let conn = Hashtbl.find conns fd in conn#handle_readable()) readable;
List.iter (fun fd -> let conn = Hashtbl.find conns fd in conn#handle_writeable()) writeable;
(* remove any connections that closed themselves during the tick *)
List.iter (fun fd -> Hashtbl.remove conns fd) conns_to_delete;
conns_to_delete <- [];
();
method add (conn : Om_eventable.eventable) =
Hashtbl.add conns (conn#get_fd()) conn;
method remove (conn : Om_eventable.eventable) =
conns_to_delete <- (conn#get_fd()) :: conns_to_delete;
method stop () =
should_stop <- true;
method add_timer (delay, fn) =
timers <- Timers.add (Unix.gettimeofday() +. delay) fn timers;
end;;