Skip to content

Commit

Permalink
Add chunked transfer encoding support and rework the body reading API
Browse files Browse the repository at this point in the history
Introduces 3 low level functions and updates the existing higher
levels functions. The new primitives are has_body/1, body_length/1
and stream_body/1. In addition to that, a helper function
init_stream/4 has been added.

Streaming a body implies to decode the Transfer-Encoding and
Content-Encoding used for the body. By default, Cowboy will try
to figure out what was used and decode them properly. You can
override this if you want to disable this behavior or simply
support more encodings by calling the init_stream/4 function
before you start streaming the body.
  • Loading branch information
Loïc Hoguin committed Apr 1, 2012
1 parent ba75e8b commit 95e05d8
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 38 deletions.
4 changes: 2 additions & 2 deletions include/http.hrl
Expand Up @@ -41,8 +41,8 @@
meta = [] :: [{atom(), any()}],

%% Request body.
body_state = waiting :: waiting | done |
{multipart, non_neg_integer(), fun()},
body_state = waiting :: waiting | done | {stream, fun(), any(), fun()}
| {multipart, non_neg_integer(), fun()},
buffer = <<>> :: binary(),

%% Response.
Expand Down
48 changes: 48 additions & 0 deletions src/cowboy_http.erl
Expand Up @@ -22,6 +22,9 @@
http_date/1, rfc1123_date/1, rfc850_date/1, asctime_date/1,
whitespace/2, digits/1, token/2, token_ci/2, quoted_string/2]).

%% Decoding.
-export([te_chunked/2, te_identity/2, ce_identity/1]).

%% Interpretation.
-export([connection_to_atom/1, urldecode/1, urldecode/2, urlencode/1,
urlencode/2, x_www_form_urlencoded/2]).
Expand Down Expand Up @@ -708,6 +711,51 @@ qvalue(<< C, Rest/binary >>, Fun, Q, M)
qvalue(Data, Fun, Q, _M) ->
Fun(Data, Q).

%% Decoding.

%% @doc Decode a stream of chunks.
-spec te_chunked(binary(), {non_neg_integer(), non_neg_integer()})
-> more | {ok, binary(), {non_neg_integer(), non_neg_integer()}}
| {ok, binary(), binary(), {non_neg_integer(), non_neg_integer()}}
| {done, non_neg_integer(), binary()} | {error, badarg}.
te_chunked(<<>>, _) ->
more;
te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) ->
{done, Streamed, Rest};
te_chunked(Data, {0, Streamed}) ->
%% @todo We are expecting an hex size, not a general token.
token(Data,
fun (Rest, _) when byte_size(Rest) < 4 ->
more;
(<< "\r\n", Rest/binary >>, BinLen) ->
Len = list_to_integer(binary_to_list(BinLen), 16),
te_chunked(Rest, {Len, Streamed});
(_, _) ->
{error, badarg}
end);
te_chunked(Data, {ChunkRem, Streamed}) when byte_size(Data) >= ChunkRem + 2 ->
<< Chunk:ChunkRem/binary, "\r\n", Rest/binary >> = Data,
{ok, Chunk, Rest, {0, Streamed + byte_size(Chunk)}};
te_chunked(Data, {ChunkRem, Streamed}) ->
Size = byte_size(Data),
{ok, Data, {ChunkRem - Size, Streamed + Size}}.

%% @doc Decode an identity stream.
-spec te_identity(binary(), {non_neg_integer(), non_neg_integer()})
-> {ok, binary(), {non_neg_integer(), non_neg_integer()}}
| {done, binary(), non_neg_integer(), binary()}.
te_identity(Data, {Streamed, Total})
when Streamed + byte_size(Data) < Total ->
{ok, Data, {Streamed + byte_size(Data), Total}};
te_identity(Data, {Streamed, Total}) ->
Size = Total - Streamed,
<< Data2:Size/binary, Rest/binary >> = Data,
{done, Data2, Total, Rest}.

%% @doc Decode an identity content.
-spec ce_identity(binary()) -> {ok, binary()}.
ce_identity(Data) ->
{ok, Data}.

%% Interpretation.

Expand Down
7 changes: 3 additions & 4 deletions src/cowboy_http_protocol.erl
Expand Up @@ -396,10 +396,9 @@ next_request(Req=#http_req{connection=Conn},
ensure_body_processed(#http_req{body_state=done, buffer=Buffer}) ->
{ok, Buffer};
ensure_body_processed(Req=#http_req{body_state=waiting}) ->
case cowboy_http_req:body(Req) of
{error, badarg} -> {ok, Req#http_req.buffer}; %% No body.
{error, _Reason} -> {close, <<>>};
{ok, _, Req2} -> {ok, Req2#http_req.buffer}
case cowboy_http_req:skip_body(Req) of
{ok, Req2} -> {ok, Req2#http_req.buffer};
{error, _Reason} -> {close, <<>>}
end;
ensure_body_processed(Req=#http_req{body_state={multipart, _, _}}) ->
{ok, Req2} = cowboy_http_req:multipart_skip(Req),
Expand Down
200 changes: 173 additions & 27 deletions src/cowboy_http_req.erl
Expand Up @@ -34,7 +34,8 @@
]). %% Request API.

-export([
body/1, body/2, body_qs/1,
has_body/1, body_length/1, init_stream/4, stream_body/1,
skip_body/1, body/1, body/2, body_qs/1,
multipart_data/1, multipart_skip/1
]). %% Request Body API.

Expand Down Expand Up @@ -231,6 +232,7 @@ parse_header(Name, Req=#http_req{p_headers=PHeaders}) ->
%% @doc Default values for semantic header parsing.
-spec parse_header_default(cowboy_http:header()) -> any().
parse_header_default('Connection') -> [];
parse_header_default('Transfer-Encoding') -> [<<"identity">>];
parse_header_default(_Name) -> undefined.

%% @doc Semantically parse headers.
Expand Down Expand Up @@ -290,6 +292,12 @@ parse_header(Name, Req, Default)
fun (Value) ->
cowboy_http:http_date(Value)
end);
%% @todo Extension parameters.
parse_header(Name, Req, Default) when Name =:= 'Transfer-Encoding' ->
parse_header(Name, Req, Default,
fun (Value) ->
cowboy_http:nonempty_list(Value, fun cowboy_http:token_ci/2)
end);
parse_header(Name, Req, Default) when Name =:= 'Upgrade' ->
parse_header(Name, Req, Default,
fun (Value) ->
Expand All @@ -299,6 +307,7 @@ parse_header(Name, Req, Default) ->
{Value, Req2} = header(Name, Req, Default),
{undefined, Value, Req2}.

%% @todo This doesn't look in the cache.
parse_header(Name, Req=#http_req{p_headers=PHeaders}, Default, Fun) ->
case header(Name, Req) of
{undefined, Req2} ->
Expand Down Expand Up @@ -368,42 +377,179 @@ meta(Name, Req, Default) ->

%% Request Body API.

%% @doc Return the full body sent with the request, or <em>{error, badarg}</em>
%% if no <em>Content-Length</em> is available.
%% @todo We probably want to allow a max length.
%% @todo Add multipart support to this function.
%% @doc Return whether the request message has a body.
-spec has_body(#http_req{}) -> {boolean(), #http_req{}}.
has_body(Req) ->
Has = lists:keymember('Content-Length', 1, Req#http_req.headers) orelse
lists:keymember('Transfer-Encoding', 1, Req#http_req.headers),
{Has, Req}.

%% @doc Return the request message body length, if known.
%%
%% The length may not be known if Transfer-Encoding is not identity,
%% and the body hasn't been read at the time of the call.
-spec body_length(#http_req{}) -> {undefined | non_neg_integer(), #http_req{}}.
body_length(Req) ->
case lists:keymember('Transfer-Encoding', 1, Req#http_req.headers) of
true -> {undefined, Req};
false -> parse_header('Content-Length', Req, 0)
end.

%% @doc Initialize body streaming and set custom decoding functions.
%%
%% Calling this function is optional. It should only be used if you
%% need to override the default behavior of Cowboy. Otherwise you
%% should call stream_body/1 directly.
%%
%% Two decodings happen. First a decoding function is applied to the
%% transferred data, and then another is applied to the actual content.
%%
%% Transfer encoding is generally used for chunked bodies. The decoding
%% function uses a state to keep track of how much it has read, which is
%% also initialized through this function.
%%
%% Content encoding is generally used for compression.
%%
%% Standard encodings can be found in cowboy_http.
-spec init_stream(fun(), any(), fun(), #http_req{}) -> {ok, #http_req{}}.
init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
{ok, Req#http_req{body_state=
{stream, TransferDecode, TransferState, ContentDecode}}}.

%% @doc Stream the request's body.
%%
%% This is the most low level function to read the request body.
%%
%% In most cases, if they weren't defined before using stream_body/4,
%% this function will guess which transfer and content encodings were
%% used for building the request body, and configure the decoding
%% functions that will be used when streaming.
%%
%% It then starts streaming the body, returning {ok, Data, Req}
%% for each streamed part, and {done, Req} when it's finished streaming.
-spec stream_body(#http_req{}) -> {ok, binary(), #http_req{}}
| {done, #http_req{}} | {error, atom()}.
stream_body(Req=#http_req{body_state=waiting}) ->
case parse_header('Transfer-Encoding', Req) of
{[<<"chunked">>], Req2} ->
stream_body(Req2#http_req{body_state=
{stream, fun cowboy_http:te_chunked/2, {0, 0},
fun cowboy_http:ce_identity/1}});
{[<<"identity">>], Req2} ->
{Length, Req3} = body_length(Req2),
case Length of
0 ->
{done, Req3#http_req{body_state=done}};
Length ->
stream_body(Req3#http_req{body_state=
{stream, fun cowboy_http:te_identity/2, {0, Length},
fun cowboy_http:ce_identity/1}})
end
end;
stream_body(Req=#http_req{buffer=Buffer, body_state={stream, _, _, _}})
when Buffer =/= <<>> ->
transfer_decode(Buffer, Req#http_req{buffer= <<>>});
stream_body(Req=#http_req{body_state={stream, _, _, _}}) ->
stream_body_recv(Req);
stream_body(Req=#http_req{body_state=done}) ->
{done, Req}.

-spec stream_body_recv(#http_req{})
-> {ok, binary(), #http_req{}} | {error, atom()}.
stream_body_recv(Req=#http_req{transport=Transport, socket=Socket}) ->
%% @todo Allow configuring the timeout.
case Transport:recv(Socket, 0, 5000) of
{ok, Data} -> transfer_decode(Data, Req);
{error, Reason} -> {error, Reason}
end.

-spec transfer_decode(binary(), #http_req{})
-> {ok, binary(), #http_req{}} | {error, atom()}.
transfer_decode(Data, Req=#http_req{
body_state={stream, TransferDecode, TransferState, ContentDecode}}) ->
case TransferDecode(Data, TransferState) of
{ok, Data2, TransferState2} ->
content_decode(ContentDecode, Data2, Req#http_req{body_state=
{stream, TransferDecode, TransferState2, ContentDecode}});
{ok, Data2, Rest, TransferState2} ->
content_decode(ContentDecode, Data2, Req#http_req{
buffer=Rest, body_state=
{stream, TransferDecode, TransferState2, ContentDecode}});
%% @todo {header(s) for chunked
more ->
stream_body_recv(Req);
{done, Length, Rest} ->
Req2 = transfer_decode_done(Length, Rest, Req),
{done, Req2};
{done, Data2, Length, Rest} ->
Req2 = transfer_decode_done(Length, Rest, Req),
content_decode(ContentDecode, Data2, Req2);
{error, Reason} ->
{error, Reason}
end.

-spec transfer_decode_done(non_neg_integer(), binary(), #http_req{})
-> #http_req{}.
transfer_decode_done(Length, Rest, Req=#http_req{
headers=Headers, p_headers=PHeaders}) ->
Headers2 = lists:keystore('Content-Length', 1, Headers,
{'Content-Length', list_to_binary(integer_to_list(Length))}),
%% At this point we just assume TEs were all decoded.
Headers3 = lists:keydelete('Transfer-Encoding', 1, Headers2),
PHeaders2 = lists:keystore('Content-Length', 1, PHeaders,
{'Content-Length', Length}),
PHeaders3 = lists:keydelete('Transfer-Encoding', 1, PHeaders2),
Req#http_req{buffer=Rest, body_state=done,
headers=Headers3, p_headers=PHeaders3}.

%% @todo Probably needs a Rest.
-spec content_decode(fun(), binary(), #http_req{})
-> {ok, binary(), #http_req{}} | {error, atom()}.
content_decode(ContentDecode, Data, Req) ->
case ContentDecode(Data) of
{ok, Data2} -> {ok, Data2, Req};
{error, Reason} -> {error, Reason}
end.

%% @doc Return the full body sent with the request.
-spec body(#http_req{}) -> {ok, binary(), #http_req{}} | {error, atom()}.
body(Req) ->
{Length, Req2} = cowboy_http_req:parse_header('Content-Length', Req),
case Length of
undefined -> {error, badarg};
{error, badarg} -> {error, badarg};
_Any ->
body(Length, Req2)
end.
read_body(infinity, Req, <<>>).

%% @doc Return <em>Length</em> bytes of the request body.
%% @doc Return the full body sent with the request as long as the body
%% length doesn't go over MaxLength.
%%
%% You probably shouldn't be calling this function directly, as it expects the
%% <em>Length</em> argument to be the full size of the body, and will consider
%% the body to be fully read from the socket.
%% @todo We probably want to configure the timeout.
-spec body(non_neg_integer(), #http_req{})
%% This is most useful to quickly be able to get the full body while
%% avoiding filling your memory with huge request bodies when you're
%% not expecting it.
-spec body(non_neg_integer() | infinity, #http_req{})
-> {ok, binary(), #http_req{}} | {error, atom()}.
body(Length, Req=#http_req{body_state=waiting, buffer=Buffer})
when is_integer(Length) andalso Length =< byte_size(Buffer) ->
<< Body:Length/binary, Rest/bits >> = Buffer,
{ok, Body, Req#http_req{body_state=done, buffer=Rest}};
body(Length, Req=#http_req{socket=Socket, transport=Transport,
body_state=waiting, buffer=Buffer}) ->
case Transport:recv(Socket, Length - byte_size(Buffer), 5000) of
{ok, Body} -> {ok, << Buffer/binary, Body/binary >>,
Req#http_req{body_state=done, buffer= <<>>}};
body(MaxLength, Req) ->
read_body(MaxLength, Req, <<>>).

-spec read_body(non_neg_integer() | infinity, #http_req{}, binary())
-> {ok, binary(), #http_req{}} | {error, atom()}.
read_body(MaxLength, Req, Acc) when MaxLength > byte_size(Acc) ->
case stream_body(Req) of
{ok, Data, Req2} ->
read_body(MaxLength, Req2, << Acc/binary, Data/binary >>);
{done, Req2} ->
{ok, Acc, Req2};
{error, Reason} ->
{error, Reason}
end.

-spec skip_body(#http_req{}) -> {ok, #http_req{}} | {error, atom()}.
skip_body(Req) ->
case stream_body(Req) of
{ok, _, Req2} -> skip_body(Req2);
{done, Req2} -> {ok, Req2};
{error, Reason} -> {error, Reason}
end.

%% @doc Return the full body sent with the reqest, parsed as an
%% application/x-www-form-urlencoded string. Essentially a POST query string.
%% @todo We need an option to limit the size of the body for QS too.
-spec body_qs(#http_req{}) -> {list({binary(), binary() | true}), #http_req{}}.
body_qs(Req=#http_req{urldecode={URLDecFun, URLDecArg}}) ->
{ok, Body, Req2} = body(Req),
Expand Down
12 changes: 9 additions & 3 deletions src/cowboy_http_websocket.erl
Expand Up @@ -175,12 +175,18 @@ websocket_handshake(State=#state{version=0, origin=Origin,
%% We replied with a proper response. Proxies should be happy enough,
%% we can now read the 8 last bytes of the challenge keys and send
%% the challenge response directly to the socket.
case cowboy_http_req:body(8, Req2) of
{ok, Key3, Req3} ->
%%
%% We use a trick here to read exactly 8 bytes of the body regardless
%% of what's in the buffer.
{ok, Req3} = cowboy_http_req:init_stream(
fun cowboy_http:te_identity/2, {0, 8},
fun cowboy_http:ce_identity/1, Req2),
case cowboy_http_req:body(Req3) of
{ok, Key3, Req4} ->
Challenge = hixie76_challenge(Key1, Key2, Key3),
Transport:send(Socket, Challenge),
handler_before_loop(State#state{messages=Transport:messages()},
Req3, HandlerState, <<>>);
Req4, HandlerState, <<>>);
_Any ->
closed %% If an error happened reading the body, stop there.
end;
Expand Down

0 comments on commit 95e05d8

Please sign in to comment.