Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(http): optimize message reply processing #2

Merged
merged 4 commits into from
Jan 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/ehttpc.appup.src
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
%% -*-: erlang -*-
{0.1.1,
{"0.1.1",
[
{"0.1.0", [
{load_module, ehttpc, brutal_purge, soft_purge, []}
{code_change, up, [{ehttpc, []}]}
]},
{<<".*">>, []}
],
[
{"0.1.0", [
{load_module, ehttpc, brutal_purge, soft_purge, []}
{code_change, down, [{ehttpc, []}]}
]},
{<<".*">>, []}
]
Expand Down
149 changes: 104 additions & 45 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
host :: inet:hostname() | inet:ip_address(),
port :: inet:port_number(),
gun_opts :: proplists:proplist(),
gun_state :: down | up
gun_state :: down | up,
requests :: map()
}).

%%--------------------------------------------------------------------
Expand All @@ -59,7 +60,7 @@ request(Worker, Method, Req) ->
request(Worker, Method, Req, 5000).

request(Worker, Method, Req, Timeout) ->
gen_server:call(Worker, {Method, Req, Timeout}, infinity).
gen_server:call(Worker, {Method, Req, Timeout}, Timeout + 1000).

workers(Pool) ->
gproc_pool:active_workers(name(Pool)).
Expand All @@ -78,7 +79,8 @@ init([Pool, Id, Opts]) ->
host = proplists:get_value(host, Opts),
port = proplists:get_value(port, Opts),
gun_opts = gun_opts(Opts),
gun_state = down},
gun_state = down,
requests = #{}},
true = gproc_pool:connect_worker(ehttpc:name(Pool), {Pool, Id}),
{ok, State}.

Expand All @@ -101,9 +103,10 @@ handle_call(Req = {_, _, Timeout}, From, State = #state{client = Client, mref =
{reply, {error, Reason}, State#state{client = undefined, mref = undefined}}
end;

handle_call({Method, Request, Timeout}, _From, State = #state{client = Client, gun_state = up}) when is_pid(Client) ->
handle_call({Method, Request, Timeout}, From, State = #state{client = Client, requests = Requests, gun_state = up}) when is_pid(Client) ->
StreamRef = do_request(Client, Method, Request),
await_response(StreamRef, Timeout, State);
ExpirationTime = erlang:system_time(millisecond) + Timeout,
{noreply, State#state{requests = maps:put(StreamRef, {From, ExpirationTime, undefined}, Requests)}};

handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
Expand All @@ -113,19 +116,95 @@ handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.

handle_info({gun_response, Client, StreamRef, IsFin, StatusCode, Headers}, State = #state{client = Client, requests = Requests}) ->
Now = erlang:system_time(millisecond),
case maps:take(StreamRef, Requests) of
error ->
?LOG(error, "Received 'gun_response' message from unknown stream ref: ~p", [StreamRef]),
{noreply, State};
{{_, ExpirationTime, _}, NRequests} when Now > ExpirationTime ->
gun:cancel(Client, StreamRef),
flush_stream(Client, StreamRef),
{noreply, State#state{requests = NRequests}};
{{From, ExpirationTime, undefined}, NRequests} ->
case IsFin of
fin ->
gen_server:reply(From, {ok, StatusCode, Headers}),
{noreply, State#state{requests = NRequests}};
nofin ->
{noreply, State#state{requests = NRequests#{StreamRef => {From, ExpirationTime, {StatusCode, Headers, <<>>}}}}}
end;
_ ->
?LOG(error, "Received 'gun_response' message does not match the state", []),
{noreply, State}
end;

handle_info({gun_data, Client, StreamRef, IsFin, Data}, State = #state{client = Client, requests = Requests}) ->
Now = erlang:system_time(millisecond),
case maps:take(StreamRef, Requests) of
error ->
?LOG(error, "Received 'gun_data' message from unknown stream ref: ~p", [StreamRef]),
{noreply, State};
{{_, ExpirationTime, _}, NRequests} when Now > ExpirationTime ->
gun:cancel(Client, StreamRef),
flush_stream(Client, StreamRef),
{noreply, State#state{requests = NRequests}};
{{From, ExpirationTime, {StatusCode, Headers, Acc}}, NRequests} ->
case IsFin of
fin ->
gen_server:reply(From, {ok, StatusCode, Headers, <<Acc/binary, Data/binary>>}),
{noreply, State#state{requests = NRequests}};
nofin ->
{noreply, State#state{requests = NRequests#{StreamRef => {From, ExpirationTime, {StatusCode, Headers, <<Acc/binary, Data/binary>>}}}}}
end;
_ ->
?LOG(error, "Received 'gun_data' message does not match the state", []),
{noreply, State}
end;

handle_info({gun_error, Client, StreamRef, Reason}, State = #state{client = Client, requests = Requests}) ->
Now = erlang:system_time(millisecond),
case maps:take(StreamRef, Requests) of
error ->
?LOG(error, "Received 'gun_error' message from unknown stream ref: ~p~n", [StreamRef]),
{noreply, State};
{{_, ExpirationTime, _}, NRequests} when Now > ExpirationTime ->
{noreply, State#state{requests = NRequests}};
{{From, _, _}, NRequests} ->
gen_server:reply(From, {error, Reason}),
{noreply, State#state{requests = NRequests}}
end;

handle_info({gun_up, Client, _}, State = #state{client = Client}) ->
{noreply, State#state{gun_state = up}};

handle_info({gun_down, Client, _, _Reason, _, _}, State = #state{client = Client}) ->
{noreply, State#state{gun_state = down}};

handle_info({'DOWN', MRef, process, Client, Reason}, State = #state{mref = MRef, client = Client}) ->
handle_info({gun_down, Client, _, Reason, KilledStreams, _}, State = #state{client = Client, requests = Requests}) ->
Now = erlang:system_time(millisecond),
NRequests = lists:foldl(fun(StreamRef, Acc) ->
case maps:take(StreamRef, Acc) of
error ->
Acc;
{{_, ExpirationTime, _}, NAcc} when Now > ExpirationTime ->
NAcc;
{{From, _, _}, NAcc} ->
gen_server:reply(From, {error, Reason}),
NAcc
end
end, Requests, KilledStreams),
{noreply, State#state{gun_state = down, requests = NRequests}};

handle_info({'DOWN', MRef, process, Client, Reason}, State = #state{mref = MRef, client = Client, requests = Requests}) ->
true = erlang:demonitor(MRef, [flush]),
case open(State) of
Now = erlang:system_time(millisecond),
lists:foreach(fun({_, {_, ExpirationTime, _}}) when Now > ExpirationTime ->
ok;
({_, {From, _, _}}) ->
gen_server:reply(From, {error, Reason})
end, maps:to_list(Requests)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe maps:fold/3 is better than maps:to_list/1 + lists:foreach/2?

case open(State#state{requests = #{}}) of
{ok, NewState} ->
{noreply, NewState};
{error, Reason} ->
%% TODO: print warning log
{noreply, State#state{mref = undefined, client = undefined}}
end;

Expand All @@ -137,8 +216,10 @@ terminate(_Reason, #state{pool = Pool, id = Id}) ->
gproc_pool:disconnect_worker(ehttpc:name(Pool), {Pool, Id}),
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.
code_change({down, "0.1.0"}, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState, _}, _Extra) ->
{ok, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState}};
code_change("0.1.0", {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState}, _Extra) ->
{ok, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState, #{}}}.

%%--------------------------------------------------------------------
%% Internal functions
Expand Down Expand Up @@ -184,36 +265,14 @@ do_request(Client, put, {Path, Headers, Body}) ->
do_request(Client, delete, {Path, Headers}) ->
gun:delete(Client, Path, Headers).

await_response(StreamRef, Timeout, State = #state{client = Client, mref = MRef}) ->
flush_stream(Client, StreamRef) ->
receive
{gun_response, Client, StreamRef, fin, StatusCode, Headers} ->
{reply, {ok, StatusCode, Headers}, State};
{gun_response, Client, StreamRef, nofin, StatusCode, Headers} ->
await_body(StreamRef, Timeout, {StatusCode, Headers, <<>>}, State);
{gun_error, Client, StreamRef, Reason} ->
{reply, {error, Reason}, State};
{'DOWN', MRef, process, Client, Reason} ->
true = erlang:demonitor(MRef, [flush]),
{reply, {error, Reason}, State#state{client = undefined, mref = undefiend}}
after Timeout ->
gun:cancel(Client, StreamRef),
{reply, {error, timeout}, State}
end.

await_body(StreamRef, Timeout, {StatusCode, Headers, Acc}, State = #state{client = Client, mref = MRef}) ->
receive
{gun_data, Client, StreamRef, fin, Data} ->
{reply, {ok, StatusCode, Headers, << Acc/binary, Data/binary >>}, State};
{gun_data, Client, StreamRef, nofin, Data} ->
await_body(StreamRef, Timeout, {StatusCode, Headers, << Acc/binary, Data/binary >>}, State);
% {gun_error, Client, StreamRef, {closed, _}} ->
% todo;
{gun_error, Client, StreamRef, Reason} ->
{reply, {error, Reason}, State};
{'DOWN', MRef, process, Client, Reason} ->
true = erlang:demonitor(MRef, [flush]),
{reply, {error, Reason}, State#state{client = undefined, mref = undefiend}}
after Timeout ->
gun:cancel(Client, StreamRef),
{reply, {error, timeout}, State}
end.
{gun_response, Client, StreamRef, _, _, _} ->
flush_stream(Client, StreamRef);
{gun_data, Client, StreamRef, _, _} ->
flush_stream(Client, StreamRef);
{gun_error, Client, StreamRef, _} ->
flush_stream(Client, StreamRef)
after 0 ->
ok
end.