Permalink
Browse files

[feature] rpc runtime and compiler: implementing asynchronous server-…

…>client calls
  • Loading branch information...
Valentin Gatien-Baron
Valentin Gatien-Baron committed Jul 7, 2011
1 parent 8edc001 commit 24f427dd37043b3d29c5c193785cd27d6d59466a
View
@@ -269,7 +269,9 @@ module OpaRPC = struct
| `sync -> Opacapi.OpaRPC.client_send_to_server
| `async -> Opacapi.OpaRPC.client_async_send_to_server
else
- Opacapi.OpaRPC.server_send_to_client) ~side
+ match sync with
+ | `sync -> Opacapi.OpaRPC.server_send_to_client
+ | `async -> Opacapi.OpaRPC.server_async_send_to_client) ~side
let fake_stub = TyIdent.get Opacapi.OpaRPC.fake_stub
let error_stub = TyIdent.get Opacapi.OpaRPC.error_stub
@@ -524,7 +526,6 @@ let check_and_get ?(msg="") ~annotmap ~gamma:_ explicit_map expr =
This generate skeleton maybe used both on server and client.
- If the call is synchronous, generate a skeleton such as (in OPA style):
fun str ->
match OpaRPC.unserialize str with
| {some = request} ->
@@ -543,24 +544,6 @@ let check_and_get ?(msg="") ~annotmap ~gamma:_ explicit_map expr =
| _ -> { none = () }
: string -> string option
- If the call is asynchronous, generate a skeleton such as (in OPA style):
- fun str ->
- match OpaRPC.unserialize str with
- | {some = request} ->
- (match OpaRPC.extract_type request with
- | [i1; i2; ...] ->
- (match OpaTsc.implementation [i1; i2; ...] tsc with
- | { TyArrow_params = [t1; t2; t3; ...]; TyArrow_res = tres } ->
- (match OpaRPC.extract_value [t1; t2; t3; ...] request with
- | { some = [a1; a2; a3; ...] } ->
- do Scheduler.push (-> sliced_fun i1 i2 ... a1 a2 a3 ...)
- { some = "Async" }
- | _ -> { none = () })
- | _ -> { none = () })
- | _ -> { none = () })
- | _ -> { none = () }
- : string -> string option
-
Generated skeleton is good only if explicit instantiation has already taken place.
Execution of the generated skeleton:
@@ -576,7 +559,7 @@ let check_and_get ?(msg="") ~annotmap ~gamma:_ explicit_map expr =
@param sync True if the call is synchronous, false if it is asynchronous i.e. result doesn't matter
@param expr The body of the function
*)
-let generate_skeleton explicit_map ~annotmap ~stdlib_gamma ~gamma ~side ~sync expr =
+let generate_skeleton explicit_map ~annotmap ~stdlib_gamma ~gamma ~side expr =
(* Check and get *)
let expr, ident, _ty, _tsc, nb_tyvar, nb_rowvar, nb_colvar, oty, otsc, number_of_lambdas =
check_and_get ~msg:"generate_skeleton" ~annotmap ~gamma explicit_map expr
@@ -657,24 +640,12 @@ let generate_skeleton explicit_map ~annotmap ~stdlib_gamma ~gamma ~side ~sync ex
(* NOT SURE: what if the expression already has no arguments? *)
(*try*)
if is_a_function then
- let ((annotmap, typed_fun_call_expr) as typed_fun_call) =
- let args_ty = list_expr_ty @ list_expr_row @ list_expr_col in
- match number_of_lambdas with
- | `one_lambda -> full_apply gamma annotmap expr args_ty list_expr_val
- | `two_lambdas ->
- let annotmap, apply1 = QmlAstCons.TypedExpr.apply gamma annotmap expr args_ty in
- QmlAstCons.TypedExpr.apply gamma annotmap apply1 list_expr_val in
- match sync, side with
- | `sync, _
- | _, `client -> typed_fun_call
- | `async, `server ->
- let (annotmap, typed_async_fun_call) = TypedExpr.lambda annotmap [] typed_fun_call_expr in
- let (annotmap, typed_oparpc_executor) = TyIdent.get Opacapi.OpaRPC.server_async_execute_without_reply ~side annotmap stdlib_gamma
- in
- TypedExpr.apply gamma annotmap typed_oparpc_executor [typed_async_fun_call]
- (*
- This is <<OpaRPC_Server.async_no_reply(-> $typed_fun_call$)>>
- *)
+ let args_ty = list_expr_ty @ list_expr_row @ list_expr_col in
+ match number_of_lambdas with
+ | `one_lambda -> full_apply gamma annotmap expr args_ty list_expr_val
+ | `two_lambdas ->
+ let annotmap, apply1 = QmlAstCons.TypedExpr.apply gamma annotmap expr args_ty in
+ QmlAstCons.TypedExpr.apply gamma annotmap apply1 list_expr_val
else (
assert (list_expr_val = []);
TypedExpr.may_apply gamma annotmap expr (list_expr_ty @ list_expr_row @ list_expr_col)
@@ -1076,7 +1047,7 @@ let resolving_slicer_directive ~annotmap ~stdlib_gamma ~gamma ~side code
assert(side = directive_to_side d);
let sync = publish_directive_to_sync d in
let annotmap, gamma, nrvals =
- publish_resolver ~annotmap ~stdlib_gamma ~gamma ~side ~sync expr in
+ publish_resolver ~annotmap ~stdlib_gamma ~gamma ~side expr in
let annotmap, gamma, other_nrvals =
if ObjectFiles.Arg.is_fully_separated () then
generate_stub_from_publish ~annotmap ~stdlib_gamma ~gamma ~side ~sync expr
@@ -1210,11 +1181,11 @@ let generate_stub_from_publish,
(** Publish resolver. This resolver can be work on both side. This
resolver generate a skeleton and an expression for register this
skeleton. *)
-let make_publish_resolver genskel explicit_map renamingmap ~annotmap ~stdlib_gamma ~gamma ~side ~sync expr =
+let make_publish_resolver genskel explicit_map renamingmap ~annotmap ~stdlib_gamma ~gamma ~side expr =
try
_skel ();
let annotmap, gamma, (iskeleton, skeleton) =
- genskel explicit_map ~annotmap ~stdlib_gamma ~gamma ~side ~sync expr in
+ genskel explicit_map ~annotmap ~stdlib_gamma ~gamma ~side expr in
let annotmap, gamma, cpl_register =
register_skeleton ~renamingmap ~annotmap ~stdlib_gamma ~gamma ~side expr iskeleton in
(match iskeleton with
View
@@ -61,8 +61,7 @@ let roots_for_s3
Opacapi.OpaRPC.client_async_send_to_server;
Opacapi.OpaRPC.server_send_to_client;
- Opacapi.OpaRPC.server_send_to_client;
- Opacapi.OpaRPC.server_async_execute_without_reply;
+ Opacapi.OpaRPC.server_async_send_to_client;
Opacapi.OpaRPC.fake_stub;
Opacapi.OpaRPC.error_stub;
@@ -557,14 +557,15 @@ var LowLevelPingLoop = {};
/** Contains skeletons. */
var RPC_comet_table = new Object();
+ /* the id is null when the call is async */
function RPC_call(id, name, argument) {
var funct = RPC_comet_table[name];
if ( funct == null ) throw new Error("Rpc client "+name+" doesn't exists");
// hook for try catch there, and return a special case, maybe e.g. rpc_return_exc
var data = funct(argument);
if ('none' in data) {
if(window.console && window.console.error) window.console.error("RPC comet call ", id, " failed, no data in ", argument);
- } else {
+ } else if (id != null) {
internal_ajax({
type : 'POST',
url : "/rpc_return/"+id,
@@ -609,6 +610,9 @@ var LowLevelPingLoop = {};
case "rpc" :
RPC_call(mess.id, mess.name, mess.args);
break;
+ case "asyncrpc" :
+ RPC_call(null, mess.name, mess.args);
+ break;
case "chan" :
recovers_from_server(mess, js_none);
break;
@@ -49,6 +49,7 @@ module Client : sig
| SendCChan of (string * json)
| SendCChanThen of (string * json * (unit -> unit) * (unit -> unit))
| RPC of (string * string * json)
+ | AsyncRPC of (string * json)
val serialize : msg -> json Cps.t
@@ -81,6 +82,7 @@ end = struct
| SendCChan of (string * json)
| SendCChanThen of (string * json * (unit -> unit) * (unit -> unit))
| RPC of (string * string * json)
+ | AsyncRPC of (string * json)
let serialize json k =
match json with
@@ -104,6 +106,10 @@ end = struct
("id", JS.String id);
("name", JS.String name);
("args", json)] |> k
+ | AsyncRPC (name, json) ->
+ JS.Record [("type", JS.String "asyncrpc");
+ ("name", JS.String name);
+ ("args", json)] |> k
end
View
@@ -35,8 +35,9 @@ module RPC : sig
(** Call an rpc on the client identified by [cid] and send an
identifier. Register the cps continuation, this continuation
- will be called on [return].*)
- val call : string -> string -> string QmlCpsServerLib.continuation -> Client.key
+ will be called on [return].
+ When the boolean is true, the call should be synchronous *)
+ val call : bool -> string -> string -> string QmlCpsServerLib.continuation -> Client.key
-> bool
(** [return id response] Call the continuation corresponding to
@@ -77,18 +78,24 @@ end = struct
let _async_key = PingScheduler.sleep rpc_response_delay abort in
()
- let call fun_id args k cid =
- let id = generate_id () in
+ let call sync fun_id args k cid =
#<If:PING_DEBUG>
Logger.debug "[RPC] Try to call rpc %s on client %s"
fun_id (Client.key_to_string cid);
#<End>;
- Hashtbl.add rpc_ids id k;
- (* TODOK1 : args is a string but it should be a json! *)
- let mess = Client.RPC (string_of_int id, fun_id, JsonTypes.String args) in
+ let mess, id_opt =
+ if sync then
+ let id = generate_id () in
+ Hashtbl.add rpc_ids id k;
+ (* TODOK1 : args is a string but it should be a json! *)
+ Client.RPC (string_of_int id, fun_id, JsonTypes.String args), Some id
+ else
+ Client.AsyncRPC (fun_id, JsonTypes.String args), None in
if Ping.mem cid then (
Ping.send mess cid ;
- set_rpc_timeout cid fun_id id ;
+ (match id_opt with
+ | None -> ()
+ | Some id -> set_rpc_timeout cid fun_id id);
true
) else false
@@ -117,9 +124,9 @@ end
but it's back end dependent. We should use a ServerLib
function for translate ML string to OPA string... coming
soon? *)
-##register call : string, string, continuation('a), 'ctx -> bool
-let call fun_id args k key =
- RPC.call fun_id args (Obj.magic k) (Obj.magic key)
+##register call : bool, string, string, continuation('a), 'ctx -> bool
+let call sync fun_id args k key =
+ RPC.call sync fun_id args (Obj.magic k) (Obj.magic key)
(** This module is very dangerous, don't use it directly. It's a
module for RPC.*)
View
@@ -144,7 +144,7 @@ struct
let extract_values = !! "extract_values"
let fake_stub = !! "fake_stub"
let serialize = !! "serialize"
- let server_async_execute_without_reply = !! "Server_async_execute_without_reply"
+ let server_async_send_to_client = !! "Server_async_send_to_client"
let server_dispatcher_register = !! "Server_Dispatcher_register"
let server_send_to_client = !! "Server_send_to_client"
let server_try_cache = !! "Server_try_cache"
@@ -64,7 +64,7 @@
/* Export module OpaRPC_Server*/
@opacapi @server OpaRPC_Server_send_to_client = OpaRPC_Server.send_to_client
-@opacapi @server OpaRPC_Server_async_execute_without_reply = OpaRPC_Server.async_execute_without_reply
+@opacapi @server OpaRPC_Server_async_send_to_client = OpaRPC_Server.async_send_to_client
@opacapi @server OpaRPC_Server_Dispatcher_register = OpaRPC_Server.Dispatcher.register
@opacapi @server OpaRPC_Server_try_cache = OpaRPC_Server.try_cache
@@ -339,13 +339,15 @@ type OpaRPC.timeout = {
/**
* Sending a request to the client
*/
- send_to_client(fun_name : string, request : OpaRPC.request, ty : OpaType.ty) : 'a =
- send_response = %%BslRPC.call%%
- : string, /* id for return */
+ @private send_response = %%BslRPC.call%%
+ : bool, /* synchronous */
+ string, /* id for return */
string, /* serialized arguments */
(continuation(string)), /* continuation */
ThreadContext.client -> /* page id */
bool
+
+ send_to_client(fun_name : string, request : OpaRPC.request, ty : OpaType.ty) : 'a =
id = fun_name //plus some things
arg = OpaRPC.serialize(request)
serialized_return =
@@ -354,21 +356,25 @@ type OpaRPC.timeout = {
t = ThreadContext.get({from = k})
match t with
| {key = {client = x}; details = _; request = _} ->
- if not(send_response(id, arg, k, x)) then
+ if not(send_response(true, id, arg, k, x)) then
error("Server request client rpc but client wasn't ping ({fun_name})")
| _ ->
error("Invalid distant call to function ({fun_name}) at {__POSITION__}: there seems to be no client connected")
end
)
OpaSerialize.unserialize(serialized_return, ty) ? error("OPARPC : Request on client url {fun_name} has failed")
- /**
- * Execute asynchronously, don't produce any meaningful reply
- */
- async_execute_without_reply((expr: -> void)): void =
- (
- Scheduler.push(expr)
- )
+ @private dummy_cont = Continuation.make((_:string) -> @fail("Dummy cont should't be called"))
+ async_send_to_client(fun_name : string, request : OpaRPC.request, _) : 'a =
+ id = fun_name //plus some things
+ arg = OpaRPC.serialize(request)
+ match thread_context() with
+ | {key = {client = x}; details = _; request = _} ->
+ if not(send_response(false, id, arg, dummy_cont, x)) then
+ error("Server request client rpc but client wasn't ping ({fun_name})")
+ | _ ->
+ error("Invalid distant call to function ({fun_name}) at {__POSITION__}: there seems to be no client connected")
+ end
/**
* This module is a dispatcher of RPC on server

0 comments on commit 24f427d

Please sign in to comment.