Skip to content


bug fixes to Shm_pipe to double buffer metadata pages more carefully
Browse files Browse the repository at this point in the history
  • Loading branch information
avsm committed Apr 1, 2012
1 parent 7fa095e commit 480ec8c
Showing 1 changed file with 165 additions and 52 deletions.
217 changes: 165 additions & 52 deletions lib/
Expand Up @@ -14,10 +14,14 @@

(* Fast bi-directional shared-memory, with reliable acks and flow control *)
(* Fast bi-directional shared-memory, which uses a metadata pipe to communicate
* extents that pass across to the other side, and a shared memory segment
* for the actual data transmission *)

open Lwt
open Printf

(* The state handle for a Shm_pipe *)
type handle = {
(* human-readable name for this handle *)
name: string;
Expand All @@ -29,30 +33,133 @@ type handle = {
mutable rx_closed: bool;

(* Debug printf to stderr: writes a "[pid] " prefix and then formats the
 * remaining arguments according to [fmt] on the same channel. *)
let dprintf fmt =
  kfprintf (fun oc -> fprintf oc fmt) stderr "[%d] " (Unix.getpid ())

(* Metadata buffering and transmission *)
module MD = struct

(* Write a tag-0 message carrying [off] (30 bits) and [len] (31 bits) to
 * the output channel [oc]. *)
let msg_send oc off len =
  (* a bitstring is (data, bit offset, bit length); only the data is written *)
  let (wire, _, _) = BITSTRING { 0:2; off:30; len:31 } in
  Lwt_io.write oc wire
(* Metadata is marshalled into a string [buf] of length [len] via
* a simple wire protocol. The [off] tracks how full the buffer is
* and upon transmission, a set of spare buffers are swapped in
* while the main one is being transmitted. *)
type t = {
  (* buffer currently being filled with marshalled messages *)
  mutable buf: string;
  (* capacity of [buf] (and of every spare buffer) in bytes *)
  len: int;
  (* descriptor the metadata is sent on *)
  fd: Lwt_unix.file_descr;
  (* number of bytes of [buf] filled so far *)
  mutable off: int;
  (* buffers not currently in use, swapped in when [buf] is transmitted *)
  spare: string Lwt_sequence.t;
  (* signalled whenever a buffer is returned to [spare] *)
  spare_waiters: unit Lwt_condition.t;
  (* set once sending on [fd] has failed *)
  mutable closed: bool;
}

(* Write a tag-1 message carrying [off] (30 bits) and [len] (31 bits) to
 * the output channel [oc]. *)
let msg_free oc off len =
  (* a bitstring is (data, bit offset, bit length); only the data is written *)
  let (wire, _, _) = BITSTRING { 1:2; off:30; len:31 } in
  Lwt_io.write oc wire
(* Size in bytes of one metadata message on the wire: a 64-bit tag followed
 * by a 32-bit offset and a 32-bit length (64 + 32 + 32 bits = 16 bytes) *)
let plen = 16

(* Write a tag-2 message with an all-zero 62-bit payload to the output
 * channel [oc]. *)
let msg_close oc =
  (* a bitstring is (data, bit offset, bit length); only the data is written *)
  let (wire, _, _) = BITSTRING { 2:2; 0L: 62 } in
  Lwt_io.write oc wire
(* Build the metadata state for descriptor [fd]: one empty active buffer
 * of 32768 bytes plus three spare buffers of the same size, nobody
 * waiting, and the connection marked as open. *)
let init fd =
  let len = 32768 in
  let buf = String.create len in
  let spare = Lwt_sequence.create () in
  (* pre-populate the pool of spare buffers *)
  for _i = 1 to 3 do
    ignore (Lwt_sequence.add_r (String.create len) spare)
  done;
  { buf; len; fd; off = 0; spare;
    spare_waiters = Lwt_condition.create ();
    closed = false }

(* Debug printf to stderr: emits "[pid] " first, then the caller's
 * arguments formatted with [fmt]. *)
let dprintf fmt =
  let continue_with oc = fprintf oc fmt in
  let pid = Unix.getpid () in
  kfprintf continue_with stderr "[%d] " pid
(* Transmit [len] bytes of [buf], starting at [off], on [t.fd], retrying
 * after short writes until everything has been sent.
 *
 * Whatever the outcome, [buf] is handed back to the spare pool so that
 * [flush] can reuse it. A result of 0 or -1 from the socket, or any
 * exception, sets [t.closed]. The returned thread never fails. *)
let rec send_all t buf off len =
  (* Return the buffer to the spare pool *)
  let recycle () = ignore (Lwt_sequence.add_r buf t.spare) in
  (* Mark the connection dead and wake *every* waiter: each of them
   * re-checks [t.closed] in [flush] and gives up, whereas waking a single
   * one would leave the others blocked forever. *)
  let fail_closed () =
    t.closed <- true;
    recycle ();
    Lwt_condition.broadcast t.spare_waiters ();
    return ()
  in
  try_lwt
    begin match_lwt Lwt_unix.send t.fd buf off len [] with
    |0 | (-1) ->
      fail_closed ()
    |sent when sent = len ->
      recycle ();
      (* Exactly one buffer became available: wake one waiter *)
      Lwt_condition.signal t.spare_waiters ();
      return ()
    |sent ->
      (* Short write: continue sending the remainder *)
      send_all t buf (off+sent) (len-sent)
    end
  with _ ->
    (* Previously this path neither recycled the buffer nor woke waiters,
     * which could leave a [flush] blocked on [spare_waiters] forever *)
    fail_closed ()

(* Flush all outstanding metadata to the other side.
 *
 * Does nothing when the connection is closed or the active buffer is
 * empty. Otherwise the active buffer is swapped for a spare one and
 * handed to [send_all]; if no spare is available, waits on
 * [t.spare_waiters] and tries again. *)
let rec flush t =
  if t.closed then return () else
  match t.off with
  |0 -> return ()
  |_ -> begin
    (* Grab a spare to double buffer the active connection,
     * as it may take some time to send the metadata *)
    match Lwt_sequence.take_opt_l t.spare with
    |None ->
      lwt () = Lwt_condition.wait t.spare_waiters in
      flush t
    |Some sbuf ->
      let buf = t.buf in
      (* Remember how much of the old buffer is filled, then start the
       * fresh buffer from the beginning *)
      let off = t.off in
      t.off <- 0;
      t.buf <- sbuf;
      send_all t buf 0 off
  end

(* Check if a buffer needs to be flushed to the other side.
 *
 * An empty buffer needs nothing. A full buffer ([t.off >= t.len]) is
 * flushed and the flush is awaited. A partially filled buffer gets a
 * background flush after a short delay, without waiting for it.
 * XXX Has a hacky timer that should be an autoflush or TCP delayed ack *)
let check_flush t =
  match t.off with
  |0 -> return ()
  |_ ->
    if t.off >= t.len then
      flush t
    else begin
      (* XXX set up autoflush *)
      let _ = Lwt_unix.sleep 0.003 >> flush t in
      return ()
    end

(* Given a handle, retrieve a pair of input/output streams *)
(* Notify the other side of a frame to be transmitted.
 *
 * Appends a [plen]-byte message (64-bit tag 1, 32-bit [off], 32-bit
 * [len]) to the active metadata buffer and lets [check_flush] decide
 * whether to transmit. Returns [false] without queueing anything when
 * the connection is already closed, [true] otherwise. *)
let send t (off,len) =
  match t.closed with
  |true -> return false
  |false ->
    let len' = Int32.of_int len in
    let off' = Int32.of_int off in
    let (v,_,_) = BITSTRING { 1L:64; off':32; len':32 } in
    (* Append at the current fill position, then advance it *)
    String.blit v 0 t.buf t.off plen;
    t.off <- t.off + plen;
    check_flush t >>
    return true

(* Release a buffer back to the transmitter.
 *
 * Appends a [plen]-byte message (64-bit tag 2, 32-bit [off], 32-bit
 * [len]) to the active metadata buffer and lets [check_flush] decide
 * whether to transmit. Does nothing once the connection is closed:
 * [flush] no longer drains the buffer in that state, so continuing to
 * append would eventually run past the end of [t.buf]. *)
let free t (off,len) =
  if t.closed then return () else begin
    let len' = Int32.of_int len in
    let off' = Int32.of_int off in
    let (v,_,_) = BITSTRING { 2L:64; off':32; len':32 } in
    (* Append at the current fill position, then advance it *)
    String.blit v 0 t.buf t.off plen;
    t.off <- t.off + plen;
    check_flush t
  end

(* Flush any pending metadata, then send one all-zero message of [plen]
 * bytes directly on the descriptor. The number of bytes actually sent
 * is ignored. *)
let close t =
  flush t >>= fun () ->
  let terminator = String.make plen '\000' in
  Lwt_unix.send t.fd terminator 0 plen [] >>= fun _ ->
  return ()

(* Convert a Shm_pipe handle into an Lwt_flow construct. This gets
 * Simplex.extents directly, without wrapping them in a Bigarray for now. *)
let make_flow handle =
(* Buffered metadata channels *)
let ic = Lwt_io.(of_fd ~buffer_size:65535 ~mode:input handle.fd) in
let oc = Lwt_io.(of_fd ~buffer_size:65535 ~mode:output handle.fd) in
let t = MD.init handle.fd in
(* Listeners waiting for new incoming extents *)
let rx_waiters = Lwt_sequence.create () in
(* Listeners waiting for free space on the transmit queue *)
Expand Down Expand Up @@ -101,46 +208,51 @@ let make_flow handle =
(* Release a local TX buffer transmitter has decided not to use
* any more *)
let tx_release ext = Simplex.release ext; return () in
(* XXX notify the transmitter that the receiver has stopped listening, somehow.
* Right now we just ignore the metadata pipe disappearing *)
let tx_send ext =
MD.msg_send oc (Simplex.offset ext) (Simplex.length ext) >>
MD.send t ((Simplex.offset ext),(Simplex.length ext)) >>
return true
with exn -> return false
with exn ->
return false
let tx_close () =
MD.msg_close oc
let rx_release ext =
MD.msg_free oc (Simplex.offset ext) (Simplex.length ext)
let tx_close () = MD.close t in
let rx_release ext = t ((Simplex.offset ext),(Simplex.length ext)) in
(* The metadata pipe coordinates all this *)
let _ =
let rbuf = String.create 8 in
let rbs = rbuf, 0, (8*8) in
let rbuf = String.create t.MD.len in
(* Create a buffered pipe *)
try_lwt while_lwt true do
lwt () = Lwt_io.read_into_exactly ic rbuf 0 8 in
bitmatch rbs with
| {0:2; off:30; len:31} ->
let ext = Simplex.make_rx handle.rx off len in
(match Lwt_sequence.take_opt_l rx_waiters with
|None -> ignore(Lwt_sequence.add_r ext rx_q)
|Some u -> Lwt.wakeup u (Some ext));
return ()
| {1:2; off:30; len:31} ->
let ext = Simplex.make_tx handle.tx off len in
Simplex.release ext;
(match Lwt_sequence.take_opt_l tx_waiters with
|None -> ()
|Some u -> Lwt.wakeup u ());
return ();
| {2:2; 0:30 } ->
handle.rx_closed <- true;
Lwt_sequence.iter_l (fun u -> Lwt.wakeup u None) rx_waiters;
return ()
done with exn ->
while_lwt true do begin
match_lwt Lwt_unix.recv t.MD.fd rbuf 0 t.MD.len [] with
|0 | (-1) -> fail (Failure "")
|sz ->
let off = ref 0 in
while !off < sz do
let rbs = (rbuf, (!off * 8), (MD.plen*8)) in
off := !off + MD.plen;
(bitmatch rbs with
| {0L:64 } ->
handle.rx_closed <- true;
Lwt_sequence.iter_l (fun u -> Lwt.wakeup u None) rx_waiters
| {1L:64; off:32; len:32} -> begin
let off = Int32.to_int off in let len = Int32.to_int len in
let ext = Simplex.make_rx handle.rx off len in
match Lwt_sequence.take_opt_l rx_waiters with
|None -> ignore(Lwt_sequence.add_r ext rx_q)
|Some u -> Lwt.wakeup u (Some ext);
| {2L:64; off:32; len:32} -> begin
let off = Int32.to_int off in let len = Int32.to_int len in
let ext = Simplex.make_tx handle.tx off len in
Simplex.release ext;
match Lwt_sequence.take_opt_l tx_waiters with
| None -> ()
| Some u -> Lwt.wakeup u ()
return ()
end done with exn ->
handle.rx_closed <- true;
Lwt_sequence.iter_l (fun u -> Lwt.wakeup u None) rx_waiters;
return ()
Expand Down Expand Up @@ -208,7 +320,7 @@ let listen fd =
(* Connect to the listening socket and establish a duplex channel *)
let connect fd =
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
let nr_pages = 8 in
let nr_pages = 64 in
let nr_bytes = nr_pages * 4096 in
(* connect to the socket and send the shmem fd *)
(* make an shm fd *)
Expand All @@ -232,3 +344,4 @@ let connect fd =
let recv_fd = match fds with |[fd] -> Shm.shm_of_unix_descr fd |_ -> assert false in
let recv_ring_rx = Simplex.attach_rx recv_fd server_h.hc_rx_len in
return { name=server_h.hs_name; tx=send_ring_tx; rx=recv_ring_rx; fd=md1; rx_closed=false }

0 comments on commit 480ec8c

Please sign in to comment.