Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

various fixes to performance (but its still bottlenecked on TX memory…

… alloc for large streams)
  • Loading branch information...
commit bc8747ec97a5d6e8f739a363011da3584fd34f1d 1 parent 480ec8c
@avsm authored
Showing with 33 additions and 21 deletions.
  1. +33 −21 lib/shm_pipe.ml
View
54 lib/shm_pipe.ml
@@ -58,7 +58,7 @@ module MD = struct
(* Initialise a metadata buffer set *)
let init fd =
- let len = 32768 in
+ let len = 16384 in
let off = 0 in
let buf = String.create len in
let spare = Lwt_sequence.create () in
@@ -73,7 +73,7 @@ module MD = struct
* will result in t.closed being set *)
let rec send_all t buf off len =
try_lwt
- match_lwt Lwt_unix.send t.fd buf off len [] with
+ match_lwt Lwt_unix.write t.fd buf off len with
|0 | (-1) ->
t.closed <- true;
(* Return the buffer to the spare pool *)
@@ -91,6 +91,7 @@ module MD = struct
(* Continue sending more *)
send_all t buf (off+sent) (len-sent)
with exn ->
+ (try Unix.close (Lwt_unix.unix_file_descr t.fd) with exn -> ());
t.closed <- true;
return ()
@@ -105,7 +106,7 @@ module MD = struct
match Lwt_sequence.take_opt_l t.spare with
|None ->
lwt () = Lwt_condition.wait t.spare_waiters in
- flush t
+ if t.closed then return () else flush t
|Some sbuf ->
let buf = t.buf in
let off = t.off in
@@ -124,7 +125,7 @@ module MD = struct
flush t
else
(* XXX set up autoflush *)
- let _ = Lwt_unix.sleep 0.003 >> flush t in
+ let _ = Lwt_unix.sleep 0.001 >> flush t in
return ()
(* Notify the other side of a frame to be transmitted *)
@@ -141,11 +142,14 @@ module MD = struct
(* Release a buffer back to the transmitter *)
let free t (off,len) =
- let len' = Int32.of_int len in let off' = Int32.of_int off in
- let (v,_,_) = BITSTRING { 2L:64; off':32:int; len':32 } in
- String.blit v 0 t.buf t.off plen;
- t.off <- t.off + plen;
- check_flush t
+ match t.closed with
+ |false ->
+ let len' = Int32.of_int len in let off' = Int32.of_int off in
+ let (v,_,_) = BITSTRING { 2L:64; off':32:int; len':32 } in
+ String.blit v 0 t.buf t.off plen;
+ t.off <- t.off + plen;
+ check_flush t
+ |true -> return ()
(* Flush and close a connection *)
let close t =
@@ -215,15 +219,14 @@ let make_flow handle =
with exn ->
return false
in
- let tx_close () = MD.close t in
let rx_release ext = MD.free t ((Simplex.offset ext),(Simplex.length ext)) in
(* The metadata pipe coordinates all this *)
- let _ =
+ let read_t =
let rbuf = String.create t.MD.len in
- (* Create a buffered pipe *)
- try_lwt
- while_lwt true do begin
- match_lwt Lwt_unix.recv t.MD.fd rbuf 0 t.MD.len [] with
+ try_lwt
+ while_lwt t.MD.closed <> true do begin
+ let got_free = ref 0 in
+ match_lwt Lwt_unix.read t.MD.fd rbuf 0 t.MD.len with
|0 | (-1) -> fail (Failure "")
|sz ->
let off = ref 0 in
@@ -245,18 +248,26 @@ let make_flow handle =
let off = Int32.to_int off in let len = Int32.to_int len in
let ext = Simplex.make_tx handle.tx off len in
Simplex.release ext;
- match Lwt_sequence.take_opt_l tx_waiters with
- | None -> ()
- | Some u -> Lwt.wakeup u ()
+ incr got_free;
end
)
done;
+ (* Notify all the receivers in one go, rather than iddy biddy wakeups
+ * during the frees themselves (as they might want larger blocks *)
+ if !got_free > 0 then begin
+ Lwt_sequence.iter_node_l (fun node ->
+ let u = Lwt_sequence.get node in
+ Lwt_sequence.remove node;
+ Lwt.wakeup_later u ()
+ ) tx_waiters
+ end;
return ()
end done with exn ->
handle.rx_closed <- true;
Lwt_sequence.iter_l (fun u -> Lwt.wakeup u None) rx_waiters;
return ()
in
+ let tx_close () = Lwt.cancel read_t; MD.close t in
Lwt_flow.make ~rx_stream ~rx_release ~tx_send ~tx_release ~tx_close ~tx_alloc
(* A connect consists of a single handshake "packet" that
@@ -277,7 +288,7 @@ type handshake_client = {
}
let make_recv_ivs () =
- let length = 8192 in
+ let length = 32768 in
let buffer = String.create length in
[Lwt_unix.io_vector ~buffer ~offset:0 ~length], buffer
@@ -288,12 +299,13 @@ let make_send_ivs v =
let listen fd =
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
- let recv_buf = 32768 in
let io_vectors, buffer = make_recv_ivs () in
Lwt_stream.from (fun () ->
try_lwt
lwt d, fds = Lwt_unix.recv_msg ~socket:fd ~io_vectors in
let client_h = Marshal.from_string buffer 0 in
+ (* Mirror the transmit buffer size for now *)
+ let recv_buf = client_h.hc_tx_len in
let md, shm =
match fds with
|[md;shm] ->
@@ -320,7 +332,7 @@ let listen fd =
(* Connect to the listening socket and establish a duplex channel *)
let connect fd =
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
- let nr_pages = 64 in
+ let nr_pages = 4096 in
let nr_bytes = nr_pages * 4096 in
(* connect to the socket and send the shmem fd *)
(* make an shm fd *)
Please sign in to comment.
Something went wrong with that request. Please try again.