diff --git a/include/http.hrl b/include/http.hrl index 9ba3787d2..21d837bfc 100644 --- a/include/http.hrl +++ b/include/http.hrl @@ -41,8 +41,8 @@ meta = [] :: [{atom(), any()}], %% Request body. - body_state = waiting :: waiting | done | - {multipart, non_neg_integer(), fun()}, + body_state = waiting :: waiting | done | {stream, fun(), any(), fun()} + | {multipart, non_neg_integer(), fun()}, buffer = <<>> :: binary(), %% Response. diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 9d727f3f9..2f4f982f1 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -22,6 +22,9 @@ http_date/1, rfc1123_date/1, rfc850_date/1, asctime_date/1, whitespace/2, digits/1, token/2, token_ci/2, quoted_string/2]). +%% Decoding. +-export([te_chunked/2, te_identity/2, ce_identity/1]). + %% Interpretation. -export([connection_to_atom/1, urldecode/1, urldecode/2, urlencode/1, urlencode/2, x_www_form_urlencoded/2]). @@ -708,6 +711,51 @@ qvalue(<< C, Rest/binary >>, Fun, Q, M) qvalue(Data, Fun, Q, _M) -> Fun(Data, Q). +%% Decoding. + +%% @doc Decode a stream of chunks. +-spec te_chunked(binary(), {non_neg_integer(), non_neg_integer()}) + -> more | {ok, binary(), {non_neg_integer(), non_neg_integer()}} + | {ok, binary(), binary(), {non_neg_integer(), non_neg_integer()}} + | {done, non_neg_integer(), binary()} | {error, badarg}. +te_chunked(<<>>, _) -> + more; +te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) -> + {done, Streamed, Rest}; +te_chunked(Data, {0, Streamed}) -> + %% @todo We are expecting an hex size, not a general token. + token(Data, + fun (Rest, _) when byte_size(Rest) < 4 -> + more; + (<< "\r\n", Rest/binary >>, BinLen) -> + Len = list_to_integer(binary_to_list(BinLen), 16), + te_chunked(Rest, {Len, Streamed}); + (_, _) -> + {error, badarg} + end); +te_chunked(Data, {ChunkRem, Streamed}) when byte_size(Data) >= ChunkRem + 2 -> + << Chunk:ChunkRem/binary, "\r\n", Rest/binary >> = Data, + {ok, Chunk, Rest, {0, Streamed + byte_size(Chunk)}}; +te_chunked(Data, {ChunkRem, Streamed}) -> + Size = byte_size(Data), + {ok, Data, {ChunkRem - Size, Streamed + Size}}. + +%% @doc Decode an identity stream. +-spec te_identity(binary(), {non_neg_integer(), non_neg_integer()}) + -> {ok, binary(), {non_neg_integer(), non_neg_integer()}} + | {done, binary(), non_neg_integer(), binary()}. +te_identity(Data, {Streamed, Total}) + when Streamed + byte_size(Data) < Total -> + {ok, Data, {Streamed + byte_size(Data), Total}}; +te_identity(Data, {Streamed, Total}) -> + Size = Total - Streamed, + << Data2:Size/binary, Rest/binary >> = Data, + {done, Data2, Total, Rest}. + +%% @doc Decode an identity content. +-spec ce_identity(binary()) -> {ok, binary()}. +ce_identity(Data) -> + {ok, Data}. %% Interpretation. diff --git a/src/cowboy_http_protocol.erl b/src/cowboy_http_protocol.erl index ecc7286d3..04abfbc64 100644 --- a/src/cowboy_http_protocol.erl +++ b/src/cowboy_http_protocol.erl @@ -396,10 +396,9 @@ next_request(Req=#http_req{connection=Conn}, ensure_body_processed(#http_req{body_state=done, buffer=Buffer}) -> {ok, Buffer}; ensure_body_processed(Req=#http_req{body_state=waiting}) -> - case cowboy_http_req:body(Req) of - {error, badarg} -> {ok, Req#http_req.buffer}; %% No body. - {error, _Reason} -> {close, <<>>}; - {ok, _, Req2} -> {ok, Req2#http_req.buffer} + case cowboy_http_req:skip_body(Req) of + {ok, Req2} -> {ok, Req2#http_req.buffer}; + {error, _Reason} -> {close, <<>>} end; ensure_body_processed(Req=#http_req{body_state={multipart, _, _}}) -> {ok, Req2} = cowboy_http_req:multipart_skip(Req), diff --git a/src/cowboy_http_req.erl b/src/cowboy_http_req.erl index c352bbf62..a6e883468 100644 --- a/src/cowboy_http_req.erl +++ b/src/cowboy_http_req.erl @@ -34,7 +34,8 @@ ]). %% Request API. -export([ - body/1, body/2, body_qs/1, + has_body/1, body_length/1, init_stream/4, stream_body/1, + skip_body/1, body/1, body/2, body_qs/1, multipart_data/1, multipart_skip/1 ]). %% Request Body API. @@ -231,6 +232,7 @@ parse_header(Name, Req=#http_req{p_headers=PHeaders}) -> %% @doc Default values for semantic header parsing. -spec parse_header_default(cowboy_http:header()) -> any(). parse_header_default('Connection') -> []; +parse_header_default('Transfer-Encoding') -> [<<"identity">>]; parse_header_default(_Name) -> undefined. %% @doc Semantically parse headers. @@ -290,6 +292,12 @@ parse_header(Name, Req, Default) fun (Value) -> cowboy_http:http_date(Value) end); +%% @todo Extension parameters. +parse_header(Name, Req, Default) when Name =:= 'Transfer-Encoding' -> + parse_header(Name, Req, Default, + fun (Value) -> + cowboy_http:nonempty_list(Value, fun cowboy_http:token_ci/2) + end); parse_header(Name, Req, Default) when Name =:= 'Upgrade' -> parse_header(Name, Req, Default, fun (Value) -> @@ -299,6 +307,7 @@ parse_header(Name, Req, Default) -> {Value, Req2} = header(Name, Req, Default), {undefined, Value, Req2}. +%% @todo This doesn't look in the cache. parse_header(Name, Req=#http_req{p_headers=PHeaders}, Default, Fun) -> case header(Name, Req) of {undefined, Req2} -> @@ -368,42 +377,179 @@ meta(Name, Req, Default) -> %% Request Body API. -%% @doc Return the full body sent with the request, or {error, badarg} -%% if no Content-Length is available. -%% @todo We probably want to allow a max length. -%% @todo Add multipart support to this function. +%% @doc Return whether the request message has a body. +-spec has_body(#http_req{}) -> {boolean(), #http_req{}}. +has_body(Req) -> + Has = lists:keymember('Content-Length', 1, Req#http_req.headers) orelse + lists:keymember('Transfer-Encoding', 1, Req#http_req.headers), + {Has, Req}. + +%% @doc Return the request message body length, if known. +%% +%% The length may not be known if Transfer-Encoding is not identity, +%% and the body hasn't been read at the time of the call. +-spec body_length(#http_req{}) -> {undefined | non_neg_integer(), #http_req{}}. +body_length(Req) -> + case lists:keymember('Transfer-Encoding', 1, Req#http_req.headers) of + true -> {undefined, Req}; + false -> parse_header('Content-Length', Req, 0) + end. + +%% @doc Initialize body streaming and set custom decoding functions. +%% +%% Calling this function is optional. It should only be used if you +%% need to override the default behavior of Cowboy. Otherwise you +%% should call stream_body/1 directly. +%% +%% Two decodings happen. First a decoding function is applied to the +%% transferred data, and then another is applied to the actual content. +%% +%% Transfer encoding is generally used for chunked bodies. The decoding +%% function uses a state to keep track of how much it has read, which is +%% also initialized through this function. +%% +%% Content encoding is generally used for compression. +%% +%% Standard encodings can be found in cowboy_http. +-spec init_stream(fun(), any(), fun(), #http_req{}) -> {ok, #http_req{}}. +init_stream(TransferDecode, TransferState, ContentDecode, Req) -> + {ok, Req#http_req{body_state= + {stream, TransferDecode, TransferState, ContentDecode}}}. + +%% @doc Stream the request's body. +%% +%% This is the most low level function to read the request body. +%% +%% In most cases, if they weren't defined before using stream_body/4, +%% this function will guess which transfer and content encodings were +%% used for building the request body, and configure the decoding +%% functions that will be used when streaming. +%% +%% It then starts streaming the body, returning {ok, Data, Req} +%% for each streamed part, and {done, Req} when it's finished streaming. +-spec stream_body(#http_req{}) -> {ok, binary(), #http_req{}} + | {done, #http_req{}} | {error, atom()}. +stream_body(Req=#http_req{body_state=waiting}) -> + case parse_header('Transfer-Encoding', Req) of + {[<<"chunked">>], Req2} -> + stream_body(Req2#http_req{body_state= + {stream, fun cowboy_http:te_chunked/2, {0, 0}, + fun cowboy_http:ce_identity/1}}); + {[<<"identity">>], Req2} -> + {Length, Req3} = body_length(Req2), + case Length of + 0 -> + {done, Req3#http_req{body_state=done}}; + Length -> + stream_body(Req3#http_req{body_state= + {stream, fun cowboy_http:te_identity/2, {0, Length}, + fun cowboy_http:ce_identity/1}}) + end + end; +stream_body(Req=#http_req{buffer=Buffer, body_state={stream, _, _, _}}) + when Buffer =/= <<>> -> + transfer_decode(Buffer, Req#http_req{buffer= <<>>}); +stream_body(Req=#http_req{body_state={stream, _, _, _}}) -> + stream_body_recv(Req); +stream_body(Req=#http_req{body_state=done}) -> + {done, Req}. + +-spec stream_body_recv(#http_req{}) + -> {ok, binary(), #http_req{}} | {error, atom()}. +stream_body_recv(Req=#http_req{transport=Transport, socket=Socket}) -> + %% @todo Allow configuring the timeout. + case Transport:recv(Socket, 0, 5000) of + {ok, Data} -> transfer_decode(Data, Req); + {error, Reason} -> {error, Reason} + end. + +-spec transfer_decode(binary(), #http_req{}) + -> {ok, binary(), #http_req{}} | {error, atom()}. +transfer_decode(Data, Req=#http_req{ + body_state={stream, TransferDecode, TransferState, ContentDecode}}) -> + case TransferDecode(Data, TransferState) of + {ok, Data2, TransferState2} -> + content_decode(ContentDecode, Data2, Req#http_req{body_state= + {stream, TransferDecode, TransferState2, ContentDecode}}); + {ok, Data2, Rest, TransferState2} -> + content_decode(ContentDecode, Data2, Req#http_req{ + buffer=Rest, body_state= + {stream, TransferDecode, TransferState2, ContentDecode}}); + %% @todo {header(s) for chunked + more -> + stream_body_recv(Req); + {done, Length, Rest} -> + Req2 = transfer_decode_done(Length, Rest, Req), + {done, Req2}; + {done, Data2, Length, Rest} -> + Req2 = transfer_decode_done(Length, Rest, Req), + content_decode(ContentDecode, Data2, Req2); + {error, Reason} -> + {error, Reason} + end. + +-spec transfer_decode_done(non_neg_integer(), binary(), #http_req{}) + -> #http_req{}. +transfer_decode_done(Length, Rest, Req=#http_req{ + headers=Headers, p_headers=PHeaders}) -> + Headers2 = lists:keystore('Content-Length', 1, Headers, + {'Content-Length', list_to_binary(integer_to_list(Length))}), + %% At this point we just assume TEs were all decoded. + Headers3 = lists:keydelete('Transfer-Encoding', 1, Headers2), + PHeaders2 = lists:keystore('Content-Length', 1, PHeaders, + {'Content-Length', Length}), + PHeaders3 = lists:keydelete('Transfer-Encoding', 1, PHeaders2), + Req#http_req{buffer=Rest, body_state=done, + headers=Headers3, p_headers=PHeaders3}. + +%% @todo Probably needs a Rest. +-spec content_decode(fun(), binary(), #http_req{}) + -> {ok, binary(), #http_req{}} | {error, atom()}. +content_decode(ContentDecode, Data, Req) -> + case ContentDecode(Data) of + {ok, Data2} -> {ok, Data2, Req}; + {error, Reason} -> {error, Reason} + end. + +%% @doc Return the full body sent with the request. -spec body(#http_req{}) -> {ok, binary(), #http_req{}} | {error, atom()}. body(Req) -> - {Length, Req2} = cowboy_http_req:parse_header('Content-Length', Req), - case Length of - undefined -> {error, badarg}; - {error, badarg} -> {error, badarg}; - _Any -> - body(Length, Req2) - end. + read_body(infinity, Req, <<>>). -%% @doc Return Length bytes of the request body. +%% @doc Return the full body sent with the request as long as the body +%% length doesn't go over MaxLength. %% -%% You probably shouldn't be calling this function directly, as it expects the -%% Length argument to be the full size of the body, and will consider -%% the body to be fully read from the socket. -%% @todo We probably want to configure the timeout. --spec body(non_neg_integer(), #http_req{}) +%% This is most useful to quickly be able to get the full body while +%% avoiding filling your memory with huge request bodies when you're +%% not expecting it. +-spec body(non_neg_integer() | infinity, #http_req{}) -> {ok, binary(), #http_req{}} | {error, atom()}. -body(Length, Req=#http_req{body_state=waiting, buffer=Buffer}) - when is_integer(Length) andalso Length =< byte_size(Buffer) -> - << Body:Length/binary, Rest/bits >> = Buffer, - {ok, Body, Req#http_req{body_state=done, buffer=Rest}}; -body(Length, Req=#http_req{socket=Socket, transport=Transport, - body_state=waiting, buffer=Buffer}) -> - case Transport:recv(Socket, Length - byte_size(Buffer), 5000) of - {ok, Body} -> {ok, << Buffer/binary, Body/binary >>, - Req#http_req{body_state=done, buffer= <<>>}}; +body(MaxLength, Req) -> + read_body(MaxLength, Req, <<>>). + +-spec read_body(non_neg_integer() | infinity, #http_req{}, binary()) + -> {ok, binary(), #http_req{}} | {error, atom()}. +read_body(MaxLength, Req, Acc) when MaxLength > byte_size(Acc) -> + case stream_body(Req) of + {ok, Data, Req2} -> + read_body(MaxLength, Req2, << Acc/binary, Data/binary >>); + {done, Req2} -> + {ok, Acc, Req2}; + {error, Reason} -> + {error, Reason} + end. + +-spec skip_body(#http_req{}) -> {ok, #http_req{}} | {error, atom()}. +skip_body(Req) -> + case stream_body(Req) of + {ok, _, Req2} -> skip_body(Req2); + {done, Req2} -> {ok, Req2}; {error, Reason} -> {error, Reason} end. %% @doc Return the full body sent with the reqest, parsed as an %% application/x-www-form-urlencoded string. Essentially a POST query string. +%% @todo We need an option to limit the size of the body for QS too. -spec body_qs(#http_req{}) -> {list({binary(), binary() | true}), #http_req{}}. body_qs(Req=#http_req{urldecode={URLDecFun, URLDecArg}}) -> {ok, Body, Req2} = body(Req), diff --git a/src/cowboy_http_websocket.erl b/src/cowboy_http_websocket.erl index f08405bcf..bc2871264 100644 --- a/src/cowboy_http_websocket.erl +++ b/src/cowboy_http_websocket.erl @@ -175,12 +175,18 @@ websocket_handshake(State=#state{version=0, origin=Origin, %% We replied with a proper response. Proxies should be happy enough, %% we can now read the 8 last bytes of the challenge keys and send %% the challenge response directly to the socket. - case cowboy_http_req:body(8, Req2) of - {ok, Key3, Req3} -> + %% + %% We use a trick here to read exactly 8 bytes of the body regardless + %% of what's in the buffer. + {ok, Req3} = cowboy_http_req:init_stream( + fun cowboy_http:te_identity/2, {0, 8}, + fun cowboy_http:ce_identity/1, Req2), + case cowboy_http_req:body(Req3) of + {ok, Key3, Req4} -> Challenge = hixie76_challenge(Key1, Key2, Key3), Transport:send(Socket, Challenge), handler_before_loop(State#state{messages=Transport:messages()}, - Req3, HandlerState, <<>>); + Req4, HandlerState, <<>>); _Any -> closed %% If an error happened reading the body, stop there. end; diff --git a/test/http_SUITE.erl b/test/http_SUITE.erl index eaf4c2281..cebc1e6bc 100644 --- a/test/http_SUITE.erl +++ b/test/http_SUITE.erl @@ -24,7 +24,8 @@ pipeline/1, raw/1, set_resp_header/1, set_resp_overwrite/1, set_resp_body/1, stream_body_set_resp/1, response_as_req/1, static_mimetypes_function/1, static_attribute_etag/1, - static_function_etag/1, multipart/1]). %% http. + static_function_etag/1, multipart/1, te_identity/1, + te_chunked/1, te_chunked_delayed/1]). %% http. -export([http_200/1, http_404/1, handler_errors/1, file_200/1, file_403/1, dir_403/1, file_404/1, file_400/1]). %% http and https. @@ -47,7 +48,8 @@ groups() -> set_resp_header, set_resp_overwrite, set_resp_body, response_as_req, stream_body_set_resp, static_mimetypes_function, static_attribute_etag, - static_function_etag, multipart] ++ BaseTests}, + static_function_etag, multipart, te_identity, te_chunked, + te_chunked_delayed] ++ BaseTests}, {https, [], BaseTests}, {misc, [], [http_10_hostless, http_10_chunkless]}, {rest, [], [rest_simple, rest_keepalive, rest_keepalive_post, @@ -165,6 +167,7 @@ init_http_dispatch(Config) -> [{directory, ?config(static_dir, Config)}, {etag, {fun static_function_etag/2, etag_data}}]}, {[<<"multipart">>], http_handler_multipart, []}, + {[<<"echo">>, <<"body">>], http_handler_echo_body, []}, {[], http_handler, []} ]} ]. @@ -530,6 +533,57 @@ static_function_etag(Arguments, etag_data) -> [Checksum|_] = string:tokens(os:cmd(ChecksumCommand), " "), {strong, iolist_to_binary(Checksum)}. +te_identity(Config) -> + {port, Port} = lists:keyfind(port, 1, Config), + {ok, Socket} = gen_tcp:connect("localhost", Port, + [binary, {active, false}, {packet, raw}]), + Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])), + StrLen = integer_to_list(byte_size(Body)), + ok = gen_tcp:send(Socket, ["GET /echo/body HTTP/1.1\r\n" + "Host: localhost\r\nConnection: close\r\n" + "Content-Length: ", StrLen, "\r\n\r\n", Body]), + {ok, Data} = gen_tcp:recv(Socket, 0, 6000), + {_, _} = binary:match(Data, Body). + +te_chunked(Config) -> + {port, Port} = lists:keyfind(port, 1, Config), + {ok, Socket} = gen_tcp:connect("localhost", Port, + [binary, {active, false}, {packet, raw}]), + Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])), + Chunks = body_to_chunks(50, Body, []), + ok = gen_tcp:send(Socket, ["GET /echo/body HTTP/1.1\r\n" + "Host: localhost\r\nConnection: close\r\n" + "Transfer-Encoding: chunked\r\n\r\n", Chunks]), + {ok, Data} = gen_tcp:recv(Socket, 0, 6000), + {_, _} = binary:match(Data, Body). + +te_chunked_delayed(Config) -> + {port, Port} = lists:keyfind(port, 1, Config), + {ok, Socket} = gen_tcp:connect("localhost", Port, + [binary, {active, false}, {packet, raw}]), + Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])), + Chunks = body_to_chunks(50, Body, []), + ok = gen_tcp:send(Socket, ["GET /echo/body HTTP/1.1\r\n" + "Host: localhost\r\nConnection: close\r\n" + "Transfer-Encoding: chunked\r\n\r\n"]), + _ = [begin ok = gen_tcp:send(Socket, Chunk), ok = timer:sleep(10) end + || Chunk <- Chunks], + {ok, Data} = gen_tcp:recv(Socket, 0, 6000), + {_, _} = binary:match(Data, Body). + +body_to_chunks(_, <<>>, Acc) -> + lists:reverse([<<"0\r\n\r\n">>|Acc]); +body_to_chunks(ChunkSize, Body, Acc) -> + BodySize = byte_size(Body), + ChunkSize2 = case BodySize < ChunkSize of + true -> BodySize; + false -> ChunkSize + end, + << Chunk:ChunkSize2/binary, Rest/binary >> = Body, + ChunkSizeBin = list_to_binary(integer_to_list(ChunkSize2, 16)), + body_to_chunks(ChunkSize, Rest, + [<< ChunkSizeBin/binary, "\r\n", Chunk/binary, "\r\n" >>|Acc]). + %% http and https. build_url(Path, Config) -> diff --git a/test/http_handler_echo_body.erl b/test/http_handler_echo_body.erl new file mode 100644 index 000000000..b64ae7bc3 --- /dev/null +++ b/test/http_handler_echo_body.erl @@ -0,0 +1,19 @@ +%% Feel free to use, reuse and abuse the code in this file. + +-module(http_handler_echo_body). +-behaviour(cowboy_http_handler). +-export([init/3, handle/2, terminate/2]). + +init({_, http}, Req, _) -> + {ok, Req, undefined}. + +handle(Req, State) -> + {true, Req1} = cowboy_http_req:has_body(Req), + {ok, Body, Req2} = cowboy_http_req:body(Req1), + {Size, Req3} = cowboy_http_req:body_length(Req2), + Size = byte_size(Body), + {ok, Req4} = cowboy_http_req:reply(200, [], Body, Req3), + {ok, Req4, State}. + +terminate(_, _) -> + ok.