Skip to content

Commit

Permalink
MB-5398 Remove timeout when requesting next data chunk
Browse files Browse the repository at this point in the history
This timeout existed to deal with misbehaviour of the http
library ibrowse.
Ibrowse is no longer used, lhttpc replaced it and doesn't
seem to suffer the same issue, as it isn't modeled with
a gen_server, passive sockets and complex state transitions
as ibrowse is (just uses plain old socket recv calls when
a data chunk is request by user).

This affected view queries with stale=false option in a multi-node
scenario where the client would get timeout errors for remote
nodes.

Lhttpc patch submitted and merged upstream:

esl/lhttpc#18

Change-Id: I28a0a1607420c3df717dbdc2d3dfa6d2a0bb58bb
Reviewed-on: http://review.couchbase.org/17980
Reviewed-by: Volker Mische <volker.mische@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information
fdmanana committed Jul 6, 2012
1 parent 588f33d commit 70f1faa
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 43 deletions.
24 changes: 12 additions & 12 deletions src/couch_index_merger/src/couch_index_merger.erl
Expand Up @@ -643,7 +643,7 @@ http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
Streamer = get(streamer_pid),
case is_pid(Streamer) andalso is_process_alive(Streamer) of
true ->
catch stream_all(Streamer, MergeParams#index_merge.conn_timeout, []);
catch stream_all(Streamer, []);
false ->
ok
end
Expand All @@ -665,10 +665,10 @@ run_http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
{win32, _} ->
% TODO: make couch_view_parser build and run on Windows
EventFun = Mod:make_event_fun(MergeParams#index_merge.http_params, Queue),
DataFun = fun() -> stream_data(Pid, Timeout) end,
DataFun = fun() -> stream_data(Pid) end,
json_stream_parse:events(DataFun, EventFun);
_ ->
DataFun = fun() -> next_chunk(Pid, Timeout) end,
DataFun = fun() -> next_chunk(Pid) end,
ok = couch_http_view_streamer:parse(DataFun, Queue, get(from_url))
end
catch throw:{error, Error} ->
Expand All @@ -679,7 +679,7 @@ run_http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
{ok, {{Code, _}, _RespHeaders, Pid}} when is_pid(Pid) ->
put(streamer_pid, Pid),
Error = try
stream_all(Pid, Timeout, [])
stream_all(Pid, [])
catch throw:{error, _Error} ->
<<"Error code ", (?l2b(integer_to_list(Code)))/binary>>
end,
Expand Down Expand Up @@ -707,19 +707,19 @@ run_http_index_folder(Mod, IndexSpec, MergeParams, DDoc, Queue) ->
end.


stream_data(Pid, Timeout) ->
case lhttpc:get_body_part(Pid, Timeout) of
stream_data(Pid) ->
case lhttpc:get_body_part(Pid) of
{ok, {http_eob, _Trailers}} ->
{<<>>, fun() -> throw({error, <<"more view data expected">>}) end};
{ok, Data} ->
{Data, fun() -> stream_data(Pid, Timeout) end};
{Data, fun() -> stream_data(Pid) end};
{error, _} = Error ->
throw(Error)
end.


next_chunk(Pid, Timeout) ->
case lhttpc:get_body_part(Pid, Timeout) of
next_chunk(Pid) ->
case lhttpc:get_body_part(Pid) of
{ok, {http_eob, _Trailers}} ->
eof;
{ok, _Data} = Ok ->
Expand All @@ -729,12 +729,12 @@ next_chunk(Pid, Timeout) ->
end.


stream_all(Pid, Timeout, Acc) ->
case stream_data(Pid, Timeout) of
stream_all(Pid, Acc) ->
case stream_data(Pid) of
{<<>>, _} ->
iolist_to_binary(lists:reverse(Acc));
{Data, _} ->
stream_all(Pid, Timeout, [Data | Acc])
stream_all(Pid, [Data | Acc])
end.


Expand Down
8 changes: 6 additions & 2 deletions src/lhttpc/lhttpc.erl
Expand Up @@ -507,7 +507,9 @@ get_body_part(Pid) ->
%% `{partial_download, PartialDownloadOptions}' is used.
%% `Timeout' is the timeout for reading the next body part in milliseconds.
%% `http_eob' marks the end of the body. If there were Trailers in the
%% response those are returned with `http_eob' as well.
%% response those are returned with `http_eob' as well.
%% If it evers returns an error, no further calls to this function should
%% be done.
%% @end
-spec get_body_part(pid(), timeout()) ->
{ok, binary()} |
Expand All @@ -519,7 +521,9 @@ get_body_part(Pid, Timeout) ->
Pid ! {ack, self()},
{ok, Bin};
{http_eob, Pid, Trailers} ->
{ok, {http_eob, Trailers}}
{ok, {http_eob, Trailers}};
{error, Pid, Reason} ->
{error, Reason}
after Timeout ->
kill_client(Pid)
end.
Expand Down
48 changes: 19 additions & 29 deletions src/lhttpc/lhttpc_client.erl
Expand Up @@ -489,48 +489,38 @@ read_partial_finite_body(State = #client_state{requester = To}, Hdrs,
exit(normal)
end;
read_partial_finite_body(State, Hdrs, ContentLength, Window) when Window >= 0->
Bin = read_body_part(State, ContentLength),
State#client_state.requester ! {body_part, self(), Bin},
To = State#client_state.requester,
receive
{ack, To} ->
Length = ContentLength - iolist_size(Bin),
read_partial_finite_body(State, Hdrs, Length, Window);
{'DOWN', _, process, To, _} ->
case read_body_part(State, ContentLength) of
{ok, Bin} ->
State#client_state.requester ! {body_part, self(), Bin},
To = State#client_state.requester,
receive
{ack, To} ->
Length = ContentLength - iolist_size(Bin),
read_partial_finite_body(State, Hdrs, Length, Window);
{'DOWN', _, process, To, _} ->
exit(normal)
after 0 ->
Length = ContentLength - iolist_size(Bin),
read_partial_finite_body(State, Hdrs, Length, lhttpc_lib:dec(Window))
end;
{error, Reason} ->
State#client_state.requester ! {error, self(), Reason},
exit(normal)
after 0 ->
Length = ContentLength - iolist_size(Bin),
read_partial_finite_body(State, Hdrs, Length, lhttpc_lib:dec(Window))
end.

read_body_part(#client_state{part_size = infinity} = State, _ContentLength) ->
case lhttpc_sock:recv(State#client_state.socket, State#client_state.ssl) of
{ok, Data} ->
Data;
{error, Reason} ->
erlang:error(Reason)
end;
lhttpc_sock:recv(State#client_state.socket, State#client_state.ssl);
read_body_part(#client_state{part_size = PartSize} = State, ContentLength)
when PartSize =< ContentLength ->
Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
PartSize = State#client_state.part_size,
case lhttpc_sock:recv(Socket, PartSize, Ssl) of
{ok, Data} ->
Data;
{error, Reason} ->
erlang:error(Reason)
end;
lhttpc_sock:recv(Socket, PartSize, Ssl);
read_body_part(#client_state{part_size = PartSize} = State, ContentLength)
when PartSize > ContentLength ->
Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
case lhttpc_sock:recv(Socket, ContentLength, Ssl) of
{ok, Data} ->
Data;
{error, Reason} ->
erlang:error(Reason)
end.
lhttpc_sock:recv(Socket, ContentLength, Ssl).

read_length(Hdrs, Ssl, Socket, Length) ->
case lhttpc_sock:recv(Socket, Length, Ssl) of
Expand Down

0 comments on commit 70f1faa

Please sign in to comment.