Skip to content
Newer
Older
100644 573 lines (469 sloc) 16.9 KB
fccc685 Initial open-source release
MLstate authored
1 (*
2 Copyright © 2011 MLstate
3
4 This file is part of OPA.
5
6 OPA is free software: you can redistribute it and/or modify it under the
7 terms of the GNU Affero General Public License, version 3, as published by
8 the Free Software Foundation.
9
10 OPA is distributed in the hope that it will be useful, but WITHOUT ANY
11 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
13 more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with OPA. If not, see <http://www.gnu.org/licenses/>.
17 *)
18 #<Debugvar:PING_DEBUG>
19
20 module JS = JsonTypes
21 module RD = Requestdef
22 module HS = HttpServer
23 module HSC = HttpServerCore
24 module HST = HttpServerTypes
25
(** JSON values exchanged with clients (alias of [JsonTypes.json]). *)
type json = JS.json

(** [f @> k] is [f k]. In this file it is used to hand a continuation to a
    CPS computation (see [Entry.send]). *)
let ( @> ) = fun f k -> f k

(** Re-export of the project's pipe operator. *)
let ( |> ) = InfixOperator.( |> )
31
(** What the ping register needs to know about its clients. *)
module type CLIENT = sig
  (** Identifier of a client connection. *)
  type key

  (** Messages that can be sent to a client. *)
  type msg

  (** Printable form of a key; in this file it is only used to build log
      messages. *)
  val key_to_string : key -> string

  (** Convert a message to JSON, in continuation-passing style. *)
  val serialize : msg -> json Cps.t
end
38
(** Timer facility required by the ping register. *)
module type SCHEDULER = sig
  (** Handle on a scheduled job. *)
  type async_key

  (** [abort k] cancels the job identified by [k]. *)
  val abort : async_key -> unit

  (** [sleep delay job] schedules [job] and returns its handle.
      NOTE(review): [delay] is presumably in milliseconds, given the
      [*_msecond_*] constants passed to it in this file -- confirm against
      the scheduler implementation. *)
  val sleep : int -> (unit -> unit) -> async_key
end
44
45
(* Logging helpers. Each one prefixes the caller's format with
   "[PING][<level>] " and terminates it with "%!" before handing it to the
   matching [Logger] function. *)

let ping_debug level fmt =
  let tagged = "[PING][%s] " ^^ fmt ^^ "%!" in
  Logger.debug tagged level

let ping_info level fmt =
  let tagged = "[PING][%s] " ^^ fmt ^^ "%!" in
  Logger.info tagged level

let ping_error level fmt =
  let tagged = "[PING][%s] " ^^ fmt ^^ "%!" in
  Logger.error tagged level
54
(** [send_txt_response winfo txt] answers the request held by [winfo] with an
    [SC_OK] response whose body is [txt], by building the response and
    passing it to [winfo.HST.cont].

    The media type uses ';' to introduce the charset parameter; the previous
    value, "text/plain, charset=utf-8", was not a well-formed Content-Type. *)
let send_txt_response winfo txt =
  let response =
    HS.make_response_modified_since
      (Time.now ())
      winfo.HST.request
      Requestdef.SC_OK
      "text/plain; charset=utf-8"
      (Http_common.Result txt)
  in
  winfo.HST.cont response
62
(** Serialize [json] with [Json_utils.to_string] and send the resulting text
    through [send_txt_response]. The payload is logged inside the debug
    section. *)
let send_json_response winfo json =
  let payload = Json_utils.to_string json in
  #<If>
    ping_debug "SEND" "Sending json (%s)" payload;
  #<End>;
  send_txt_response winfo payload
69
(** Answer the request held by [winfo] with an [SC_Unauthorized] plain-text
    response whose body is [txt]. *)
let send_error winfo txt =
  let body = Http_common.Result txt in
  let response =
    HS.make_response ~req:winfo.HST.request Requestdef.SC_Unauthorized
      "text/plain" body
  in
  winfo.HST.cont response
73
(* Delay handed to the scheduler by [Connection.update]: when it expires
   without a more recent ping, the client is deleted.
   NOTE(review): presumably milliseconds, like the two constants below. *)
let disconnection_state_delay = 120_000

(* Pong delay stored for a connection when it is first seen. *)
let ping_delay_client_msecond_rush = 3_000

(* Pong delay used by default and once the rush period is over. *)
let ping_delay_client_msecond_normal = 30_000

(* Intended maximum number of messages packed in one response. *)
let max_package_size = 10
79
80 module Make (S : SCHEDULER) (C : CLIENT) = struct
81
(** Connection events for which callbacks can be registered. *)
type event = Connect | Disconnect

(** Responses that can be sent back to a pending ping (or pang) request. *)
type response =
  | Msgs of json list        (** a batch of messages *)
  | Pong                     (** nothing to deliver for this request *)
  | Break                    (** sent to a pending request that a newer one replaces *)
  | Result of int * string   (** pang identifier and its result *)
93
(** Name of an event, used in debug messages. *)
let event_to_string event =
  match event with
  | Disconnect -> "Disconnect"
  | Connect -> "Connect"
98
(** Encode a response as the JSON record read by the client ping loop.
    Every record starts with a ["type"] field; [Msgs] and [Result] add a
    ["body"] field, and [Result] also adds the pang ["id"]. *)
let response_to_json response =
  let typed name fields = JS.Record (("type", JS.String name) :: fields) in
  match response with
  | Pong -> typed "pong" []
  | Break -> typed "break" []
  | Msgs msgs -> typed "msgs" [ ("body", JS.Array msgs) ]
  | Result (id, result) ->
      typed "result" [ ("body", JS.String result); ("id", JS.Int id) ]
108
(** Encode [response] with [response_to_json] and send it on [winfo]. *)
let send_response winfo response =
  let json = response_to_json response in
  send_json_response winfo json
111
112 (** Manage communications with clients *)
113 module Entry : sig
114
(** State attached to a client key.
    - [Ajax_call (winfo, nb, sk)]: a request numbered [nb] is pending; it
      can be answered through [winfo], and [sk] is the scheduler key of its
      programmed pong.
    - [Messages q]: no request is pending; [q] holds the messages waiting
      for the next one. *)
type t =
  | Ajax_call of HST.web_info * int * S.async_key
  | Messages of json Queue.t

(** Bind a [cid] to an entry. The [cid] must not already be bound
    (checked by an assertion). *)
val add : C.key -> t -> unit

(** Find the entry bound to a [cid].
    @raise Not_found if the [cid] is not bound. *)
val find : C.key -> t

(** Remove the binding of a [cid], i.e. remove the client connection from
    this manager. (This value used to be declared twice in this
    signature.) *)
val remove : C.key -> unit

(** Replace the binding of a [cid]; if it is not bound, add it. *)
val replace : C.key -> t -> unit

(** Send a pong to the given client if its pending request carries the
    given number, then call [pong_callback] on the client key. *)
val pong : ?pong_callback:(C.key -> unit) -> C.key -> int -> unit

(** Send the message to the given client: immediately if a request is
    pending, otherwise by storing it until the next request. *)
val send : C.msg -> C.key -> unit

(** Update the status of the client connection and record the
    [web_info] that allows [send] to reach the corresponding client. *)
val ping : ?crush:bool -> ?find_delay:(C.key -> int) -> ?pong_callback:(C.key -> unit) ->
  C.key -> HST.web_info -> int -> unit

(** Like [ping], but results can be returned to this client with
    [return]. *)
val pang : C.key -> HST.web_info -> int -> unit

(** Return the result of a pang. *)
val return : C.key -> int -> string -> unit
156
157 end = struct
158
type t =
  | Ajax_call of HST.web_info * int * S.async_key
  | Messages of json Queue.t

(* Client identifier to its current entry. *)
let entry_tbl : (C.key, t) Hashtbl.t = Hashtbl.create 1024

(* Client identifier to the pang results not yet delivered, indexed by
   pang number. *)
let pang_tbl : (C.key, string IntMap.t) Hashtbl.t = Hashtbl.create 512
168
(* Thin accessors over [entry_tbl]. *)

(* Raises [Not_found] when [key] has no entry. *)
let find key = Hashtbl.find entry_tbl key

let remove key = Hashtbl.remove entry_tbl key

let replace key entry = Hashtbl.replace entry_tbl key entry

(* [key] must not be bound yet: this is asserted before inserting. *)
let add key entry =
  let already_bound = Hashtbl.mem entry_tbl key in
  assert (not already_bound);
  Hashtbl.add entry_tbl key entry
181
(** Pop at most [max_package_size] messages from [queue], oldest first, and
    send them on [winfo] as a single [Msgs] response. Messages beyond the
    bound stay in [queue] for a later request.

    [queue] must not be empty: the first pop is unconditional, so
    [Queue.Empty] is raised otherwise. [_key] is only used by the debug
    message.

    The previous version threaded a counter but never compared it with
    [max_package_size], so it always drained the whole queue, and did so
    with a non-tail-recursive loop. *)
let send_with_winfo _key winfo queue =
  (* [taken] is the number of messages already in [acc] (in reverse order). *)
  let rec take taken acc =
    let acc = Queue.pop queue :: acc in
    let taken = taken + 1 in
    if Queue.is_empty queue || taken >= max_package_size then List.rev acc
    else take taken acc
  in
  let to_send = take 0 [] in
  #<If>
    let l = (List.length to_send) in
    ping_debug "PING" "Send %d/%d messages to %s"
      l (Queue.length queue + l) (C.key_to_string _key);
  #<End>;
  send_response winfo (Msgs to_send)
198
(* [send mess key] serializes [mess] and, in the continuation:
   - if a request is pending for [key], answers it with the message, aborts
     its programmed pong and removes the entry;
   - if a message queue exists for [key], appends the message to it;
   - if [key] has no entry ([Not_found]), creates a queue holding the
     message. *)
let send mess key =
  C.serialize mess @> fun json ->
    #<If>
      let txt = (Json_utils.to_string json) in
      ping_debug "PING" "Try send to %s => (%s)" (C.key_to_string key) txt;
    #<End>;
    try
      match find key with
      | Messages pending ->
          Queue.push json pending;
          #<If>
            let txt = (Json_utils.to_string json) in
            ping_debug "PING" "Store (%d) for %s => (%s)"
              (Queue.length pending) (C.key_to_string key) txt;
          #<End>;
          ()
      | Ajax_call (winfo, _, timer) ->
          #<If>
            ping_debug "PING" "Send to %s => (%s)"
              (C.key_to_string key) (Json_utils.to_string json);
          #<End>;
          send_response winfo (Msgs [json]);
          S.abort timer;
          remove key
    with Not_found ->
      (* TODOK1 : Check if cid is registered on Connection*)
      let pending = Queue.create () in
      Queue.push json pending;
      #<If>
        ping_debug "PING" "Store (%d) for %s => (%s)"
          (Queue.length pending) (C.key_to_string key) (Json_utils.to_string json);
      #<End>;
      add key (Messages pending)
232
(* [pong ?pong_callback key nb] answers the pending request of [key] with
   [Pong] when that request carries the number [nb]; the entry is then
   removed and [pong_callback key] is called. In every other case (other
   number, queued messages, no entry) nothing is sent. *)
let pong ?(pong_callback=fun _ -> ()) key nb =
  let not_sent () =
    #<If>
      ping_debug "PING"
        "PONG (%d) for %s not sended, request is already consumed"
        nb (C.key_to_string key);
    #<End>;
    ()
  in
  try
    match find key with
    | Ajax_call (winfo, pending_nb, _) when nb = pending_nb ->
        #<If>
          ping_debug "PING" "Sending a pong to %s" (C.key_to_string key);
        #<End>;
        send_response winfo Pong;
        remove key;
        pong_callback key
    | Ajax_call _ | Messages _ -> not_sent ()
  with Not_found -> not_sent ()
257
(* [ping ?crush ?find_delay ?pong_callback key winfo nb] handles request
   number [nb] received from [key] on [winfo].

   - If a pang result is waiting for [key], the one with the smallest
     number is sent on [winfo] and nothing else happens.
   - Otherwise, depending on the entry of [key]:
     . no entry, or an empty message queue: the request becomes the pending
       one and a pong is programmed after [find_delay key];
     . a non-empty queue: queued messages are sent on [winfo], and the entry
       is removed if the queue is then empty;
     . a request already pending: with [crush] it receives [Break] and is
       replaced by the new one; without [crush] the new request is refused
       with [send_error].

   [crush] defaults to [false], [find_delay] to the normal ping delay and
   [pong_callback] to a no-op.

   The refusal log now names the incoming ping [nb] as "not registered" and
   the stored one [n] as "already present"; the two arguments were
   swapped. *)
let ping ?(crush=false) ?(find_delay=fun _ -> ping_delay_client_msecond_normal)
    ?(pong_callback=fun _ -> ()) key winfo nb =
  (* Program the pong of this request and return its scheduler key. *)
  let sleep_pong () =
    #<If>
      ping_debug "PING""A PONG (%d) is programmed for %s"
        nb (C.key_to_string key);
    #<End>;
    S.sleep
      (find_delay key)
      (fun () -> pong ~pong_callback key nb)
  in
  let iping () =
    try
      match find key with
      | Ajax_call (owinfo, n, sk) ->
          if crush then (
            S.abort sk;
            replace key (Ajax_call (winfo, nb, sleep_pong ()));
            send_response owinfo Break
          ) else (
            ping_error "PING"
              "PING(%d) not registered PING(%d) already present"
              nb n;
            send_error winfo "Already present"
          )
      | Messages q ->
          if Queue.is_empty q then (
            replace key (Ajax_call (winfo, nb, sleep_pong ()))
          ) else (
            send_with_winfo key winfo q;
            if (Queue.is_empty q) then remove key
          )
    with Not_found -> add key (Ajax_call (winfo, nb, sleep_pong ()))
  in
  (* A single lookup: a key without pang map has no waiting result. *)
  let waiting =
    try Hashtbl.find pang_tbl key with Not_found -> IntMap.empty
  in
  if IntMap.is_empty waiting then iping ()
  else (
    let n, result = IntMap.min waiting in
    Hashtbl.replace pang_tbl key (IntMap.remove n waiting);
    send_response winfo (Result (n, result))
  )
300
(* [pang key winfo nb] makes sure [key] has a (possibly empty) map of pang
   results, then handles the request like [ping] with [~crush:true]. *)
let pang key winfo nb =
  begin
    match Hashtbl.mem pang_tbl key with
    | true -> ()
    | false -> Hashtbl.add pang_tbl key IntMap.empty
  end;
  ping ~crush:true key winfo nb
305
(* [return key nb result] delivers the result of pang number [nb] to [key].

   If a request is pending for [key], its programmed pong is aborted, the
   entry is removed and the result is sent on that request. Otherwise the
   result is stored in [pang_tbl] until the next request (see [ping]).

   Raises [Invalid_argument "return"] if a result is already stored for
   [nb].

   A key without pang map is now treated as having an empty one. Before,
   the lookup raised [Not_found]; when raised from the [| _ ->] arm it was
   caught by the handler below, which ran the function a second time before
   letting the exception escape. *)
let return key nb result =
  #<If>
    ping_debug "PING" "PANG(%d) result sent to %s => (%s)"
      nb (C.key_to_string key) result;
  #<End>;
  (* For add result to the pang table *)
  let add_to_pang_tbl () =
    #<If>
      ping_debug "PING" "PANG(%d) wait client %s request for send result"
        nb (C.key_to_string key);
    #<End>;
    let map =
      try Hashtbl.find pang_tbl key with Not_found -> IntMap.empty
    in
    let map =
      if IntMap.mem nb map then (
        ping_error "PING"
          "PANG(%d) result is already present" nb;
        invalid_arg "return"
      ) else
        IntMap.add nb result map
    in
    Hashtbl.replace pang_tbl key map;
  in
  try
    match find key with
    | Ajax_call (winfo, _, sk) ->
        S.abort sk;
        remove key;
        send_response winfo (Result (nb, result))
    | _ -> add_to_pang_tbl ()
  with Not_found -> add_to_pang_tbl ()
336
337 end
338
339 (** Manage the status of connection with client *)
340 module Connection : sig
341
(** Handle identifying a registered callback. *)
type event_key

(** [register_event cid event callback] registers [callback] for [event].
    With [Some key] the callback is attached to that client only; with
    [None] it is run for every client. *)
val register_event : C.key option -> event -> (C.key -> unit) -> event_key

(** Unregister the callback designated by the given [event_key]. *)
val remove_event : event_key -> unit

(** Create the connection of a client. *)
val create : C.key -> unit

(** Delete the connection of a client. *)
val delete : C.key -> unit

(** [true] if the client connection identifier is known to the manager. *)
val mem : C.key -> bool

(** Iterate on the client connection identifiers. *)
val iter : (C.key -> unit) -> unit

(** Update the status of the client connection. If the status is not
    updated during [disconnection_state_delay], the client connection is
    deleted. *)
val ping : C.key -> HST.web_info -> int -> unit

(** Like [ping], but allows replying with [Entry.return]. *)
val pang : C.key -> HST.web_info -> int -> unit

(** Send the message to every known client. *)
val broadcast : C.msg -> unit

(** Number of connections. *)
val size : unit -> int
381
fccc685 Initial open-source release
MLstate authored
382 end = struct
383
(* Client the callback is attached to ([None]: all clients), event, and
   identifier of the callback within that client's table entry. *)
type event_key = C.key option * event * int

module EventMap = BaseMap.Make (struct
  type t = event
  let compare = Pervasives.compare
end)

(* Client identifier to: last ping number, scheduler key of the pending
   disconnection timer, and pong delay. *)
let state_tbl : (C.key, int * S.async_key * int) Hashtbl.t =
  Hashtbl.create 512

(* Client identifier ([None]: all clients) to: last callback identifier
   handed out, and, per event, the callbacks indexed by identifier. *)
let event_tbl
  : (C.key option, int * (C.key -> unit) IntMap.t EventMap.t) Hashtbl.t =
  Hashtbl.create 512
400
(* [register_event cid event callback] stores [callback] under a fresh
   identifier (previous identifier of [cid] plus one, starting at 1) and
   returns the key needed by [remove_event]. *)
let register_event cid event callback =
  let last_id, emap =
    try Hashtbl.find event_tbl cid with Not_found -> (0, EventMap.empty)
  in
  let id = last_id + 1 in
  let callbacks = Option.default IntMap.empty (EventMap.find_opt event emap) in
  let emap = EventMap.add event (IntMap.add id callback callbacks) emap in
  Hashtbl.replace event_tbl cid (id, emap);
  (cid, event, id)
410
(* [remove_event (cid, event, k)] unregisters the callback [k] of [event]
   for [cid]. Nothing happens if [cid] or [event] has no registered
   callback.

   The identifier counter stored with [cid] is left untouched. It used to
   be overwritten with [k], so after removing an old callback the next
   [register_event] could hand out an identifier still in use and replace
   that live callback. *)
let remove_event (cid, event, k) =
  try
    let counter, emap = Hashtbl.find event_tbl cid in
    let imap = IntMap.remove k (EventMap.find event emap) in
    Hashtbl.replace event_tbl cid (counter, EventMap.add event imap emap)
  with Not_found -> ()
419
(* Callbacks registered for [event] under [cid]; the empty map when either
   [cid] or [event] is unknown. *)
let get_callbacks cid event =
  try
    let _, emap = Hashtbl.find event_tbl cid in
    EventMap.find event emap
  with Not_found -> IntMap.empty
428
(* [raise_event cid event] runs, with [cid] as argument, first the callbacks
   registered for this client, then the callbacks registered for all
   clients. The second set is looked up only after the first has run. *)
let raise_event cid event =
  let run callbacks = IntMap.iter (fun _ callback -> callback cid) callbacks in
  #<If>
    ping_debug "PING" "Event (%s) for client %s was raised"
      (event_to_string event) (C.key_to_string cid);
  #<End>;
  (* Execute specialized callbacks *)
  let specialized = get_callbacks (Some cid) event in
  #<If>
    ping_debug "PING"
      "Event (%s) %d specialized callbacks was registered"
      (event_to_string event) (IntMap.size specialized)
  #<End>;
  run specialized;

  (* Execute global callbacks *)
  let global = get_callbacks None event in
  #<If>
    ping_debug "PING"
      "Event (%s) %d global callbacks was registered"
      (event_to_string event) (IntMap.size global)
  #<End>;
  run global
451
(* Forget every callback registered specifically for [cid]; callbacks
   registered for all clients are kept. *)
let remove_events cid =
  let specialized = Some cid in
  #<If>
    ping_debug "PING" "Remove callbacks events for client %s"
      (C.key_to_string cid);
  #<End>;
  Hashtbl.remove event_tbl specialized
458
(* [delete key] raises [Disconnect] for [key], then removes its entry, its
   connection state and its specialized callbacks, in that order.
   NOTE(review): the disconnection timer stored in [state_tbl] is not
   aborted here -- confirm this is intended. *)
let delete key =
  #<If>
    ping_debug "PING" "Remove the client %s" (C.key_to_string key);
  #<End>;
  let () = raise_event key Disconnect in
  let () = Entry.remove key in
  let () = Hashtbl.remove state_tbl key in
  remove_events key
467
(* [update key nb] records [nb] as the last ping number of [key] and
   programs a new disconnection timer of [disconnection_state_delay]. When
   the timer fires, [key] is deleted if its last ping number is still [nb],
   or if it has no state any more. A previous timer, if any, is aborted and
   the stored pong delay is kept; a new client starts with the rush
   delay. *)
let update key (nb:int) =
  let on_timeout () =
    try
      let (last_nb, _, _) = Hashtbl.find state_tbl key in
      if last_nb = nb then delete key
    with Not_found -> delete key
  in
  let timer = S.sleep disconnection_state_delay on_timeout in
  try
    let (_, previous_timer, delay) = Hashtbl.find state_tbl key in
    S.abort previous_timer; (* Abort the previous sleep *)
    Hashtbl.replace state_tbl key (nb, timer, delay)
  with
  | Not_found ->
      Hashtbl.add state_tbl key (nb, timer, ping_delay_client_msecond_rush)

(* Create the connection of [key]: same as [update] with ping number 0. *)
let create key =
  let initial_nb = 0 in
  update key initial_nb
486
fccc685 Initial open-source release
MLstate authored
(* Pong delay currently stored for [key]; the normal delay when [key] has
   no state. *)
let find_delay key =
  try
    let (_, _, delay) = Hashtbl.find state_tbl key in
    delay
  with
  | Not_found ->
      #<If>
        ping_debug "PING" "Delay not found for client %s" (C.key_to_string key);
      #<End>;
      ping_delay_client_msecond_normal
496
(* Switch the pong delay of [key] to the normal one, keeping its ping number
   and timer. Does nothing (besides a debug message) when [key] has no
   state. *)
let end_of_rush_delay key =
  try
    let (last_nb, timer, _) = Hashtbl.find state_tbl key in
    let state = (last_nb, timer, ping_delay_client_msecond_normal) in
    Hashtbl.replace state_tbl key state
  with
  | Not_found ->
      #<If>
        ping_debug "PING" "End of rush delay: not found for client %s" (C.key_to_string key);
      #<End>;
      ()
507
508
(* [true] if [cid] has a connection state. *)
let mem cid =
  #<If>
    ping_debug "PING" "State table : size (%d)"
      (Hashtbl.length state_tbl);
  #<End>;
  Hashtbl.mem state_tbl cid

(* Apply [f] to every client identifier having a connection state. *)
let iter f =
  let visit cid _state = f cid in
  Hashtbl.iter visit state_tbl
518
(* [ping key winfo nb] refreshes the connection state of [key], then hands
   the request to [Entry.ping]. A pending request is crushed only when
   [nb] is 1; the pong delay comes from [find_delay], and a sent pong
   switches the client to the normal delay. *)
let ping key winfo nb =
  #<If>
    ping_debug "PING"
      "PING(%d) received from %s" nb (C.key_to_string key);
  #<End>;
  update key nb;
  let crush = (nb = 1) in
  Entry.ping ~crush ~find_delay ~pong_callback:end_of_rush_delay key winfo nb
526
527
(* [pang key winfo nb] refreshes the connection state of [key], then hands
   the request to [Entry.pang]. *)
let pang key winfo nb =
  #<If>
    ping_debug "PING"
      "PANG (%d) received from %s" nb (C.key_to_string key);
  #<End>;
  let () = update key nb in
  Entry.pang key winfo nb
535
(* Send [mess] with [Entry.send] to every client having a connection
   state. *)
let broadcast mess =
  #<If>
    ping_debug "PING" "Broadcasting to clients"
  #<End>;
  iter (fun cid -> Entry.send mess cid)

(* Number of clients having a connection state. *)
let size () =
  Hashtbl.length state_tbl
543
fccc685 Initial open-source release
MLstate authored
544 end
545
type event_key = Connection.event_key

(** {6 Exported functions}*)

(* Event callbacks. *)
let register_event = Connection.register_event
let remove_event = Connection.remove_event

(* Connection life cycle and inspection. *)
let create = Connection.create
let delete = Connection.delete
let mem = Connection.mem
let size = Connection.size

(* Requests coming from clients. *)
let ping = Connection.ping
let pang = Connection.pang

(* Data going to clients. *)
let send = Entry.send
let broadcast = Connection.broadcast
let return = Entry.return
571
fccc685 Initial open-source release
MLstate authored
572 end
Something went wrong with that request. Please try again.