From 480ec8cb0e0f7b4b66c96906fc3d81b368ba5837 Mon Sep 17 00:00:00 2001 From: Anil Madhavapeddy Date: Sun, 1 Apr 2012 21:46:23 +0100 Subject: [PATCH] bug fixes to Shm_pipe to double buffer metadata pages more carefully --- lib/shm_pipe.ml | 217 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 165 insertions(+), 52 deletions(-) diff --git a/lib/shm_pipe.ml b/lib/shm_pipe.ml index 5517ce2..30aa47d 100644 --- a/lib/shm_pipe.ml +++ b/lib/shm_pipe.ml @@ -14,10 +14,14 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -(* Fast bi-directional shared-memory, with reliable acks and flow control *) +(* Fast bi-directional shared-memory, which uses a metadata pipe to communicate + * extents that pass across to the other side, and a shared memory segment + * for the actual data transmission *) + open Lwt open Printf +(* The state handle for a Shm_pipe *) type handle = { (* human-readable name for this handle *) name: string; @@ -29,30 +33,133 @@ type handle = { mutable rx_closed: bool; } +let dprintf fmt = + let xfn ch = fprintf ch fmt in + kfprintf xfn stderr "[%d] " (Unix.getpid ()) + +(* Metadata buffering and transmission *) module MD = struct - let msg_send oc off len = - let (v,_,_) = BITSTRING { 0:2; off:30; len:31 } in - Lwt_io.write oc v + (* Metadata is marshalled into a string [buf] of length [len] via + * a simple wire protocol. The [off] tracks how full the buffer is + * and upon transmission, a set of spare buffers are swapped in + * while the main one is being transmitted. *) + type t = { + mutable buf: string; + len: int; + fd: Lwt_unix.file_descr; + mutable off: int; + spare: string Lwt_sequence.t; + spare_waiters: unit Lwt_condition.t; + mutable closed: bool; + } - let msg_free oc off len = - let (v,_,_) = BITSTRING { 1:2; off:30; len:31 } in - Lwt_io.write oc v + let plen = 16 - let msg_close oc = - let (v,_,_) = BITSTRING { 2:2; 0L: 62 } in - Lwt_io.write oc v -end + (* Initialise a metadata buffer set *) + let init fd = + let len = 32768 in + let off = 0 in + let buf = String.create len in + let spare = Lwt_sequence.create () in + let _ = Lwt_sequence.add_r (String.create len) spare in + let _ = Lwt_sequence.add_r (String.create len) spare in + let _ = Lwt_sequence.add_r (String.create len) spare in + let spare_waiters = Lwt_condition.create () in + let closed = false in + { buf; off; fd; len; spare; spare_waiters; closed } -let dprintf fmt = - let xfn ch = fprintf ch fmt in - kfprintf xfn stderr "[%d] " (Unix.getpid()) + (* Transmit all metadata buffers to the other side. Errors + * will result in t.closed being set *) + let rec send_all t buf off len = + try_lwt + match_lwt Lwt_unix.send t.fd buf off len [] with + |0 | (-1) -> + t.closed <- true; + (* Return the buffer to the spare pool *) + let _ = Lwt_sequence.add_r buf t.spare in + (* Wake up anyone waiting for it *) + Lwt_condition.signal t.spare_waiters (); + return () + |sent when sent = len -> + (* Return the buffer to the spare pool *) + let _ = Lwt_sequence.add_r buf t.spare in + (* Wake up anyone waiting for it *) + Lwt_condition.signal t.spare_waiters (); + return () + |sent -> + (* Continue sending more *) + send_all t buf (off+sent) (len-sent) + with exn -> + t.closed <- true; + return () + + (* Flush all outstanding metadata to the other side *) + let rec flush t = + if t.closed then return () else + match t.off with + |0 -> return () + |off -> begin + (* Grab a spare to double buffer the active connection, + * as it may take some time to send the metadata *) + match Lwt_sequence.take_opt_l t.spare with + |None -> + lwt () = Lwt_condition.wait t.spare_waiters in + flush t + |Some sbuf -> + let buf = t.buf in + let off = t.off in + t.off <- 0; + t.buf <- sbuf; + send_all t buf 0 off + end + + (* Check if a buffer needs to be flushed to the other side. + * XXX Has a hacky timer that should be an autoflush or TCP delayed ack *) + let check_flush t = + match t.off with + |0 -> return () + |off -> + if t.off >= t.len then + flush t + else + (* XXX set up autoflush *) + let _ = Lwt_unix.sleep 0.003 >> flush t in + return () -(* Given a handle, retrieve a pair of input/output streams *) + (* Notify the other side of a frame to be transmitted *) + let send t (off,len) = + match t.closed with + |false -> + let len' = Int32.of_int len in let off' = Int32.of_int off in + let (v,_,_) = BITSTRING { 1L:64; off':32; len':32 } in + String.blit v 0 t.buf t.off plen; + t.off <- t.off + plen; + check_flush t >> + return true + |true -> return false + + (* Release a buffer back to the transmitter *) + let free t (off,len) = + let len' = Int32.of_int len in let off' = Int32.of_int off in + let (v,_,_) = BITSTRING { 2L:64; off':32:int; len':32 } in + String.blit v 0 t.buf t.off plen; + t.off <- t.off + plen; + check_flush t + + (* Flush and close a connection *) + let close t = + lwt () = flush t in + let buf = String.make plen '\000' in + lwt _ = Lwt_unix.send t.fd buf 0 plen [] in + return () +end + +(* Convert a Shm_pipe handle into an Lwt_flow construct. This gets + * Simplex.extents directly, without wrapping them in a Bigarray for now. + *) let make_flow handle = - (* Buffered metadata channels *) - let ic = Lwt_io.(of_fd ~buffer_size:65535 ~mode:input handle.fd) in - let oc = Lwt_io.(of_fd ~buffer_size:65535 ~mode:output handle.fd) in + let t = MD.init handle.fd in (* Listeners waiting for new incoming extents *) let rx_waiters = Lwt_sequence.create () in (* Listeners waiting for free space on the transmit queue *) @@ -101,46 +208,51 @@ let make_flow handle = (* Release a local TX buffer transmitter has decided not to use * any more *) let tx_release ext = Simplex.release ext; return () in - (* XXX notify the transmitter that the receiver has stopped listening, somehow. - * Right now we just ignore the metadata pipe disappearing *) let tx_send ext = try_lwt - MD.msg_send oc (Simplex.offset ext) (Simplex.length ext) >> + MD.send t ((Simplex.offset ext),(Simplex.length ext)) >> return true - with exn -> return false + with exn -> + return false in - let tx_close () = - MD.msg_close oc - in - let rx_release ext = - MD.msg_free oc (Simplex.offset ext) (Simplex.length ext) - in + let tx_close () = MD.close t in + let rx_release ext = MD.free t ((Simplex.offset ext),(Simplex.length ext)) in (* The metadata pipe coordinates all this *) let _ = - let rbuf = String.create 8 in - let rbs = rbuf, 0, (8*8) in + let rbuf = String.create t.MD.len in (* Create a buffered pipe *) - try_lwt while_lwt true do - lwt () = Lwt_io.read_into_exactly ic rbuf 0 8 in - bitmatch rbs with - | {0:2; off:30; len:31} -> - let ext = Simplex.make_rx handle.rx off len in - (match Lwt_sequence.take_opt_l rx_waiters with - |None -> ignore(Lwt_sequence.add_r ext rx_q) - |Some u -> Lwt.wakeup u (Some ext)); - return () - | {1:2; off:30; len:31} -> - let ext = Simplex.make_tx handle.tx off len in - Simplex.release ext; - (match Lwt_sequence.take_opt_l tx_waiters with - |None -> () - |Some u -> Lwt.wakeup u ()); - return (); - | {2:2; 0:30 } -> - handle.rx_closed <- true; - Lwt_sequence.iter_l (fun u -> Lwt.wakeup u None) rx_waiters; - return () - done with exn -> + try_lwt + while_lwt true do begin + match_lwt Lwt_unix.recv t.MD.fd rbuf 0 t.MD.len [] with + |0 | (-1) -> fail (Failure "") + |sz -> + let off = ref 0 in + while !off < sz do + let rbs = (rbuf, (!off * 8), (MD.plen*8)) in + off := !off + MD.plen; + (bitmatch rbs with + | {0L:64 } -> + handle.rx_closed <- true; + Lwt_sequence.iter_l (fun u -> Lwt.wakeup u None) rx_waiters + | {1L:64; off:32; len:32} -> begin + let off = Int32.to_int off in let len = Int32.to_int len in + let ext = Simplex.make_rx handle.rx off len in + match Lwt_sequence.take_opt_l rx_waiters with + |None -> ignore(Lwt_sequence.add_r ext rx_q) + |Some u -> Lwt.wakeup u (Some ext); + end + | {2L:64; off:32; len:32} -> begin + let off = Int32.to_int off in let len = Int32.to_int len in + let ext = Simplex.make_tx handle.tx off len in + Simplex.release ext; + match Lwt_sequence.take_opt_l tx_waiters with + | None -> () + | Some u -> Lwt.wakeup u () + end + ) + done; + return () + end done with exn -> handle.rx_closed <- true; Lwt_sequence.iter_l (fun u -> Lwt.wakeup u None) rx_waiters; return () @@ -208,7 +320,7 @@ let listen fd = (* Connect to the listening socket and establish a duplex channel *) let connect fd = Sys.set_signal Sys.sigpipe Sys.Signal_ignore; - let nr_pages = 8 in + let nr_pages = 64 in let nr_bytes = nr_pages * 4096 in (* connect to the socket and send the shmem fd *) (* make an shm fd *) @@ -232,3 +344,4 @@ let connect fd = let recv_fd = match fds with |[fd] -> Shm.shm_of_unix_descr fd |_ -> assert false in let recv_ring_rx = Simplex.attach_rx recv_fd server_h.hc_rx_len in return { name=server_h.hs_name; tx=send_ring_tx; rx=recv_ring_rx; fd=md1; rx_closed=false } +