forked from lefessan/ocaml-actors
/
actorssg.ml
124 lines (99 loc) · 3 KB
/
actorssg.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
(* Values that can be carried as message arguments: an actor reference,
   scalars, generic containers of [arg], and list/array variants
   specialised to a single scalar type.  The array variants (A, AC, AS,
   AI, AF) are the ones holding directly mutable storage. *)
type arg =
| Actor of actor
| C of char
| S of string
| I of int
| F of float
| L of arg list
| A of arg array
(* string-keyed association list; the D presumably stands for dictionary *)
| D of (string * arg) list
| LC of char list
| LS of string list
| LI of int list
| LF of float list
| AC of char array
| AS of string array
| AI of int array
| AF of float array
(* A message is a string paired with an argument list.  The string is
   presumably the message name or tag -- TODO confirm against callers. *)
and message = string * arg list
(* State of an actor that lives in this process. *)
and local_actor = {
(* messages waiting to be processed; [send] appends, [reacting] pops *)
mailbox : message Queue.t;
(* locked by [send] and [reacting] around their mailbox accesses *)
mutex : Mutex.t;
(* handler : (message -> unit);*)
}
(* Coordinates of an actor that was created elsewhere. *)
and remote_actor = {
actor_host : string; (* unique identifier of the machine on which it was created (20-byte string generated randomly at startup) *)
remote_ip : string;
remote_port : int;
}
(* Where an actor lives: in this process or at a remote address. *)
and location =
| Local of local_actor
| Remote of remote_actor
(* An actor handle: its creation-time number plus its location. *)
and actor = {
actor_id : int; (* local number of the actor when it was created *)
actor_location : location;
}
(* [mutables_copy (s, al)] returns the message [(s, al')] where [al'] is a
   copy of [al] in which every array has been duplicated, at any nesting
   depth, so that later in-place updates of the arrays of one message are
   not visible through the other.

   - [A] arrays are rebuilt element by element with the same copier; a
     plain [Array.copy] would only duplicate the outer array and leave
     nested arrays shared.
   - [AC], [AS], [AI], [AF] hold scalars or strings and are duplicated with
     [Array.copy].
   - [L] and [D] are rebuilt because their elements may contain arrays.
   - Every other constructor is returned as is.
     NOTE(review): strings are shared, which is only safe when strings are
     immutable (the default since OCaml 4.06) -- confirm the compiler flags. *)
let mutables_copy (s, al) =
  let rec copy_arg = function
    | A a -> A (Array.map copy_arg a)
    | AC ac -> AC (Array.copy ac)
    | AS strs -> AS (Array.copy strs)
    | AI ai -> AI (Array.copy ai)
    | AF af -> AF (Array.copy af)
    | L l -> L (List.map copy_arg l)
    | D l -> D (List.map copy_entry l)
    (* Leaves are listed explicitly so that adding a constructor to [arg]
       triggers a non-exhaustive match warning here. *)
    | (Actor _ | C _ | S _ | I _ | F _ | LC _ | LS _ | LI _ | LF _) as leaf ->
      leaf
  and copy_entry (key, value) = (key, copy_arg value) in
  (s, List.map copy_arg al);;
(* --- Actor registry ---------------------------------------------------- *)

(* Table in which [create] records every actor it builds.
   Should probably be a weak hashtbl. *)
let actors = Hashtbl.create 1313
(* let machines = Hashtbl.create 97 *)

(* Counter from which [create] draws actor numbers; starts at 0 and is
   incremented before each use. *)
let actors_id = ref 0

(* NOTE(review): presumably the mutex meant to protect [actors] and
   [actors_id] -- confirm against the code that locks it. *)
let a_mutex = Mutex.create ()

(* --- Pending receives -------------------------------------------------- *)

(* Queue of (actor, handler) pairs filled by [schedule_receive]. *)
let receive_scheduler = Queue.create ()

(* Held by [schedule_receive] while it appends to [receive_scheduler]. *)
let rs_mutex = Mutex.create ()
(* [send a m] delivers message [m] to actor [a].

   Local actor: a copy of [m] made by [mutables_copy] is appended to the
   mailbox of [a] while the mailbox mutex is held.

   Remote actor: nothing happens and the message is dropped.
   TODO: remote delivery is not implemented in this file; callers get no
   indication that the message was not sent. *)
let send a m =
  match a.actor_location with
  | Local lac ->
    (* Copy before taking the lock: the copy can be long for large messages
       and, should it raise, the mailbox mutex must not be left locked. *)
    let copy = mutables_copy m in
    Mutex.lock lac.mutex;
    Queue.add copy lac.mailbox;
    Mutex.unlock lac.mutex
  | Remote _ -> ();;
(* [schedule_receive a f] appends the pair [(a, f)] to [receive_scheduler],
   holding [rs_mutex] for the duration of the insertion. *)
let schedule_receive a f =
  let pending = (a, f) in
  Mutex.lock rs_mutex;
  Queue.push pending receive_scheduler;
  Mutex.unlock rs_mutex;;
(* Raised by actor code to hand over the handler to use for a later message.
   [execute] queues that handler with [schedule_receive]; [reacting] goes on
   processing the mailbox with it. *)
exception React of (message -> unit);;
(* Raised by a handler to reject the message it was given.  [reacting] then
   tries the next queued message with the same handler; the rejected message
   is not put back into the mailbox. *)
exception NotHandled;;
(* [execute a f] runs [f ()].  When [f] raises [React g], the handler [g] is
   queued for actor [a] through [schedule_receive]; any other exception
   raised by [f] propagates to the caller. *)
let execute a f =
  let register_handler g = schedule_receive a g in
  try f () with
  | React g -> register_handler g;;
(* [create f] builds a new local actor, records it in [actors], runs its
   body [f] through [execute] and returns the actor.

   The actor number is taken from [actors_id] after incrementing it, so the
   first actor created gets number 1.

   The counter update and the table insertion are done while holding
   [a_mutex]: both are shared mutable state, and without the lock two
   threads creating actors at the same time could obtain the same number or
   corrupt the table.  The mutex is released before [f] runs, so [f] may
   itself call [create].

   NOTE(review): the table is keyed by the actor record and stores the
   number as the value; an id -> actor mapping looks more likely to be what
   was intended.  Left unchanged here because it fixes the type of
   [actors]. *)
let create f =
  let l_act = { mailbox = Queue.create (); mutex = Mutex.create () } in
  Mutex.lock a_mutex;
  incr actors_id;
  let new_actor = { actor_id = !actors_id; actor_location = Local l_act } in
  Hashtbl.add actors new_actor new_actor.actor_id;
  Mutex.unlock a_mutex;
  (* User code runs outside the critical section. *)
  execute new_actor f;
  new_actor;;
(* [reacting a g] feeds the queued messages of local actor [a] to handler
   [g], one at a time:

   - if the mailbox is empty, it returns immediately;
   - if [g m] returns normally, the message is considered handled and
     processing stops;
   - if [g m] raises [React f], processing continues with handler [f];
   - if [g m] raises [NotHandled], the message is discarded and the next one
     is tried with the same handler [g];
   - any other exception raised by the handler propagates to the caller.

   The mailbox mutex is held only while a message is popped and is always
   released before the handler runs.  Earlier, the mutex stayed locked when
   the mailbox was empty or when a handler returned normally, which blocked
   every later [send] to the actor.

   NOTE(review): when the mailbox is empty the handler [g] is dropped, not
   rescheduled -- confirm this is the intended protocol.

   Raises [Failure] when [a] is a remote actor. *)
let rec reacting a g =
  match a.actor_location with
  | Remote _ -> failwith "You cannot run a remote actor"
  | Local lac ->
    begin
      Mutex.lock lac.mutex;
      let next =
        try Some (Queue.pop lac.mailbox) with Queue.Empty -> None
      in
      Mutex.unlock lac.mutex;
      match next with
      | None -> ()
      | Some m ->
        (* Decide which handler, if any, processes the following message. *)
        let continuation =
          try g m; None with
          | React f -> Some f
          | NotHandled -> Some g
        in
        (* The recursive call is outside any [try], hence a tail call: a long
           chain of messages does not grow the stack. *)
        begin match continuation with
        | None -> ()
        | Some h -> reacting a h
        end
    end;;
(* [run_receive_handler ()] performs one scheduling step: it removes the
   oldest (actor, handler) pair from [receive_scheduler], if any, and runs
   [reacting] on it.  It does nothing when no receive is pending.

   The queue is popped while holding [rs_mutex], the same mutex
   [schedule_receive] holds when it appends, and the mutex is released
   before [reacting] runs so that handlers may schedule further receives. *)
let run_receive_handler () =
  Mutex.lock rs_mutex;
  let pending =
    try Some (Queue.pop receive_scheduler) with Queue.Empty -> None
  in
  Mutex.unlock rs_mutex;
  match pending with
  | None -> ()
  | Some (a, f) -> reacting a f

(* Kept for compatibility: this binding is a unit value, so the step above
   is evaluated exactly once, when the module is initialised.  Code that
   needs to drive the scheduler repeatedly should call
   [run_receive_handler ()] instead. *)
let recieve_handler = run_receive_handler ();;
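(* Intended interface, as sketched by the original author.  It does not
   match the code above yet: [receive] is not defined in this file and
   [create] currently takes a body of type [unit -> unit].
val create : (actor -> unit) -> actor
val receive : actor -> (message -> unit) -> unit
val send : actor -> message -> unit *)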