Skip to content
This repository
tag: v948
Fetching contributors…

Cannot retrieve contributors at this time

file 147 lines (124 sloc) 4.843 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
(*
Copyright © 2011 MLstate

This file is part of OPA.

OPA is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License, version 3, as published by
the Free Software Foundation.

OPA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
more details.

You should have received a copy of the GNU Affero General Public License
along with OPA. If not, see <http://www.gnu.org/licenses/>.
*)
module Ping = BslPingRegister.M
module Client = BslPingRegister.Client
module PingScheduler = BslPingRegister.PingScheduler

(*
A delay for a specific RPC call.
A client may be still connected, and continue to ping, but an error occurred
and this client will never respond to a rpc_call.
This timeout is meant to abort the rpc_call, and to raise an exception on the server side.
It is also used as timeout of distant cellules calls.
Keep consistent with values defined in [pingRegister.ml]
*)
##register rpc_response_delay : int
let rpc_response_delay = 45 * 1000

(** Primitive for make rpc call to the client. *)
module RPC : sig

  (** Call an rpc on the client identified by [cid] and send an
identifier. Register the cps continuation, this continuation
will be called on [return].
When the boolean is true, the call should be synchronous *)
  val call : bool -> string -> string -> string QmlCpsServerLib.continuation -> Client.key
    -> bool

  (** [return id response] Call the continuation corresponding to
the given identifier. This identifier has been sent to the
client... *)
  val return : string -> string -> bool

end = struct

  let random_int () = Random.int 1073741823 (* 2^30 -1 *)

  let generate_without_conflicts exists =
    let rec aux () =
      let id = random_int () in
      if exists id then
        aux ()
      else id
    in aux ()

  (** Store the rpc continuation while waiting the response. *)
  let rpc_ids = Hashtbl.create 512

  let generate_id () =
    generate_without_conflicts (fun id -> Hashtbl.mem rpc_ids id)

  let set_rpc_timeout (cid : Client.key) fun_id id =
    let abort () =
      (* if the id is still in the rpc table, then remove it, and abort *)
      try
        let k = Hashtbl.find rpc_ids id in
        Hashtbl.remove rpc_ids id ;
        let exc = BslNativeLib.OpaExc.OpaRPC.timeout cid fun_id in
        let k_exc = QmlCpsServerLib.handler_cont k in
        QmlCpsServerLib.push_cont k_exc exc
      with
      | Not_found -> ()
    in
    let _async_key = PingScheduler.sleep rpc_response_delay abort in
    ()

  let call sync fun_id args k cid =
    #<If:PING_DEBUG>
      Logger.debug "[RPC] Try to call rpc %s on client %s"
      fun_id (Client.key_to_string cid);
    #<End>;
    let mess, id_opt =
      if sync then
        let id = generate_id () in
        Hashtbl.add rpc_ids id k;
        (* TODOK1 : args is a string but it should be a json! *)
        Client.RPC (string_of_int id, fun_id, JsonTypes.String args), Some id
      else
        Client.AsyncRPC (fun_id, JsonTypes.String args), None in
    if Ping.mem cid then (
      Ping.send mess cid ;
      (match id_opt with
      | None -> ()
      | Some id -> set_rpc_timeout cid fun_id id);
      true
    ) else false

  let return id response =
    try
      let id = int_of_string id in
      try
        #<If:PING_DEBUG>
          Logger.debug "[RPC] Return %d received" id;
        #<End>;
        let k = Hashtbl.find rpc_ids id in
        Hashtbl.remove rpc_ids id;
        QmlCpsServerLib.push_cont k response;
        true
      with Not_found ->
        Logger.error "[RPC] No continuation stored for %d" id;
        false
    with Failure "int_of_string" ->
      Logger.error "[RPC] Identifier %s isn't an int" id;
      false

end

(** Given continuation must be a string continuation. That works
because OPA string and ML string have the same representation,
but it's back end dependent. We should use a ServerLib
function for translate ML string to OPA string... coming
soon? *)
##register call : bool, string, string, continuation('a), 'ctx -> bool
let call sync fun_id args k key =
   RPC.call sync fun_id args (Obj.magic k) (Obj.magic key)

(** This module is very dangerous, don't use it directly. It's a
module for RPC.*)
##module Dispatcher

  let rpctbl : (string, Obj.t) Hashtbl.t = Hashtbl.create 1024

  ##register register : string, 'a -> void
  let register name rpc_fun =
    Hashtbl.add rpctbl name (Obj.repr rpc_fun)

  ##register get : string -> option('a)
  let get name =
    try
      Some (Obj.obj (Hashtbl.find rpctbl name))
    with Not_found -> None

##endmodule
Something went wrong with that request. Please try again.