Skip to content

Commit

Permalink
adapt the Lwt_flow.TX.send to return a bool on success, to indicate E…
Browse files Browse the repository at this point in the history
…OF conditions to the transmitter
  • Loading branch information
avsm committed Mar 30, 2012
1 parent 7e8392c commit b37840a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
5 changes: 3 additions & 2 deletions lib/Lwt_flow.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ open Printf
type ba = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t

type 'a tx = {
tx_send: 'a -> unit Lwt.t;
tx_send: 'a -> bool Lwt.t;
tx_close: unit -> unit Lwt.t;
tx_alloc: int -> 'a Lwt.t;
tx_release: 'a -> unit Lwt.t;
Expand All @@ -41,7 +41,8 @@ type ('a, 'b) t = {
let make ~rx_stream ~rx_release ~tx_send ~tx_release ~tx_close ~tx_alloc =
let tx = { tx_send; tx_release; tx_close; tx_alloc } in
let rx = { rx_stream; rx_release } in
{tx; rx}
let t, u = Lwt.task () in
{tx; rx; }

module TX = struct

Expand Down
5 changes: 4 additions & 1 deletion lib/shm_pipe.ml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ let make_flow handle =
let md_write op = try_lwt Lwt_io.write_value oc op with exn -> return () in
(* XXX notify the transmitter that the receiver has stopped listening, somehow.
* Right now we just ignore the metadata pipe disappearing *)
let tx_send extent = md_write (Simplex.to_send_op extent) in
let tx_send extent =
try_lwt Lwt_io.write_value oc (Simplex.to_send_op extent) >> return true
with exn -> return false
in
let tx_close () = md_write (Simplex.to_close_op) in
let rx_release extent = md_write(Simplex.to_free_op extent) in
(* The metadata pipe coordinates all this *)
Expand Down
7 changes: 5 additions & 2 deletions lib/tcp_pipe.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ let dprintf fmt =
let make_flow h =
let rx_stream = Lwt_stream.from
(fun () ->
match_lwt Lwt_io.read h.ic with
try_lwt
|"" -> return None
|x -> return (Some x)
)
Expand All @@ -52,7 +52,10 @@ let make_flow h =
(try Unix.close (Lwt_unix.unix_file_descr h.fd) with _ -> ());
return ()
in
let tx_send buf = Lwt_io.write h.oc buf in
let tx_send buf =
try_lwt Lwt_io.write h.oc buf >> return true
with exn -> return false
in
Lwt_flow.make ~rx_stream ~rx_release ~tx_send ~tx_release ~tx_close ~tx_alloc

let string_of_sockaddr =
Expand Down

0 comments on commit b37840a

Please sign in to comment.