Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

[enhance] rpc: add timeout feature in calls from server to clients

 * rpc call from server to client are now limited with a timeout,
   raising an exception on the server side at the end of the timeout.

 * session: add a function to know if a session needs distant calls or not

 * cellules calls from server to client are also limited with the same
   mechanism if the session needs distant calls

 * the exception is caught by sessions and cellules handlers on the server
   side, making so that the sessions are not blocked anymore by client
   errors

 * add some javascript exception catcher arround execution of
   message handlers to avoid to kill a client in case of a non fatal error.
   The current action (e.g. a click on a specific button) is aborted,
   but the state of the client remain consistent :
  • Loading branch information...
commit 45d0b17b18c14f34fc2b48014a9b6c8a651d23d5 1 parent e7282e7
Mathieu Barbin authored
View
61 opabsl/jsbsl/bslSession.js
@@ -186,7 +186,10 @@ var LowLevelPingLoop = {};
sess_debug("Call a killed session : "+this.lchan_id);
#<End>
if (herror != undefined) herror();
- throw new Killed();
+ #<Ifstatic:PING_DEBUG>
+ sess_debug("[LocalChannel.send] Killed :"+er);
+ #<End>
+ return;
}
// Get the good context (owner if setted, sender else)
var ctx;
@@ -195,15 +198,23 @@ var LowLevelPingLoop = {};
// Perform action
#<Ifstatic:PING_DEBUG> ping_debug("Start handler"); #<End>
if (hsuccess != undefined) hsuccess();
- var new_st = this.action(this.state, msg, ctx);
- #<Ifstatic:PING_DEBUG> ping_debug("End handler"); #<End>
- if ('none' in new_st){
- // Stop session
- this.state = null;
- this.kill();
- } else {
- // Update state
- this.state = new_st.some;
+ var new_st = null ;
+ try {
+ new_st = this.action(this.state, msg, ctx);
+ #<Ifstatic:PING_DEBUG> ping_debug("End handler"); #<End>
+ if ('none' in new_st){
+ // Stop session
+ this.state = null;
+ this.kill();
+ return;
+ } else {
+ // Update state
+ this.state = new_st.some;
+ return;
+ }
+ } catch (er) {
+ %%bslsyslog_error%%("[LocalChannel.send] Catch :", er);
+ return;
}
},
#<End>
@@ -259,20 +270,7 @@ var LowLevelPingLoop = {};
#<Else>
var lchan = this;
function aux(){
- try {
- lchan.send_no_cps_aux(msg, ctx, herror, hsuccess);
- } catch (er) {
- if(er instanceof Killed) {
- #<Ifstatic:PING_DEBUG>
- sess_debug("[LocalChannel.send] Killed :"+er);
- #<End>
- return;
- }
- #<Ifstatic:PING_DEBUG>
- sess_debug("[LocalChannel.send] Catch :"+er);
- #<End>
- throw er;
- }
+ lchan.send_no_cps_aux(msg, ctx, herror, hsuccess);
}
if(this.concurrent){
aux();
@@ -309,8 +307,7 @@ var LowLevelPingLoop = {};
this.action = f;
this.send_no_cps_aux(msg, js_none);
} catch (er) {
- if (er instanceof Killed) error("Call to a killed cell");
- else throw er
+ %%bslsyslog_error%%("[LocalChannel.call] Cell :", er);
}
if(res === null) error("Call failed, result was [null]");
return res;
@@ -563,6 +560,7 @@ var LowLevelPingLoop = {};
function RPC_call(id, name, argument) {
var funct = RPC_comet_table[name];
if ( funct == null ) throw new Error("Rpc client "+name+" doesn't exists");
+ // hook for try catch there, and return a special case, maybe e.g. rpc_return_exc
var data = funct(argument);
if ('none' in data) {
if(window.console && window.console.error) window.console.error("RPC comet call ", id, " failed, no data in ", argument);
@@ -592,7 +590,7 @@ var LowLevelPingLoop = {};
herror = function(){unserialize_uu(srvmsg.herror)()};
if (hsuccess != undefined)
hsuccess = function(){unserialize_uu(srvmsg.hsuccess)()};
- if (lchan != null) {
+ if (lchan != null){
var unser_msg = lchan.unserialize(message);
lchan.send(null, unser_msg, ctx, herror, hsuccess);
} else if (herror != undefined){
@@ -996,6 +994,15 @@ var LowLevelPingLoop = {};
return ('addr' in chan);
}
+##register is_local : Session.private.native('msg, 'ctx) -> bool
+##args(_)
+{
+ // This is incorrect, but unused yet.
+ // If we implement a feature about timeout for cell calls
+ // on the client side, this will probably need an update.
+ return false;
+}
+
##register owner : Session.private.native('msg, 'ctx) -> option(Session.entity)
##args(chan)
{
View
60 opabsl/mlbsl/bslNativeLib.ml
@@ -178,3 +178,63 @@ let opa_tuple_4 (a, b, c, d) =
{1 Continuations}
*)
##extern-type continuation('a) = 'a QmlCpsServerLib.continuation
+
+(**
+ {1 Standard Exceptions}
+*)
+
+module OpaExc =
+struct
+ (**
+ Keep synchronized with stdlib.core/exception.opa
+ *)
+
+ let f_fail = ServerLib.static_field_of_name "fail"
+ let f_position = ServerLib.static_field_of_name "position"
+ let fail ~message ~position =
+ let r = ServerLib.empty_record_constructor in
+ let r = ServerLib.add_field r f_fail (ServerLib.wrap_string message) in
+ let r = ServerLib.add_field r f_position (ServerLib.wrap_string position) in
+ ServerLib.make_record r
+
+ let f_transaction_failure = ServerLib.static_field_of_name "Transaction_failure"
+ let transaction_failure = ServerLib.make_simple_record f_transaction_failure
+
+ let f_ocaml_exc = ServerLib.static_field_of_name "ocaml_exc"
+ let f_bslkey = ServerLib.static_field_of_name "bslkey"
+ let ocaml_exc bslkey exc =
+ let message = Printexc.to_string exc in
+ let r = ServerLib.empty_record_constructor in
+ let r = ServerLib.add_field r f_ocaml_exc (ServerLib.wrap_string message) in
+ let r = ServerLib.add_field r f_bslkey (ServerLib.wrap_string bslkey) in
+ ServerLib.make_record r
+
+
+ (**
+ Keep synchronized with stdlib.core.rpc.core/oparpc.opa
+ *)
+ let f_OpaRPC_Server = ServerLib.static_field_of_name "OpaRPC_Server"
+ let f_timeout = ServerLib.static_field_of_name "timeout"
+ let f_client = ServerLib.static_field_of_name "client"
+ let f_fun_id = ServerLib.static_field_of_name "fun_id"
+
+ module OpaRPC =
+ struct
+ (*
+ client : Client.key from BslRPC
+ fun_id : the name of the distant function
+ *)
+ let timeout client fun_id =
+ let timeout = ServerLib.empty_record_constructor in
+ let timeout = ServerLib.add_field timeout f_client client in
+ let timeout =
+ ServerLib.add_field timeout f_fun_id (ServerLib.wrap_string fun_id) in
+ let timeout = ServerLib.make_record timeout in
+ let rpc = ServerLib.empty_record_constructor in
+ let rpc = ServerLib.add_field rpc f_timeout timeout in
+ let rpc = ServerLib.make_record rpc in
+ let exc = ServerLib.empty_record_constructor in
+ let exc = ServerLib.add_field exc f_OpaRPC_Server rpc in
+ ServerLib.make_record exc
+ end
+end
View
27 opabsl/mlbsl/bslPervasives.ml
@@ -163,32 +163,7 @@ let jlog s =
##register [opacapi] id \ `Obj.magic` : 'a -> 'b
##endmodule
-module OpaExc =
-struct
- (**
- Keep synchronized with stdlib.core/exception.opa
- *)
-
- let f_fail = ServerLib.static_field_of_name "fail"
- let f_position = ServerLib.static_field_of_name "position"
- let fail ~message ~position =
- let r = ServerLib.empty_record_constructor in
- let r = ServerLib.add_field r f_fail (ServerLib.wrap_string message) in
- let r = ServerLib.add_field r f_position (ServerLib.wrap_string position) in
- ServerLib.make_record r
-
- let f_transaction_failure = ServerLib.static_field_of_name "Transaction_failure"
- let transaction_failure = ServerLib.make_simple_record f_transaction_failure
-
- let f_ocaml_exc = ServerLib.static_field_of_name "ocaml_exc"
- let f_bslkey = ServerLib.static_field_of_name "bslkey"
- let ocaml_exc bslkey exc =
- let message = Printexc.to_string exc in
- let r = ServerLib.empty_record_constructor in
- let r = ServerLib.add_field r f_ocaml_exc (ServerLib.wrap_string message) in
- let r = ServerLib.add_field r f_bslkey (ServerLib.wrap_string bslkey) in
- ServerLib.make_record r
-end
+module OpaExc = BslNativeLib.OpaExc
(**
Bypass used in the compilation of the directive {[\@fail]}.
View
31 opabsl/mlbsl/bslRPC.ml
@@ -17,6 +17,18 @@
*)
module Ping = BslPingRegister.M
module Client = BslPingRegister.Client
+module PingScheduler = BslPingRegister.PingScheduler
+
+(*
+ A delay for a specific RPC call.
+ A client may be still connected, and continue to ping, but an error occurred
+ and this client will never respond to a rpc_call.
+ This timeout is meant to abort the rpc_call, and to raise an exception on the server side.
+ It is also used as timeout of distant cellules calls.
+ Keep consistent with values defined in [pingRegister.ml]
+*)
+##register rpc_response_delay : int
+let rpc_response_delay = 45 * 1000
(** Primitive for make rpc call to the client. *)
module RPC : sig
@@ -33,6 +45,7 @@ module RPC : sig
val return : string -> string -> bool
end = struct
+
let random_int () = Random.int 1073741823 (* 2^30 -1 *)
let generate_without_conflicts exists =
@@ -49,6 +62,21 @@ end = struct
let generate_id () =
generate_without_conflicts (fun id -> Hashtbl.mem rpc_ids id)
+ let set_rpc_timeout (cid : Client.key) fun_id id =
+ let abort () =
+ (* if the id is still in the rpc table, then remove it, and abort *)
+ try
+ let k = Hashtbl.find rpc_ids id in
+ Hashtbl.remove rpc_ids id ;
+ let exc = BslNativeLib.OpaExc.OpaRPC.timeout cid fun_id in
+ let k_exc = QmlCpsServerLib.handler_cont k in
+ QmlCpsServerLib.push_cont k_exc exc
+ with
+ | Not_found -> ()
+ in
+ let _async_key = PingScheduler.sleep rpc_response_delay abort in
+ ()
+
let call fun_id args k cid =
let id = generate_id () in
#<If:PING_DEBUG>
@@ -59,7 +87,8 @@ end = struct
(* TODOK1 : args is a string but it should be a json! *)
let mess = Client.RPC (string_of_int id, fun_id, JsonTypes.String args) in
if Ping.mem cid then (
- Ping.send mess cid;
+ Ping.send mess cid ;
+ set_rpc_timeout cid fun_id id ;
true
) else false
View
4 opabsl/mlbsl/bslSession.ml
@@ -1203,6 +1203,10 @@ let is_remote chan =
| Some (OpaNetwork.RemoteClient _) -> true
| _ -> false
+##register is_local : Session.private.native('msg, 'ctx) -> bool
+let is_local chan =
+ try Option.is_none (Channel.owner chan) with Channel.Unregistered -> false
+
##register get_endpoint : Session.private.native('msg, 'ctx) -> option(endpoint)
let get_endpoint chan =
match Channel.identify chan with
View
13 opacapi/opacapi.ml
@@ -282,6 +282,12 @@ struct
let xhtml_href = !! "xhtml_href"
let xml = !! "xml"
+ module Cell =
+ struct
+ let (!!) s = !! ("Cell." ^ s)
+ let timeout = !! "timeout"
+ end
+
module Css =
struct
let (!!) s= !! ("Css." ^ s)
@@ -324,6 +330,12 @@ struct
end
end
+ module Exception =
+ struct
+ let (!!) s = !! ("Exception." ^ s)
+ let common = !! "common"
+ end
+
module FunAction =
struct
let (!!) s = ("FunAction." ^ s)
@@ -344,6 +356,7 @@ struct
struct
let (!!) s = !! ("OpaRPC." ^s)
let request = !! "request"
+ let timeout = !! "timeout"
end
module OpaSerialize =
View
12 stdlib/core/exception.opa
@@ -57,6 +57,10 @@
* {1 What if I need more?}
**/
+/**
+ * <!> Built in [BslNativeLib.ml]
+**/
+@opacapi
type Exception.common =
{ fail : string ; position : string }
/**
@@ -75,3 +79,11 @@ type Exception.common =
* using [Printexc.to_string], and the [bslkey] of the bypass is
* stored in the Opa exception.
**/
+
+// This is a hack for preserving the type of exceptions
+@private @server_private _dead_code() =
+ match 0 with
+ | 1 -> @throw( @opensums( { fail = "" ; position = "" } : Exception.common ) )
+ | 2 -> @throw( @opensums( { Transaction_failure } : Exception.common ) )
+ | 3 -> @throw( @opensums( { ocaml_exc = "" ; bslkey = "" } : Exception.common ) )
+ | _ -> void
View
68 stdlib/core/rpc/core/cell.opa
@@ -188,6 +188,28 @@ Cell_private = {{
*/
/**
+ * Generating fresh identifiers for call remote cells
+ **/
+ @private @server fresh_call_id = Mutable.make(0)
+ @private @server call_tbl = Mutable.make(IntSet.empty)
+ @private @server gen_call_id() =
+ id = fresh_call_id.get()
+ do fresh_call_id.set(succ(id))
+ set = IntSet.add(id, call_tbl.get())
+ do call_tbl.set(set)
+ id
+ @private @server unset_gen_call(id) =
+ set = call_tbl.get()
+ if IntSet.mem(id, set)
+ then
+ do call_tbl.set(IntSet.remove(id, set))
+ true
+ else
+ false
+
+ @private @publish rpc_response_delay = %%BslRPC.rpc_response_delay%%
+
+ /**
* Send a message to a cell
*
* @param cell Cell to which message is sent
@@ -217,7 +239,7 @@ Cell_private = {{
#<Ifstatic:OPA_CPS_CLIENT>
#<Else>
@sliced_expr({
- client =
+ client = (
serialize(x) =
x = serialize(x)
x = {List = {hd={String = "PleaseCallForMe"};tl={hd=x;tl=[]}}}
@@ -229,15 +251,18 @@ Cell_private = {{
unserialize_result(lljson:RPC.Json.private.native):'result =
json = Json.from_ll_json(lljson) ? error("CELL : Convert RPC.Json.private.native to json failed")
unserialize_result(json)
- on_message =
+ on_message =
match gm(cell) with
|{some = {cell = ~{on_message ...}}} -> on_message
|{none} ->
_, _ -> error("No handler on this cells (That case should never happens)")
bsl_llcall(cell, message, serialize, unserialize_result, on_message)
+ )
+
+ ;
- server =
+ server = (
#<End>
/* Encapsulated session */
sess = cell
@@ -258,14 +283,47 @@ Cell_private = {{
@with_thread_context(
ThreadContext.get({from = k}),
Session_private.llsend(sess, {~serialize message=(k, message)}))
- @callcc(callbis)
+
+ if not(Session.is_local(sess))
+ then (
+ id = gen_call_id()
+ @server timeout() =
+ if unset_gen_call(id)
+ then
+ // if needed, we can retrieve the context corresponding to the client
+ // and store it in the exception
+ @throw( { Cell = { timeout } } )
+ do sleep(rpc_response_delay, timeout)
+ r = @callcc(callbis)
+ if unset_gen_call(id) then r
+ else error("This cell call was aborted, result ignored")
+ )
+ else
+ @callcc(callbis)
+
#<Ifstatic:OPA_CPS_CLIENT>
#<Else>
- })
+ )
+ })
#<End>
}}
+/**
+ * {1 Special Cell exception}
+ *
+**/
+@opacapi
+type Cell.timeout = {
+ Cell : {
+ timeout : { }
+ }
+}
+
+// hack
+@server_private @private _please_type_me_this_cell_exception() =
+ exc = { Cell = { timeout } }
+ @throw( @opensums(exc) )
/**
* {1 High-level module}
View
22 stdlib/core/rpc/core/oparpc.opa
@@ -250,6 +250,27 @@ type OpaRPC.interface = {{
}}
+/**
+ * {1 Special RPC exception}
+ *
+ * <!> Built in [BslNativeLib.ml]
+**/
+@opacapi
+type OpaRPC.timeout = {
+ OpaRPC_Server : {
+ timeout : {
+ client : ThreadContext.client ;
+ fun_id : string ;
+ }
+ }
+}
+
+// hack
+@server_private @private _please_type_me_this_rpc_exception(client : ThreadContext.client) =
+ timeout = { ~client ; fun_id = ""}
+ exc = { OpaRPC_Server = { ~timeout } }
+ @throw( @opensums(exc) )
+
/**
* {1 Specific server module for RPC}
@@ -257,7 +278,6 @@ type OpaRPC.interface = {{
@server OpaRPC_Server =
-
TCMap =
tc_order = Order.make(
x, y ->
View
9 stdlib/core/rpc/core/session.opa
@@ -599,13 +599,20 @@ Session = {{
bsl(chan)
/**
- * Returns true if the channel is not owned by this server
+ * Returns true if the channel is not owned by this server.
+ * Beware, this returns false in case of non RemoteClient.
*/
is_remote(chan : channel) =
bsl = %%BslSession.is_remote%%
bsl(chan)
/**
+ * Returns true only if the channel is owned by this server.
+ * Returns false in case of Client.
+ **/
+ is_local = %%BslSession.is_local%% : channel -> bool
+
+ /**
* [on_remove channel f] Execute f when the [channel] is removed
* of this server.
*/
Please sign in to comment.
Something went wrong with that request. Please try again.