Skip to content

Commit

Permalink
Merge pull request mirage#170 from talex5/fix-streams
Browse files Browse the repository at this point in the history
Fix races in stream handling
  • Loading branch information
talex5 committed Jul 5, 2016
2 parents 81e2a83 + a9e43d3 commit 19fa69a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 15 deletions.
45 changes: 30 additions & 15 deletions src/vfs/vfs.ml
Expand Up @@ -92,30 +92,45 @@ module File = struct
let read t = t.read
let write t = t.write

type 'a session = { mutable v: 'a; c: 'a Lwt_condition.t }
type 'a session = { mutable v: 'a; c: unit Lwt_condition.t }

let session v = { v; c = Lwt_condition.create () }

let publish session v =
session.v <- v;
Lwt_condition.broadcast session.c v
Lwt_condition.broadcast session.c ()

let create pp session =
let data = ref (Cstruct.of_string (Fmt.to_to_string pp session.v)) in
let read_line () =
Lwt_condition.wait session.c >|= fun s ->
let line = Fmt.to_to_string pp s in
data := Cstruct.of_string line
in
(* [buffer] is the remainder of the line we're currently sending to the client.
[last] is the whole line (to avoid sending the same line twice).
When [data] is empty, we wait until [session]'s value is no longer [last] and
use that as the next [buffer].
[!buffer] is a thread in case the client does two reads at the same time,
although that doesn't make much sense for streams anyway. *)
let last = ref (Fmt.to_to_string pp session.v) in
let buffer = ref (Lwt.return (Cstruct.of_string !last)) in
let refill () =
if Cstruct.len !data = 0 then read_line () else Lwt.return_unit
let rec next () =
let current = Fmt.to_to_string pp session.v in
if current = !last then Lwt_condition.wait session.c >>= next
else (
last := current;
Lwt.return (Cstruct.of_string current)
)
in
buffer := next ()
in
let read count =
refill () >|= fun () ->
let count = min count (Cstruct.len !data) in
let response = Cstruct.sub !data 0 count in
data := Cstruct.shift !data count;
Ok (response)
let rec read count =
!buffer >>= fun avail ->
if Cstruct.len avail = 0 then (
refill ();
read count
) else (
let count = min count (Cstruct.len avail) in
let response = Cstruct.sub avail 0 count in
buffer := Lwt.return (Cstruct.shift avail count);
Lwt.return (Ok (response))
)
in
let write _ = err_read_only in
{ read; write }
Expand Down
33 changes: 33 additions & 0 deletions tests/test.ml
Expand Up @@ -610,6 +610,38 @@ let test_blobs_random () =
Alcotest.check (vfs_result reject) "Read after EOF" (Vfs.Error.offset_too_large ~offset:6L 5L) (Ivfs_blob.read b ~offset:6L ~count:1);
()

let test_streams () =
let ( >>*= ) x f =
x >>= function
| Ok y -> f y
| Error _ -> Alcotest.fail "VFS error" in
Lwt_main.run begin
let session = Vfs.File.Stream.session 0 in
let s = Vfs.File.Stream.create Fmt.int session in
let f = Vfs.File.of_stream (fun () -> Lwt.return s) in
Vfs.File.open_ f >>*= fun fd ->
let offset = ref 0L in
let rec read ?(saw_flush=false) expect =
Vfs.File.read fd ~offset:!offset ~count:1000 >>*= fun data ->
match Cstruct.to_string data with
| "" when saw_flush -> Alcotest.fail "End-of-file!"
| "" -> read ~saw_flush:true expect
| data ->
offset := Int64.add !offset (Int64.of_int (String.length data));
Alcotest.(check string) "read" expect data;
Lwt.return () in
read "0" >>= fun () ->
Vfs.File.Stream.publish session 1;
read "1" >>= fun () ->
Vfs.File.Stream.publish session 2;
Vfs.File.Stream.publish session 3;
read "3" >>= fun () ->
let th = read "4" in
Vfs.File.Stream.publish session 4;
th >>= fun () ->
Lwt.return ()
end

let run f () = Test_utils.run f

let test_set = [
Expand All @@ -631,6 +663,7 @@ let test_set = [
"Remotes" , `Slow , run test_remotes;
"Blobs fast" , `Quick , test_blobs_fast_path;
"Blobs random", `Quick , test_blobs_random;
"Streams", `Quick , test_streams;
]

let () =
Expand Down

0 comments on commit 19fa69a

Please sign in to comment.