Permalink
Browse files

The lwt push request functions take a (unit -> unit) callback to allo…

…w a notification to be sent to the other end.

A user may decide to pass in (fun () -> ()) and batch notifications, or
send one notification per request.
  • Loading branch information...
1 parent d3534a1 commit 91de275a623142c6f362b0ac57274253b32fc208 David Scott committed Dec 14, 2012
Showing with 28 additions and 25 deletions.
  1. +10 −10 lwt/lwt_ring.ml
  2. +11 −10 lwt/lwt_ring.mli
  3. +7 −5 lwt_test/lwt_test.ml
View
20 lwt/lwt_ring.ml
@@ -17,7 +17,7 @@
open Lwt
open Printf
-module Client = struct
+module Front = struct
type ('a, 'b) t = {
ring: ('a, 'b) Ring.Rpc.Front.t;
@@ -55,14 +55,14 @@ module Client = struct
|None -> ()
|Some u -> Lwt.wakeup u ()
- let rec push_request_and_wait t reqfn =
+ let rec push_request_and_wait t notifyfn reqfn =
if Ring.Rpc.Front.get_free_requests t.ring > 0 then begin
let slot_id = Ring.Rpc.Front.next_req_id t.ring in
let slot = Ring.Rpc.Front.slot t.ring slot_id in
let th,u = Lwt.task () in
let id = reqfn slot in
- if Ring.Rpc.Front.push_requests_and_check_notify t.ring
- then printf "TX: need to signal event channel\n%!";
+ if Ring.Rpc.Front.push_requests_and_check_notify t.ring
+ then notifyfn ();
Lwt.on_cancel th (fun _ -> Hashtbl.remove t.wakers id);
Hashtbl.add t.wakers id u;
th
@@ -71,38 +71,38 @@ module Client = struct
let node = Lwt_sequence.add_r u t.waiters in
Lwt.on_cancel th (fun _ -> Lwt_sequence.remove node);
th >>
- push_request_and_wait t reqfn
+ push_request_and_wait t notifyfn reqfn
end
- let push_request_async t reqfn freefn =
+ let push_request_async t notifyfn reqfn freefn =
lwt () = wait_for_free_slot t in
let slot_id = Ring.Rpc.Front.next_req_id t.ring in
let slot = Ring.Rpc.Front.slot t.ring slot_id in
let th,u = Lwt.task () in
let id = reqfn slot in
if Ring.Rpc.Front.push_requests_and_check_notify t.ring
- then printf "TX: need to signal event channel\n%!";
+ then notifyfn ();
Lwt.on_cancel th (fun _ -> Hashtbl.remove t.wakers id);
Hashtbl.add t.wakers id u;
let _ = th >> return (freefn ()) in
return ()
end
-module Server = struct
+module Back = struct
type ('a, 'b) t = {
ring: ('a, 'b) Ring.Rpc.Back.t;
}
let init ring =
{ ring }
- let push_response t rspfn =
+ let push_response t notifyfn rspfn =
let slot_id = Ring.Rpc.Back.next_res_id t.ring in
let slot = Ring.Rpc.Back.slot t.ring slot_id in
rspfn slot;
if Ring.Rpc.Back.push_responses_and_check_notify t.ring
- then printf "TX: need to signal event channel\n%!"
+ then notifyfn ()
let poll t fn =
Ring.Rpc.Back.ack_requests t.ring fn
View
21 lwt/lwt_ring.mli
@@ -18,8 +18,8 @@
open Ring
-(** The client front-end connection to the shared ring *)
-module Client : sig
+(** The (client) front-end connection to the shared ring *)
+module Front : sig
(** 'a is the response type, and 'b is the request id type (e.g. int or int64) *)
type ('a,'b) t
@@ -31,16 +31,17 @@ module Client : sig
val init : ('a, 'b) Ring.Rpc.Front.t -> ('a,'b) t
(** Push an asynchronous request to the slot and call [freefn] when a response comes in *)
- val push_request_async : ('a,'b) t -> (buf -> 'b) -> (unit -> unit) -> unit Lwt.t
+ val push_request_async : ('a,'b) t -> (unit -> unit) -> (buf -> 'b) -> (unit -> unit) -> unit Lwt.t
- (** Given a function {[fn]} which writes to a slot and returns
+ (** Given a function {[fn] [notify_cb]} which writes to a slot and returns
the request id, this will wait for a free request slot,
write the request, and return with the response when it
is available.
@param fn Function that writes to a request slot and returns the request id
+ @param notify_cb Callback function which should trigger a notify of the remote
@return Thread which returns the response value to the input request
*)
- val push_request_and_wait : ('a,'b) t -> (buf -> 'b) -> 'a Lwt.t
+ val push_request_and_wait : ('a,'b) t -> (unit -> unit) -> (buf -> 'b) -> 'a Lwt.t
(** Poll the ring for responses, and wake up any threads that are
sleeping (as a result of calling {[push_request_and_wait]}).
@@ -50,8 +51,8 @@ module Client : sig
val poll : ('a,'b) t -> (buf -> ('b * 'a)) -> unit
end
-(** The server back-end connection to the shared ring *)
-module Server : sig
+(** The (server) back-end connection to the shared ring *)
+module Back : sig
(** 'a is the response type, and 'b is the request id type (e.g. int or int64) *)
type ('a,'b) t
@@ -62,8 +63,8 @@ module Server : sig
*)
val init : ('a, 'b) Ring.Rpc.Back.t -> ('a,'b) t
- (** [push_response t fn] finds a free slot and applies it to [fn],
- signalling the client that a response is ready. *)
- val push_response : ('a, 'b) t -> (buf -> unit) -> unit
+ (** [push_response t notifyfn fn] finds a free slot and applies it to [fn],
+ signalling the client via [notifyfn] that a response is ready. *)
+ val push_response : ('a, 'b) t -> (unit -> unit) -> (buf -> unit) -> unit
end
View
12 lwt_test/lwt_test.ml
@@ -32,24 +32,26 @@ let one_request_response () =
Printf.fprintf stdout "%s\n%!" (Ring.Rpc.Back.to_string back);
assert_equal ~msg:"more_to_do" ~printer:string_of_bool false (Ring.Rpc.Back.more_to_do back);
- let client = Lwt_ring.Client.init front in
- let server = Lwt_ring.Server.init back in
+ let client = Lwt_ring.Front.init front in
+ let server = Lwt_ring.Back.init back in
let id = () in
- let request_th = Lwt_ring.Client.push_request_and_wait client (fun _ -> id) in
+ let must_notify = ref false in
+ let request_th = Lwt_ring.Front.push_request_and_wait client (fun () -> must_notify := true) (fun _ -> id) in
+ assert_equal ~msg:"must_notify" ~printer:string_of_bool true must_notify;
Printf.fprintf stdout "%s\n%!" (Ring.Rpc.Back.to_string back);
assert_equal ~msg:"more_to_do" ~printer:string_of_bool true (Ring.Rpc.Back.more_to_do back);
let finished = ref false in
Ring.Rpc.Back.ack_requests back (fun _ -> finished := true);
assert_equal ~msg:"ack_requests" ~printer:string_of_bool true (!finished);
- Lwt_ring.Server.push_response server (fun _ -> ());
+ Lwt_ring.Back.push_response server (fun _ -> ());
Printf.fprintf stdout "%s\n%!" (Ring.Rpc.Back.to_string back);
let replied = ref false in
- Lwt_ring.Client.poll client (fun _ -> replied := true; id, ());
+ Lwt_ring.Front.poll client (fun _ -> replied := true; id, ());
assert_equal ~msg:"poll" ~printer:string_of_bool true (!replied);
assert_equal ~msg:"more_to_do" ~printer:string_of_bool false (Ring.Rpc.Back.more_to_do back);

0 comments on commit 91de275

Please sign in to comment.