Skip to content

Commit

Permalink
Forward exceptions across streams
Browse files Browse the repository at this point in the history
  • Loading branch information
aantron committed Jan 15, 2022
1 parent 4dce6a7 commit 5cd57e1
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 98 deletions.
1 change: 1 addition & 0 deletions src/dream.ml
Expand Up @@ -166,6 +166,7 @@ let flush_stream = Stream.flush
let ping_stream = Stream.ping
let pong_stream = Stream.pong
let close_stream = Stream.close
let abort_stream = Stream.abort



Expand Down
17 changes: 11 additions & 6 deletions src/dream.mli
Expand Up @@ -826,10 +826,11 @@ val set_server_stream : 'a message -> stream -> unit
val read_stream :
stream ->
data:(buffer -> int -> int -> bool -> bool -> unit) ->
close:(int -> unit) ->
flush:(unit -> unit) ->
ping:(buffer -> int -> int -> unit) ->
pong:(buffer -> int -> int -> unit) ->
close:(int -> unit) ->
exn:(exn -> unit) ->
unit
(** Waits for the next stream event, and calls:
Expand All @@ -838,23 +839,27 @@ val read_stream :
- [~exn] to report an exception. *)

val ready_stream :
stream -> close:(int -> unit) -> (unit -> unit) -> unit
stream -> close:(int -> unit) -> exn:(exn -> unit) -> (unit -> unit) -> unit

val write_stream :
stream -> buffer -> int -> int -> bool -> bool -> close:(int -> unit) -> (unit -> unit) -> unit
stream -> buffer -> int -> int -> bool -> bool -> close:(int -> unit) -> exn:(exn -> unit) -> (unit -> unit) -> unit

val flush_stream :
stream -> close:(int -> unit) -> (unit -> unit) -> unit
stream -> close:(int -> unit) -> exn:(exn -> unit) -> (unit -> unit) -> unit

val ping_stream :
stream -> buffer -> int -> int -> close:(int -> unit) -> (unit -> unit) -> unit
stream -> buffer -> int -> int -> close:(int -> unit) -> exn:(exn -> unit) -> (unit -> unit) -> unit

val pong_stream :
stream -> buffer -> int -> int -> close:(int -> unit) -> (unit -> unit) -> unit
stream -> buffer -> int -> int -> close:(int -> unit) -> exn:(exn -> unit) -> (unit -> unit) -> unit

val close_stream :
stream -> int -> unit

val abort_stream :
stream -> exn -> unit
(* TODO Line widths above. *)

(**/**)
val write_buffer :
?offset:int -> ?length:int -> response -> buffer -> unit promise
Expand Down
5 changes: 4 additions & 1 deletion src/http/adapt.ml
Expand Up @@ -26,6 +26,8 @@ let forward_body_general
http_flush
close =

let abort _exn = close 1000 in

let bytes_since_flush = ref 0 in

let rec send () =
Expand All @@ -34,10 +36,11 @@ let forward_body_general
Stream.read
stream
~data
~close
~flush
~ping
~pong
~close
~exn:abort

and data chunk off len _binary _fin =
write_buffer ~off ~len chunk;
Expand Down
25 changes: 14 additions & 11 deletions src/http/http.ml
Expand Up @@ -120,7 +120,7 @@ let websocket_handler response socket =

(* TODO Can this be canceled by a user's close? i.e. will that eventually
cause a call to eof above? *)
let rec read ~data ~close ~flush ~ping ~pong =
let rec read ~data ~flush ~ping ~pong ~close ~exn =
if !closed then
close !close_code
else
Expand Down Expand Up @@ -157,10 +157,10 @@ let websocket_handler response socket =
drain_payload payload @@ fun _buffer _offset length ->
websocket_log.warning (fun log ->
log "Unknown frame type with length %i" length);
read ~data ~close ~flush ~ping ~pong
read ~data ~flush ~ping ~pong ~close ~exn
| Some (`Data properties, payload) ->
current_payload := Some (properties, payload);
read ~data ~close ~flush ~ping ~pong
read ~data ~flush ~ping ~pong ~close ~exn
end
| Some ((binary, fin), payload) ->
Websocketaf.Payload.schedule_read
Expand All @@ -169,7 +169,7 @@ let websocket_handler response socket =
match !last_chunk with
| None ->
last_chunk := Some (buffer, off, len);
read ~data ~close ~flush ~ping ~pong
read ~data ~flush ~ping ~pong ~close ~exn
| Some (last_buffer, last_offset, last_length) ->
last_chunk := Some (buffer, off, len);
let binary = binary = `Binary in
Expand All @@ -178,7 +178,7 @@ let websocket_handler response socket =
current_payload := None;
match !last_chunk with
| None ->
read ~data ~close ~flush ~ping ~pong
read ~data ~flush ~ping ~pong ~close ~exn
| Some (last_buffer, last_offset, last_length) ->
last_chunk := None;
let binary = binary = `Binary in
Expand All @@ -204,7 +204,9 @@ let websocket_handler response socket =
end
in

let reader = Stream.reader ~read ~close in
let abort _exn = close 1005 in

let reader = Stream.reader ~read ~close ~abort in
Stream.forward reader (Message.client_stream response);

let rec outgoing_loop () =
Expand All @@ -226,7 +228,6 @@ let websocket_handler response socket =
else
outgoing_loop ()
end)
~close
~flush:(fun () -> flush ~close outgoing_loop)
~ping:(fun _buffer _offset length ->
if length > 125 then
Expand Down Expand Up @@ -256,6 +257,8 @@ let websocket_handler response socket =
Websocketaf.Wsd.send_pong socket;
outgoing_loop ()
end)
~close
~exn:abort
in
outgoing_loop ();

Expand Down Expand Up @@ -305,7 +308,7 @@ let wrap_handler
(* TODO Should the stream be auto-closed? It doesn't even have a closed
state. The whole thing is just a wrapper for whatever the http/af
behavior is. *)
let read ~data ~close ~flush:_ ~ping:_ ~pong:_ =
let read ~data ~flush:_ ~ping:_ ~pong:_ ~close ~exn:_ =
Httpaf.Body.Reader.schedule_read
body
~on_eof:(fun () -> close 1000)
Expand All @@ -314,7 +317,7 @@ let wrap_handler
let close _code =
Httpaf.Body.Reader.close body in
let body =
Stream.reader ~read ~close in
Stream.reader ~read ~close ~abort:close in
let body =
Stream.stream body Stream.no_writer in

Expand Down Expand Up @@ -435,7 +438,7 @@ let wrap_handler_h2

let body =
H2.Reqd.request_body conn in
let read ~data ~close ~flush:_ ~ping:_ ~pong:_ =
let read ~data ~flush:_ ~ping:_ ~pong:_ ~close ~exn:_ =
H2.Body.schedule_read
body
~on_eof:(fun () -> close 1000)
Expand All @@ -444,7 +447,7 @@ let wrap_handler_h2
let close _code =
H2.Body.close_reader body in
let body =
Stream.reader ~read ~close in
Stream.reader ~read ~close ~abort:close in
let body =
Stream.stream body Stream.no_writer in

Expand Down
2 changes: 2 additions & 0 deletions src/pure/message.ml
Expand Up @@ -208,6 +208,7 @@ let write ?kind message chunk =
message.server_stream
buffer 0 length binary true
~close:(fun _code -> Lwt.wakeup_later_exn resolver End_of_file)
~exn:(fun exn -> Lwt.wakeup_later_exn resolver exn)
(fun () -> Lwt.wakeup_later resolver ());
promise

Expand All @@ -218,6 +219,7 @@ let flush message =
Stream.flush
message.server_stream
~close:(fun _code -> Lwt.wakeup_later_exn resolver End_of_file)
~exn:(fun exn -> Lwt.wakeup_later_exn resolver exn)
(Lwt.wakeup_later resolver);
promise

Expand Down

0 comments on commit 5cd57e1

Please sign in to comment.