Permalink
Browse files

Merge pull request #97 from avsm/fix-pipelining

The `Request` and `Response` module types now explictly signal `Eof`
  • Loading branch information...
2 parents 01a7da6 + 83a8034 commit 824b95b819e81eb9107bb5a03d751732cf6bceb7 @avsm committed Mar 1, 2014
Showing with 71 additions and 61 deletions.
  1. +2 −0 CHANGES
  2. +1 −2 Makefile
  3. +17 −12 async/cohttp_async.ml
  4. +12 −9 cohttp/request.ml
  5. +1 −1 cohttp/request.mli
  6. +11 −11 cohttp/response.ml
  7. +1 −1 cohttp/response.mli
  8. +16 −16 lib_test/test_parser.ml
  9. +10 −9 lwt/cohttp_lwt.ml
View
@@ -1,4 +1,6 @@
0.10.0 (trunk):
+* Interface change: The `Request` and `Response` module types now explictly
+ signal `Eof` and `Invalid` (for errors), to help the backend distinguish them.
* Interface change: Unify HTTP body handling across backends into a `Cohttp.Body`
module. This is extended by Async/Lwt implementations with their specific
ways of handling bodies (Pipes for Async, or Lwt_stream for Lwt).
View
@@ -7,8 +7,7 @@ NAME=cohttp
LWT ?= $(shell if ocamlfind query lwt >/dev/null 2>&1; then echo --enable-lwt; fi)
LWT_UNIX ?= $(shell if ocamlfind query lwt.ssl >/dev/null 2>&1; then echo --enable-lwt-unix; fi)
ASYNC ?= $(shell if ocamlfind query async >/dev/null 2>&1; then echo --enable-async; fi)
-# disabled by default as they hang at the moment for Async
-# NETTESTS ?= --enable-nettests
+NETTESTS ?= --enable-nettests
setup.bin: setup.ml
ocamlopt.opt -o $@ $< || ocamlopt -o $@ $< || ocamlc -o $@ $<
View
@@ -210,11 +210,13 @@ module Client = struct
Request.write_footer req oc
>>= fun () ->
Response.read ic
- >>= fun res ->
- let res = Option.value_exn ~message:"Error reading HTTP response" res in
- (* Build a response pipe for the body *)
- let rd = pipe_of_body (Response.read_body_chunk res) ic oc in
- return (res, `Pipe rd)
+ >>= function
+ | `Eof -> raise (Failure "Connection closed by remote host")
+ | `Invalid reason -> raise (Failure reason)
+ | `Ok res ->
+ (* Build a response pipe for the body *)
+ let rd = pipe_of_body (Response.read_body_chunk res) ic oc in
+ return (res, `Pipe rd)
let get ?interrupt ?headers uri =
call ?interrupt ?headers ~chunked:false `GET uri
@@ -261,13 +263,16 @@ module Server = struct
`Pipe (pipe_of_body read_chunk rd wr)
let handle_client handle_request sock rd wr =
- let requests_pipe = Reader.read_all rd (fun rd ->
- Request.read rd >>| fun req ->
- let req = Option.value_exn ~message:"Error reading HTTP request" req in
- let body = read_body req rd wr in
- if not (Request.is_keep_alive req)
- then don't_wait_for (Reader.close rd);
- `Ok (req, body)
+ let requests_pipe =
+ Reader.read_all rd (fun rd ->
+ Request.read rd
+ >>| function
+ | `Eof | `Invalid _ -> `Eof
+ | `Ok req ->
+ let body = read_body req rd wr in
+ if not (Request.is_keep_alive req)
+ then don't_wait_for (Reader.close rd);
+ `Ok (req, body)
) in
Pipe.iter requests_pipe ~f:(fun (req, body) ->
handle_request ~body sock req >>= fun (res, body) ->
View
@@ -63,7 +63,7 @@ let make_for_client ?headers ?(chunked=true) ?(body_length=0) meth uri =
module type S = sig
module IO : IO.S
- val read : IO.ic -> t option IO.t
+ val read : IO.ic -> [ `Eof | `Invalid of string | `Ok of t ] IO.t
val has_body : t -> bool
val read_body_chunk :
t -> IO.ic -> Transfer.chunk IO.t
@@ -90,29 +90,32 @@ module Make(IO : IO.S) = struct
let parse_request_fst_line ic =
let open Code in
read_line ic >>= function
- |Some request_line -> begin
+ | Some request_line -> begin
match Re_str.split_delim pieces_sep request_line with
| [ meth_raw; path; http_ver_raw ] -> begin
match method_of_string meth_raw, version_of_string http_ver_raw with
- |Some m, Some v -> return (Some (m, path, v))
- |_ -> return None
+ | Some m, Some v -> return (`Ok (m, path, v))
+ | None, Some v -> return (`Invalid ("Malformed request method: " ^ meth_raw))
+ | Some v, None -> return (`Invalid ("Malformed request HTTP version: " ^ http_ver_raw))
+ | None, None -> return (`Invalid ("Malformed request method and version: " ^ request_line))
end
- | _ -> return None
+ | _ -> return (`Invalid ("Malformed request header: " ^ request_line))
end
- |None -> return None
+ | None -> return `Eof
let read ic =
parse_request_fst_line ic >>= function
- |None -> return None
- |Some (meth, path, version) ->
+ | `Eof -> return `Eof
+ | `Invalid reason as r -> return r
+ | `Ok (meth, path, version) ->
Header_IO.parse ic >>= fun headers ->
let uri = match (Header.get headers "host") with
| None -> Uri.of_string path
| Some h ->
(* XXX: Could be https:// as well... *)
Uri.of_string ("http://" ^ h ^ path) in
let encoding = Header.get_transfer_encoding headers in
- return (Some { headers; meth; uri; version; encoding })
+ return (`Ok { headers; meth; uri; version; encoding })
let has_body req = Transfer.has_body req.encoding
let read_body_chunk req ic = Transfer_IO.read req.encoding ic
View
@@ -49,7 +49,7 @@ val make_for_client:
module type S = sig
module IO : IO.S
- val read : IO.ic -> t option IO.t
+ val read : IO.ic -> [ `Eof | `Invalid of string | `Ok of t ] IO.t
val has_body : t -> bool
val read_body_chunk : t -> IO.ic -> Transfer.chunk IO.t
View
@@ -32,10 +32,9 @@ let make ?(version=`HTTP_1_1) ?(status=`OK) ?(flush=false) ?(encoding=Transfer.C
module type S = sig
module IO : IO.S
- val read : IO.ic -> t option IO.t
+ val read : IO.ic -> [ `Eof | `Invalid of string | `Ok of t ] IO.t
val has_body : t -> bool
- val read_body_chunk :
- t -> IO.ic -> Transfer.chunk IO.t
+ val read_body_chunk : t -> IO.ic -> Transfer.chunk IO.t
val is_form: t -> bool
val read_form : t -> IO.ic -> (string * string list) list IO.t
@@ -59,25 +58,26 @@ module Make(IO : IO.S) = struct
let parse_response_fst_line ic =
let open Code in
read_line ic >>= function
- |Some response_line -> begin
+ | Some response_line -> begin
match Re_str.split_delim pieces_sep response_line with
| version_raw :: code_raw :: _ -> begin
match version_of_string version_raw with
- |Some v -> return (Some (v, (status_of_code (int_of_string code_raw))))
- |_ -> return None
+ | Some v -> return (`Ok (v, (status_of_code (int_of_string code_raw))))
+ | None -> return (`Invalid ("Malformed response version: " ^ version_raw))
end
- | _ -> return None
+ | _ -> return (`Invalid ("Malformed response first line: " ^ response_line))
end
- |None -> return None
+ | None -> return `Eof
let read ic =
parse_response_fst_line ic >>= function
- |None -> return None
- |Some (version, status) ->
+ | `Eof -> return `Eof
+ | `Invalid reason as r -> return r
+ | `Ok (version, status) ->
Header_IO.parse ic >>= fun headers ->
let encoding = Header.get_transfer_encoding headers in
let flush = false in
- return (Some { encoding; headers; version; status; flush })
+ return (`Ok { encoding; headers; version; status; flush })
let has_body {encoding} = Transfer.has_body encoding
let read_body_chunk {encoding} ic = Transfer_IO.read encoding ic
View
@@ -46,7 +46,7 @@ val make :
module type S = sig
module IO : IO.S
- val read : IO.ic -> t option IO.t
+ val read : IO.ic -> [ `Eof | `Invalid of string | `Ok of t ] IO.t
val has_body : t -> bool
val read_body_chunk : t -> IO.ic -> Transfer.chunk IO.t
@@ -106,20 +106,20 @@ let basic_req_parse () =
let ic = ic_of_buffer (Lwt_bytes.of_string basic_req) in
CU.Request.read ic >>=
function
- |Some req ->
+ | `Ok req ->
assert_equal (Cohttp.Request.version req) `HTTP_1_1;
assert_equal (CU.Request.meth req) `GET;
assert_equal (Uri.to_string (CU.Request.uri req)) "http://www.example.com/index.html";
return ()
- |None -> assert false
+ | _ -> assert false
let basic_res_parse res () =
let open Cohttp in
let open Cohttp_lwt_unix in
let ic = ic_of_buffer (Lwt_bytes.of_string res) in
Response.read ic >>=
function
- |Some res ->
+ | `Ok res ->
(* Parse first line *)
assert_equal (Response.version res) `HTTP_1_1;
assert_equal (Response.status res) `OK;
@@ -129,25 +129,24 @@ let basic_res_parse res () =
assert_equal (Header.get headers "content-type")
(Some "text/html; charset=UTF-8");
return ()
- |None -> assert false
+ | _ -> assert false
let req_parse () =
let open Cohttp_lwt_unix in
let ic = ic_of_buffer (Lwt_bytes.of_string basic_req) in
Request.read ic >>= function
- |None -> assert false
- |Some req ->
+ | `Ok req ->
assert_equal `GET (Request.meth req);
assert_equal "/index.html" ((Uri.path (Request.uri req)));
assert_equal `HTTP_1_1 (Request.version req);
return ()
+ | _ -> assert false
let post_form_parse () =
let open Cohttp_lwt_unix in
let ic = ic_of_buffer (Lwt_bytes.of_string post_req) in
Request.read ic >>= function
- |None -> assert false
- |Some req ->
+ | `Ok req ->
assert_equal true (Request.is_form req);
Request.read_form req ic >>= fun params ->
assert_equal ["Cosby"] (List.assoc "home" params);
@@ -156,62 +155,63 @@ let post_form_parse () =
(* multiple requests should still work *)
assert_equal ["Cosby"] (List.assoc "home" params);
return ()
+ | _ -> assert false
let post_data_parse () =
let open Cohttp in
let open Cohttp_lwt_unix in
let ic = ic_of_buffer (Lwt_bytes.of_string post_data_req) in
Request.read ic >>= function
- |None -> assert false
- |Some req ->
+ | `Ok req ->
Request.read_body_chunk req ic >>= fun body ->
assert_equal (Transfer.Final_chunk "home=Cosby&favorite+flavor=flies") body;
(* A subsequent request for the body will have consumed it, therefore None *)
Request.read_body_chunk req ic >>= fun body ->
assert_equal Transfer.Done body;
return ()
+ | _ -> assert false
let post_chunked_parse () =
let open Cohttp in
let open Cohttp_lwt_unix in
let ic = ic_of_buffer (Lwt_bytes.of_string post_chunked_req) in
Request.read ic >>= function
- |None -> assert false
- |Some req ->
+ | `Ok req ->
assert_equal (Transfer.encoding_to_string (Request.encoding req)) "chunked";
Request.read_body_chunk req ic >>= fun chunk ->
assert_equal chunk (Transfer.Chunk "abcdefghijklmnopqrstuvwxyz");
Request.read_body_chunk req ic >>= fun chunk ->
assert_equal chunk (Transfer.Chunk "1234567890abcdef");
return ()
+ | _ -> assert false
let res_content_parse () =
let open Cohttp in
let open Cohttp_lwt_unix in
let ic = ic_of_buffer (Lwt_bytes.of_string basic_res_content) in
Response.read ic >>= function
- |None -> assert false
- |Some res ->
+ | `Ok res ->
assert_equal `HTTP_1_1 (Response.version res);
assert_equal `OK (Response.status res);
Response.read_body_chunk res ic >>= fun body ->
assert_equal (Transfer.Final_chunk "home=Cosby&favorite+flavor=flies") body;
return ()
+ | _ -> assert false
let res_chunked_parse () =
let open Cohttp in
let open Cohttp_lwt_unix in
let ic = ic_of_buffer (Lwt_bytes.of_string chunked_res) in
Response.read ic >>= function
- |None -> assert false
- |Some res ->
+ | `Ok res ->
assert_equal `HTTP_1_1 (Response.version res);
assert_equal `OK (Response.status res);
Response.read_body_chunk res ic >>= fun chunk ->
assert_equal chunk (Transfer.Chunk "abcdefghijklmnopqrstuvwxyz");
Response.read_body_chunk res ic >>= fun chunk ->
assert_equal chunk (Transfer.Chunk "1234567890abcdef");
return ()
+ | _ -> assert false
(* Extract the substring of the byte buffer that has been written to *)
let get_substring oc buf =
View
@@ -113,11 +113,12 @@ module Make_client
module Response = Response
let read_response ?closefn ic oc =
- match_lwt Response.read ic with
- |None -> Lwt.fail (Failure "Failed to read response")
- |Some res -> begin
+ Response.read ic >>= function
+ | `Invalid reason -> Lwt.fail (Failure ("Failed to read response: " ^ reason))
+ | `Eof -> Lwt.fail (Failure "Client connection was closed")
+ | `Ok res -> begin
match Response.has_body res with
- |true ->
+ | true ->
let stream = Cohttp_lwt_body.create_stream (Response.read_body_chunk res) ic in
(match closefn with
|Some fn ->
@@ -128,7 +129,7 @@ module Make_client
);
let body = Cohttp_lwt_body.of_stream stream in
return (res, body)
- |false ->
+ | false ->
(match closefn with |Some fn -> fn () |None -> ());
return (res, `Empty)
end
@@ -315,12 +316,12 @@ module Make_server(IO:Cohttp.IO.S with type 'a t = 'a Lwt.t)
if !early_close
then return None
else
- lwt () = Lwt_mutex.lock read_m in
- match_lwt Request.read ic with
- | None ->
+ Lwt_mutex.lock read_m >>= fun () ->
+ Request.read ic >>= function
+ | `Eof | `Invalid _ -> (* TODO: request logger for invalid req *)
Lwt_mutex.unlock read_m;
return None
- | Some req -> begin
+ | `Ok req -> begin
early_close := not (Request.is_keep_alive req);
(* Ensure the input body has been fully read before reading again *)
match Request.has_body req with

0 comments on commit 824b95b

Please sign in to comment.