Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 250 lines (209 sloc) 8.042 kb
fccc685 Initial open-source release
MLstate authored
1 (*
2 Copyright © 2011 MLstate
3
4 This file is part of OPA.
5
6 OPA is free software: you can redistribute it and/or modify it under the
7 terms of the GNU Affero General Public License, version 3, as published by
8 the Free Software Foundation.
9
10 OPA is distributed in the hope that it will be useful, but WITHOUT ANY
11 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
13 more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with OPA. If not, see <http://www.gnu.org/licenses/>.
17 *)
18 open Cps.Ops
19
20 module Hashtbl = Base.Hashtbl
21
22 #<Debugvar:HLDIR_DEBUG>
23
24 (* ********************************************************)
25 (* DEFINE TYPE FOR HLNET PROTOCOL *************************)
26 type kind = Dir | Loc
27
28 type who = Me | Other of Unix.inet_addr
29
30 type iwho = IMe | IYou | IOther of Unix.inet_addr
31
32 (* Types for [kind=Dir] *)
33 type ('key, 'value) request =
34 | FindOrReplace of ('key * 'value)
35 | Add of ('key * 'value)
36 | Find of 'key
37 | All of 'key
38 | Remove of 'key
39
40 type ('key, 'value) dir_request_channel =
41 (('key, 'value) request, ('value * iwho) list) Hlnet.channel
42
43 type ('key, 'value) dir_response_channel =
44 (('value * iwho) list, ('key, 'value) request) Hlnet.channel
45
46 (* Types for [kind=Loc] *)
47
48 type loc_channel = (Unix.inet_addr, Unix.inet_addr) Hlnet.channel
49
50 (* Type of a directory. *)
51
52 type ('key, 'value) sdirectory = {
53 sched : Scheduler.t;
54 mutable k_myaddr : (Unix.inet_addr -> unit) list;
55 mutable myaddr : Unix.inet_addr option;
56 htbl : ('key, 'value * who) Hashtbl.t;
57 }
58
59 type ('key, 'value) cdirectory = {
60 mutable cmyaddr : Unix.inet_addr option;
61 chan : ('key, 'value) dir_request_channel;
62 }
63
64 type ('key, 'value) t =
65 | Server of ('key, 'value) sdirectory
66 | Client of ('key, 'value) cdirectory
67
68
69
70 (* ********************************************************)
71 (* CONVERSION FUNCTIONS ***********************************)
72 let endpoint_to_inet = function
73 Hlnet.Tcp (x, _) | Hlnet.Ssl (x, _, _) (* | Hlnet.Udp (x, _) *) -> x
74
75 let inet_of_channel c =
76 endpoint_to_inet (Hlnet.remote_of_channel c)
77
78 let who_to_iwho (response_channel:('a, 'b)dir_response_channel) who =
79 let response_addr = inet_of_channel response_channel in
80 match who with
81 | Me -> IMe
82 | Other addr when addr = response_addr -> IYou
83 | Other x -> IOther x
84
85 let iwho_to_who (request_channel:('a, 'b)dir_request_channel) iwho =
86 match iwho with
87 | IMe -> Other (inet_of_channel request_channel)
88 | IYou -> Me
89 | IOther x -> Other x
90
91 let list_to_opt chan = function
92 | [] -> None
93 | [(v,x)] -> Some (v, iwho_to_who chan x)
94 | _ -> failwith("list_to_opt : Unexpected list")
95
96 let opt_to_list chan = function
97 | None -> []
98 | Some (v, x) -> ([v, who_to_iwho chan x] : ('a * iwho) list)
99
100 let list_to_ilist chan list =
101 List.map
102 (function (v,x) -> v, who_to_iwho chan x)
103 list
104
105 let ilist_to_list chan list =
106 List.map
107 (function (v,x) -> v, iwho_to_who chan x)
108 list
109
110 let who_to_string = function
111 | Me -> "Me"
112 | Other x -> Unix.string_of_inet_addr x
113
114
115
116 (* ********************************************************)
117 (* ********************************************************)
118 module ExtendHash = struct
119 let find_or_replace htbl key value =
120 match Hashtbl.find_opt htbl key with
121 | None ->
122 Hashtbl.add htbl key value;
123 None
124 | x -> x
125 end
126
127 let rec make ?(err_cont=fun _ ->
128 #<If> Logger.warning "[DIRECTORY] Make : Uncaught exn"
129 #<Else> ()
130 #<End>)
131 sched endpoint path kind k =
132 let dir_service = Hlnet.make_service_id ~name:(Printf.sprintf "dir/%s" path) ~version:1 in
133 match kind with
134 | `client ->
135 #<If>
136 Logger.debug "[DIRECTORY] Try to create a client directory (%s@ %s)"
137 (Hlnet.print_service_id dir_service) (Hlnet.endpoint_to_string endpoint)
138 #<End>;
139 Hlnet.open_channel sched ~on_disconnect:(fun () -> err_cont Exit; `retry (Time.seconds 5))
140 endpoint (Hlnet.Aux.magic_spec dir_service)
141 @> fun chan -> Client {chan = chan; cmyaddr = None} |> k
142 | `server ->
143 #<If>
144 Logger.debug "[DIRECTORY] Try to create a server directory (%s)"
145 (Hlnet.endpoint_to_string endpoint)
146 #<End>;
147 let server = {
148 sched = sched;
149 k_myaddr = [];
150 htbl = Hashtbl.create 1024;
151 myaddr = None;
152 } in
153 (try
154 let safe = true in
155 Hlnet.accept ~safe sched endpoint (Hlnet.Aux.magic_spec dir_service)
156 (fun (hlchan : ('key, 'value) dir_response_channel) ->
157 Hlnet.setup_respond hlchan
158 (fun request k ->
159 match request with
160 | FindOrReplace (key, value) ->
161 let addr = inet_of_channel hlchan in
162 let res =
163 ExtendHash.find_or_replace server.htbl key
164 (value, Other addr) in
165 opt_to_list hlchan res |> k
166 | Add (key, value) ->
167 let addr = inet_of_channel hlchan in
168 Hashtbl.add server.htbl key (value, Other addr)
169 | Find key ->
170 let res = Hashtbl.find_opt server.htbl key in
171 opt_to_list hlchan res |> k
172 | All key ->
173 list_to_ilist hlchan (Hashtbl.find_all server.htbl key) |> k
174 | Remove key ->
175 Hashtbl.remove server.htbl key
176 )
177 );
178 Hlnet.accept ~safe sched endpoint
179 (Hlnet.Aux.magic_spec (Hlnet.make_service_id ~name:"dir/localize" ~version:1))
180 (fun (hlchan : loc_channel) ->
181 Hlnet.setup_respond hlchan
182 (fun myaddr k ->
183 server.myaddr <- Some myaddr;
184 List.iter
185 (fun k -> Scheduler.push server.sched (fun () -> k myaddr))
186 server.k_myaddr;
187 server.k_myaddr <- [];
188 k (inet_of_channel hlchan))
189 );
190 Server server |> k
191 with e -> err_cont e)
192
193 let add t key value =
194 match t with
195 | Client r -> Hlnet.send r.chan (Add (key, value))
196 | Server r -> Hashtbl.add r.htbl key (value, Me)
197
198 let find_or_replace t key value k =
199 match t with
200 | Client r ->
201 Hlnet.sendreceive r.chan (FindOrReplace (key, value)) @>
202 (function res -> list_to_opt r.chan res |> k)
203 | Server r ->
204 (ExtendHash.find_or_replace r.htbl key (value, Me)) |> k
205
206 let find_opt t key k =
207 match t with
208 | Client r ->
209 Hlnet.sendreceive r.chan (Find key)
210 (function res -> list_to_opt r.chan res |> k)
211 | Server r ->
212 Hashtbl.find_opt r.htbl key |> k
213
214 let find_all t key k =
215 match t with
216 | Client r ->
217 Hlnet.sendreceive r.chan (All key)
218 (function l -> k (ilist_to_list r.chan l))
219 | Server r ->
220 Hashtbl.find_all r.htbl key |> k
221
222 let remove t key =
223 match t with
224 | Client r -> Hlnet.send r.chan (Remove key)
225 | Server r -> Hashtbl.remove r.htbl key
226
227 let my_public_addr_opt t k =
228 match t with
229 | Client r ->
230 (match r.cmyaddr with
231 | Some _ as a -> k a
232 | None ->
233 let endpoint = Hlnet.remote_of_channel r.chan in
234 let addr = endpoint_to_inet endpoint in
235 let sched = Hlnet.scheduler_of_channel r.chan in
236 Hlnet.open_channel sched endpoint
237 (Hlnet.Aux.magic_spec (Hlnet.make_service_id ~name:"dir/localize" ~version:1))
238 @> (function (chan : loc_channel) ->
239 Hlnet.sendreceive chan addr @> function a ->
240 r.cmyaddr <- Some a; r.cmyaddr |> k))
241 | Server r -> r.myaddr |> k
242
243 let my_public_addr t k =
244 match t with
245 | Client _ -> my_public_addr_opt t @> (fun x -> Option.get x |> k)
246 | Server r ->
247 (match r.myaddr with
248 | Some x -> x |> k
249 | None -> r.k_myaddr <- k::r.k_myaddr)
Something went wrong with that request. Please try again.