Skip to content

Commit

Permalink
Merge pull request #350 from basho/gh625-stream-timeout
Browse files Browse the repository at this point in the history
Add test for streaming timeout message
  • Loading branch information
russelldb committed Aug 21, 2013
2 parents 03dafe2 + 1c79006 commit a422d41
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 16 deletions.
58 changes: 48 additions & 10 deletions tests/secondary_index_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pb_query(Pid, {Field, Start, End}, Opts) ->
riakc_pb_socket:get_index_range(Pid, ?BUCKET, Field, Start, End, Opts).

http_stream(NodePath, Query, Opts) ->
http_query(NodePath, Query, Opts, stream).
http_query(NodePath, Query, [{stream, true} | Opts], stream).

http_query(NodePath, Q) ->
http_query(NodePath, Q, []).
Expand Down Expand Up @@ -173,7 +173,7 @@ http_get(Url, undefined) ->
http_get(Url, stream) ->
lager:info("streaming ~p", [Url]),
{ok, Ref} = httpc:request(get, {Url, []}, [], [{stream, self}, {sync, false}]),
http_stream_loop(Ref, []).
start_http_stream(Ref).
opts_to_qstring([], QString) ->
QString;
Expand All @@ -193,15 +193,53 @@ opt_to_string(Sep, Name) ->
url_encode(Val) when is_binary(Val) ->
url_encode(binary_to_list(Val));
url_encode(Val) when is_atom(Val) ->
url_encode(atom_to_list(Val));
url_encode(Val) ->
ibrowse_lib:url_encode(Val).
http_stream_loop(Ref, Acc) ->
receive {http, {Ref, stream_end, _Headers}} ->
{struct, Result} = mochijson2:decode(lists:flatten(lists:reverse(Acc))),
Result;
{http, {Ref, stream_start, _Headers}} ->
http_stream_loop(Ref, Acc);
{http, {Ref, stream, Body}} ->
http_stream_loop(Ref, [binary_to_list(Body) | Acc])
start_http_stream(Ref) ->
receive
{http, {Ref, stream_start, Headers}} ->
Boundary = get_boundary(proplists:get_value("content-type", Headers)),
http_stream_loop(Ref, orddict:new(), Boundary)
end.
http_stream_loop(Ref, Acc, {Boundary, BLen}=B) ->
receive
{http, {Ref, stream_end, _Headers}} ->
orddict:to_list(Acc);
{http, {Ref, stream, <<"\r\n--", Boundary:BLen/bytes, "\r\nContent-Type: application/json\r\n\r\n", Body/binary>>}} ->
ReverseBoundary = reverse_bin(<<"\r\n--", Boundary:BLen/binary, "--\r\n">>),
Message = get_message(ReverseBoundary, reverse_bin(Body)),
{struct, Result} = mochijson2:decode(Message),
Acc2 = lists:foldl(fun({K, V}, A) -> orddict:update(K, fun(Existing) -> Existing++V end, V, A) end,
Acc,
Result),
http_stream_loop(Ref, Acc2, B);
{http, {Ref, stream, <<"\r\n--", Boundary:BLen/bytes, "--\r\n">>}} ->
http_stream_loop(Ref, Acc, B);
Other -> lager:error("Unexpected message ~p", [Other]),
{error, unknown_message}
after 60000 ->
{error, timeout_local}
end.
get_boundary("multipart/mixed;boundary=" ++ Boundary) ->
B = list_to_binary(Boundary),
{B, byte_size(B)};
get_boundary(_) ->
undefined.
reverse_bin(Bin) ->
list_to_binary(lists:reverse(binary_to_list(Bin))).
get_message(Boundary, Body) ->
BLen = byte_size(Boundary),
case Body of
<<Boundary:BLen/binary, Message/binary>> ->
reverse_bin(Message);
_ -> reverse_bin(Body)
end.
5 changes: 2 additions & 3 deletions tests/verify_2i_limit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("riakc/include/riakc.hrl").
-import(secondary_index_tests, [put_an_object/2, put_an_object/4, int_to_key/1,
stream_pb/3, http_query/3, pb_query/3]).
stream_pb/3, http_query/3, pb_query/3, http_stream/3]).
-define(BUCKET, <<"2ibucket">>).
-define(FOO, <<"foo">>).
-define(MAX_RESULTS, 50).
Expand Down Expand Up @@ -52,7 +52,7 @@ confirm() ->
?assertEqual(Rest, proplists:get_value(keys, PBKeys2, [])),

%% HTTP
HttpRes = http_query(RiakHttp, Q, [{max_results, ?MAX_RESULTS}]),
HttpRes = http_stream(RiakHttp, Q, [{max_results, ?MAX_RESULTS}]),
?assertEqual(FirstHalf, proplists:get_value(<<"keys">>, HttpRes, [])),
HttpContinuation = proplists:get_value(<<"continuation">>, HttpRes),
?assertEqual(PBContinuation, HttpContinuation),
Expand Down Expand Up @@ -115,4 +115,3 @@ verify_eq_pag(PBPid, RiakHttp, EqualsQuery, FirstHalf, Rest) ->
[{continuation, EqPBContinuation}]),
?assertEqual({EqualsQuery, Rest},
{EqualsQuery, proplists:get_value(keys, EqPBKeys2, [])}).

14 changes: 11 additions & 3 deletions tests/verify_2i_timeout.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-import(secondary_index_tests, [put_an_object/2, put_an_object/4, int_to_key/1,
stream_pb/3, url/2, http_query/3]).
stream_pb/3, url/2, http_query/3, http_stream/3]).
-define(BUCKET, <<"2ibucket">>).
-define(FOO, <<"foo">>).

Expand All @@ -47,14 +47,22 @@ confirm() ->
{ok, Res} = stream_pb(PBPid, Query, [{timeout, 5000}]),
?assertEqual(ExpectedKeys, proplists:get_value(keys, Res, [])),

{ok, {{_, 500, _}, _, Body}} = httpc:request(url("~s/buckets/~s/index/~s/~s~s",
{ok, {{_, 503, _}, _, Body}} = httpc:request(url("~s/buckets/~s/index/~s/~s~s",
[Http, ?BUCKET, <<"$bucket">>, ?BUCKET, []])),
?assertMatch({match, _}, re:run(Body, "{error,timeout}")), %% shows the app.config timeout

?assertMatch({match, _}, re:run(Body, "request timed out")), %% shows the app.config timeout

HttpRes = http_query(Http, Query, [{timeout, 5000}]),
?assertEqual(ExpectedKeys, proplists:get_value(<<"keys">>, HttpRes, [])),

stream_http(Http, Query, ExpectedKeys),

riakc_pb_socket:stop(PBPid),
pass.

stream_http(Http, Query, ExpectedKeys) ->
Res = http_stream(Http, Query, []),
?assert(lists:member({<<"error">>,<<"timeout">>}, Res)),
Res2 = http_stream(Http, Query, [{timeout, 5000}]),
?assertEqual(ExpectedKeys, proplists:get_value(<<"keys">>, Res2, [])).

0 comments on commit a422d41

Please sign in to comment.