Browse files

Add Lwt_ring.{write,push} where 'write' writes to ring slots without …

…updating the request pointers and 'push' updates the request pointers

This allows us to support protocols like that between netback/netfront
where a network packet consisting of multiple fragments must only be
exposed to the backend as a complete list.
  • Loading branch information...
1 parent 11e64b9 commit 36d27a333f62dcd20976ec18e0ef17f6f660744d David Scott committed Jan 10, 2013
Showing with 27 additions and 27 deletions.
  1. +20 −27 lwt/lwt_ring.ml
  2. +7 −0 lwt/lwt_ring.mli
View
47 lwt/lwt_ring.ml
@@ -57,35 +57,28 @@ module Front = struct
|None -> ()
|Some u -> Lwt.wakeup u ()
- let rec push_request_and_wait t notifyfn reqfn =
- if Ring.Rpc.Front.get_free_requests t.ring > 0 then begin
- let slot_id = Ring.Rpc.Front.next_req_id t.ring in
- let slot = Ring.Rpc.Front.slot t.ring slot_id in
- let th,u = Lwt.task () in
- let id = reqfn slot in
- if Ring.Rpc.Front.push_requests_and_check_notify t.ring
- then notifyfn ();
- Lwt.on_cancel th (fun _ -> Hashtbl.remove t.wakers id);
- Hashtbl.add t.wakers id u;
- th
- end else begin
- let th,u = Lwt.task () in
- let node = Lwt_sequence.add_r u t.waiters in
- Lwt.on_cancel th (fun _ -> Lwt_sequence.remove node);
- th >>
- push_request_and_wait t notifyfn reqfn
- end
+ let write t reqfn =
+ lwt () = wait_for_free_slot t in
+ let slot_id = Ring.Rpc.Front.next_req_id t.ring in
+ let slot = Ring.Rpc.Front.slot t.ring slot_id in
+ let th, u = Lwt.task () in
+ let id = reqfn slot in
+ Lwt.on_cancel th (fun _ -> Hashtbl.remove t.wakers id);
+ Hashtbl.add t.wakers id u;
+ return th
+
+ let push t notifyfn =
+ if Ring.Rpc.Front.push_requests_and_check_notify t.ring
+ then notifyfn ()
+
+ let push_request_and_wait t notifyfn reqfn =
+ lwt th = write t reqfn in
+ push t notifyfn;
+ th
let push_request_async t notifyfn reqfn freefn =
- lwt () = wait_for_free_slot t in
- let slot_id = Ring.Rpc.Front.next_req_id t.ring in
- let slot = Ring.Rpc.Front.slot t.ring slot_id in
- let th,u = Lwt.task () in
- let id = reqfn slot in
- if Ring.Rpc.Front.push_requests_and_check_notify t.ring
- then notifyfn ();
- Lwt.on_cancel th (fun _ -> Hashtbl.remove t.wakers id);
- Hashtbl.add t.wakers id u;
+ lwt th = write t reqfn in
+ push t notifyfn;
let _ = freefn th in
return ()
View
7 lwt/lwt_ring.mli
@@ -32,6 +32,13 @@ module Front : sig
*)
val init : ('a, 'b) Ring.Rpc.Front.t -> ('a,'b) t
+ (** Block until a ring slot is free, write the request and return the response thread *)
+ val write : ('a, 'b) t -> (buf -> 'b) -> 'a Lwt.t Lwt.t
+
+ (** Advance the shared ring pointers, exposing the written requests to the other end.
+ If the other end won't see the update, call the provided notify function to signal it. *)
+ val push : ('a, 'b) t -> (unit -> unit) -> unit
+
(** Push an asynchronous request to the slot and call [freefn] when a response comes in *)
val push_request_async : ('a,'b) t -> (unit -> unit) -> (buf -> 'b) -> ('a Lwt.t -> unit Lwt.t) -> unit Lwt.t

0 comments on commit 36d27a3

Please sign in to comment.