Permalink
Browse files

Initial implementation of async requests (stream_to option)

  • Loading branch information...
1 parent e6afca9 commit 4116e354918f18ffc344b298010da2dcbecbfc8e @lpgauth lpgauth committed Jul 20, 2011
Showing with 48 additions and 33 deletions.
  1. +33 −21 src/lhttpc.erl
  2. +15 −12 src/lhttpc_client.erl
View
@@ -322,23 +322,32 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
headers(), iolist(), pos_integer(), [option()]) -> result().
request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
verify_options(Options, []),
- Args = [self(), Host, Port, Ssl, Path, Method, Hdrs, Body, Options],
- Pid = spawn_link(lhttpc_client, request, Args),
- receive
- {response, Pid, R} ->
- R;
- {exit, Pid, Reason} ->
- % We would rather want to exit here, instead of letting the
- % linked client send us an exit signal, since this can be
- % caught by the caller.
- exit(Reason);
- {'EXIT', Pid, Reason} ->
- % This could happen if the process we're running in taps exits
- % and the client process exits due to some exit signal being
- % sent to it. Very unlikely though
- exit(Reason)
- after Timeout ->
- kill_client(Pid)
+ ReqId = erlang:now(),
+ case proplists:is_defined(stream_to, Options) of
+ true ->
+ StreamTo = proplists:get_value(stream_to, Options),
+ Args = [ReqId, StreamTo, Host, Port, Ssl, Path, Method, Hdrs, Body, Options],
+ Pid = spawn_link(lhttpc_client, request, Args),
+ {ReqId, Pid};
+ false ->
+ Args = [ReqId, self(), Host, Port, Ssl, Path, Method, Hdrs, Body, Options],
+ Pid = spawn_link(lhttpc_client, request, Args),
+ receive
+ {response, ReqId, Pid, R} ->
+ R;
+ {exit, ReqId, Pid, Reason} ->
+ % We would rather want to exit here, instead of letting the
+ % linked client send us an exit signal, since this can be
+ % caught by the caller.
+ exit(Reason);
+ {'EXIT', ReqId, Pid, Reason} ->
+ % This could happen if the process we're running in taps exits
+ % and the client process exits due to some exit signal being
+ % sent to it. Very unlikely though
+ exit(Reason)
+ after Timeout ->
+ kill_client(Pid)
+ end
end.
%% @spec (UploadState :: UploadState, BodyPart :: BodyPart) -> Result
@@ -388,7 +397,7 @@ send_body_part({Pid, 0}, IoList, Timeout) when is_pid(Pid) ->
receive
{ack, Pid} ->
send_body_part({Pid, 1}, IoList, Timeout);
- {response, Pid, R} ->
+ {response, _ReqId, Pid, R} ->
R;
{exit, Pid, Reason} ->
exit(Reason);
@@ -403,7 +412,7 @@ send_body_part({Pid, Window}, IoList, _Timeout) when Window > 0, is_pid(Pid) ->
receive
{ack, Pid} ->
{ok, {Pid, Window}};
- {reponse, Pid, R} ->
+ {response, _ReqId, Pid, R} ->
R;
{exit, Pid, Reason} ->
exit(Reason);
@@ -506,7 +515,7 @@ read_response(Pid, Timeout) ->
receive
{ack, Pid} ->
read_response(Pid, Timeout);
- {response, Pid, R} ->
+ {response, _ReqId, Pid, R} ->
R;
{exit, Pid, Reason} ->
exit(Reason);
@@ -521,7 +530,7 @@ kill_client(Pid) ->
unlink(Pid), % or we'll kill ourself :O
exit(Pid, timeout),
receive
- {response, Pid, R} ->
+ {response, _ReqId, Pid, R} ->
erlang:demonitor(Monitor, [flush]),
R;
{'DOWN', _, process, Pid, timeout} ->
@@ -547,6 +556,9 @@ verify_options([{connection_timeout, MS} | Options], Errors)
verify_options([{max_connections, MS} | Options], Errors)
when is_integer(MS), MS >= 0 ->
verify_options(Options, Errors);
+verify_options([{stream_to, Pid} | Options], Errors)
+ when is_pid(Pid) ->
+ verify_options(Options, Errors);
verify_options([{partial_upload, WindowSize} | Options], Errors)
when is_integer(WindowSize), WindowSize >= 0 ->
verify_options(Options, Errors);
View
@@ -34,11 +34,12 @@
%%% @type iolist() = [] | binary() | [char() | binary() | iolist()].
-module(lhttpc_client).
--export([request/9]).
+-export([request/10]).
-include("lhttpc_types.hrl").
-record(client_state, {
+ req_id :: tuple(),
host :: string(),
port = 80 :: integer(),
ssl = false :: true | false,
@@ -64,9 +65,10 @@
-define(CONNECTION_HDR(HDRS, DEFAULT),
string:to_lower(lhttpc_lib:header_value("connection", HDRS, DEFAULT))).
--spec request(pid(), string(), 1..65535, true | false, string(),
+-spec request(tuple(), pid(), string(), 1..65535, true | false, string(),
string() | atom(), headers(), iolist(), [option()]) -> no_return().
-%% @spec (From, Host, Port, Ssl, Path, Method, Hdrs, RequestBody, Options) -> ok
+%% @spec (ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, RequestBody, Options) -> ok
+%% ReqId = tuple()
%% From = pid()
%% Host = string()
%% Port = integer()
@@ -78,16 +80,16 @@
%% Options = [Option]
%% Option = {connect_timeout, Milliseconds}
%% @end
-request(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
+request(ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
Result = try
- execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options)
+ execute(ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options)
catch
Reason ->
- {response, self(), {error, Reason}};
+ {response, ReqId, self(), {error, Reason}};
error:closed ->
- {response, self(), {error, connection_closed}};
+ {response, ReqId, self(), {error, connection_closed}};
error:Error ->
- {exit, self(), {Error, erlang:get_stacktrace()}}
+ {exit, ReqId, self(), {Error, erlang:get_stacktrace()}}
end,
case Result of
{response, _, {ok, {no_return, _}}} -> ok;
@@ -98,7 +100,7 @@ request(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
unlink(From),
ok.
-execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
+execute(ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
UploadWindowSize = proplists:get_value(partial_upload, Options),
PartialUpload = proplists:is_defined(partial_upload, Options),
PartialDownload = proplists:is_defined(partial_download, Options),
@@ -112,6 +114,7 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
LbRequest = {lb, Host, Port, Ssl, MaxConnections, ConnectionTimeout},
{ok, Lb} = gen_server:call(lhttpc_manager, LbRequest, infinity),
State = #client_state{
+ req_id = ReqId,
host = Host,
port = Port,
ssl = Ssl,
@@ -145,7 +148,7 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
end,
{ok, R}
end,
- {response, self(), Response}.
+ {response, ReqId, self(), Response}.
send_request(#client_state{attempts = 0}) ->
throw(connection_closed);
@@ -185,7 +188,7 @@ send_request(State) ->
partial_upload(State) ->
Response = {ok, {self(), State#client_state.upload_window}},
- State#client_state.requester ! {response, self(), Response},
+ State#client_state.requester ! {response, State#client_state.req_id, self(), Response},
partial_upload_loop(State#client_state{attempts = 1, request = undefined}).
partial_upload_loop(State = #client_state{requester = Pid}) ->
@@ -297,7 +300,7 @@ handle_response_body(#client_state{partial_download = true} = State, Vsn,
case has_body(Method, element(1, Status), Hdrs) of
true ->
Response = {ok, {Status, Hdrs, self()}},
- State#client_state.requester ! {response, self(), Response},
+ State#client_state.requester ! {response, State#client_state.req_id, self(), Response},
MonRef = erlang:monitor(process, State#client_state.requester),
Res = read_partial_body(State, Vsn, Hdrs, body_type(Hdrs)),
erlang:demonitor(MonRef, [flush]),

0 comments on commit 4116e35

Please sign in to comment.