Permalink
Browse files

Changing mechanism of the load balancer

Whenever a server would listen to TCP connections but never accept
them, the lhttpc application would leak a ton of processes until
the virtual machine is taken down.

This was due to the way setting up connections was being done
within the load balancer. This would lead to many milliseconds of
delay for each socket connection attempt, and an eventual queue
build-up would happen in the load balancer.
Because requests freely spawn processes, this ended up having
too many requests that the LB cannot deal with.

This fix changes the structure around so that each client is
responsible of setting their own socket and connection, enabling
the load balancer to easily deny connections to newer processes
when older ones are still stuck. Setting a good request timeout
can then insure that slow requests won't starve the system.
  • Loading branch information...
1 parent bbecd1d commit 68ab97c7096f422ac29dd5773a4681c3643d0b84 @ferd committed Nov 25, 2011
Showing with 348 additions and 369 deletions.
  1. +1 −0 rebar.config
  2. +5 −5 src/lhttpc.app.src
  3. +36 −14 src/lhttpc.erl
  4. +40 −44 src/lhttpc_client.erl
  5. +221 −166 src/lhttpc_lb.erl
  6. +1 −2 src/lhttpc_lib.erl
  7. +0 −125 src/lhttpc_manager.erl
  8. +1 −2 src/lhttpc_sock.erl
  9. +18 −7 src/lhttpc_sup.erl
  10. +25 −4 test/lhttpc_tests.erl
View
1 rebar.config
@@ -1 +1,2 @@
+{erl_opts, [debug_info]}.
{cover_enabled, true}.
View
10 src/lhttpc.app.src
@@ -29,11 +29,11 @@
%%% @end
{application, lhttpc,
[{description, "Lightweight HTTP Client"},
- {vsn, "1.2.6"},
- {modules, []},
- {registered, [lhttpc_manager]},
+ {vsn, "1.2.7"},
+ {modules, [lhttpc,lhttpc_sup,lhttpc_client,lhttpc_sock,lhttp_lb]},
+ {registered, [lhttpc_sup]},
{applications, [kernel, stdlib, ssl, crypto]},
- {mod, {lhttpc, nil}},
- {env, []}
+ {mod, {lhttpc, []}},
+ {env, [{connection_timeout, 300000}]}
]}.
View
50 src/lhttpc.erl
@@ -52,7 +52,7 @@
%% @hidden
-spec start(normal | {takeover, node()} | {failover, node()}, any()) ->
{ok, pid()}.
-start(_, _) ->
+start(_, Opts) ->
case lists:member({seed,1}, ssl:module_info(exports)) of
true ->
% Make sure that the ssl random number generator is seeded
@@ -61,7 +61,9 @@ start(_, _) ->
false ->
ok
end,
- lhttpc_sup:start_link().
+ if is_list(Opts) -> lhttpc_sup:start_link(Opts);
+ true -> lhttpc_sup:start_link()
+ end.
%% @hidden
-spec stop(any()) -> ok.
@@ -320,12 +322,16 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
headers(), iolist(), pos_integer(), [option()]) -> result().
request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
verify_options(Options, []),
- ReqId = erlang:now(),
+ ReqId = now(),
case proplists:is_defined(stream_to, Options) of
true ->
StreamTo = proplists:get_value(stream_to, Options),
Args = [ReqId, StreamTo, Host, Port, Ssl, Path, Method, Hdrs, Body, Options],
- Pid = spawn(lhttpc_client, request_with_timeout, [Timeout, Args]),
+ Pid = spawn(lhttpc_client, request, Args),
+ spawn(fun() ->
+ R = kill_client_after(Pid, Timeout),
+ StreamTo ! {response, ReqId, Pid, R}
+ end),
{ReqId, Pid};
false ->
Args = [ReqId, self(), Host, Port, Ssl, Path, Method, Hdrs, Body, Options],
@@ -338,7 +344,7 @@ request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
% linked client send us an exit signal, since this can be
% caught by the caller.
exit(Reason);
- {'EXIT', ReqId, Pid, Reason} ->
+ {'EXIT', Pid, Reason} ->
% This could happen if the process we're running in taps exits
% and the client process exits due to some exit signal being
% sent to it. Very unlikely though
@@ -397,7 +403,7 @@ send_body_part({Pid, 0}, IoList, Timeout) when is_pid(Pid) ->
send_body_part({Pid, 1}, IoList, Timeout);
{response, _ReqId, Pid, R} ->
R;
- {exit, Pid, Reason} ->
+ {exit, _ReqId, Pid, Reason} ->
exit(Reason);
{'EXIT', Pid, Reason} ->
exit(Reason)
@@ -410,9 +416,9 @@ send_body_part({Pid, Window}, IoList, _Timeout) when Window > 0, is_pid(Pid) ->
receive
{ack, Pid} ->
{ok, {Pid, Window}};
- {response, _ReqId, Pid, R} ->
+ {reponse, _ReqId, Pid, R} ->
R;
- {exit, Pid, Reason} ->
+ {exit, _ReqId, Pid, Reason} ->
exit(Reason);
{'EXIT', Pid, Reason} ->
exit(Reason)
@@ -515,7 +521,7 @@ read_response(Pid, Timeout) ->
read_response(Pid, Timeout);
{response, _ReqId, Pid, R} ->
R;
- {exit, Pid, Reason} ->
+ {exit, _ReqId, Pid, Reason} ->
exit(Reason);
{'EXIT', Pid, Reason} ->
exit(Reason)
@@ -537,6 +543,23 @@ kill_client(Pid) ->
erlang:error(Reason)
end.
+kill_client_after(Pid, Timeout) ->
+ erlang:monitor(process, Pid),
+ receive
+ {'DOWN', _, process, Pid, _Reason} -> exit(normal)
+ after Timeout ->
+ catch unlink(Pid), % or we'll kill ourself :O
+ exit(Pid, timeout),
+ receive
+ {'DOWN', _, process, Pid, timeout} ->
+ {error, timeout};
+ {'DOWN', _, process, Pid, Reason} ->
+ erlang:error(Reason)
+ after 1000 ->
+ exit(normal) % silent failure!
+ end
+ end.
+
-spec verify_options(options(), options()) -> ok.
verify_options([{send_retry, N} | Options], Errors)
when is_integer(N), N >= 0 ->
@@ -551,11 +574,8 @@ verify_options([{connection_timeout, infinity} | Options], Errors) ->
verify_options([{connection_timeout, MS} | Options], Errors)
when is_integer(MS), MS >= 0 ->
verify_options(Options, Errors);
-verify_options([{max_connections, MS} | Options], Errors)
- when is_integer(MS), MS >= 0 ->
- verify_options(Options, Errors);
-verify_options([{stream_to, Pid} | Options], Errors)
- when is_pid(Pid) ->
+verify_options([{max_connections, N} | Options], Errors)
+ when is_integer(N), N > 0 ->
verify_options(Options, Errors);
verify_options([{partial_upload, WindowSize} | Options], Errors)
when is_integer(WindowSize), WindowSize >= 0 ->
@@ -574,6 +594,8 @@ verify_options([{partial_download, DownloadOptions} | Options], Errors)
verify_options([{connect_options, List} | Options], Errors)
when is_list(List) ->
verify_options(Options, Errors);
+verify_options([{stream_to, Pid} | Options], Errors) when is_pid(Pid) ->
+ verify_options(Options, Errors);
verify_options([Option | Options], Errors) ->
verify_options(Options, [Option | Errors]);
verify_options([], []) ->
View
84 src/lhttpc_client.erl
@@ -29,22 +29,20 @@
%%% @doc
%%% This module implements the HTTP request handling. This should normally
%%% not be called directly since it should be spawned by the lhttpc module.
-%%% @end
-module(lhttpc_client).
--export([request/10, request_with_timeout/2]).
+-export([request/10]).
-include("lhttpc_types.hrl").
-record(client_state, {
- req_id :: tuple(),
+ req_id :: term(),
host :: string(),
port = 80 :: integer(),
ssl = false :: true | false,
method :: string(),
request :: iolist(),
request_headers :: headers(),
- load_balancer:: pid(),
socket,
connect_timeout = infinity :: timeout(),
connect_options = [] :: [any()],
@@ -63,15 +61,9 @@
-define(CONNECTION_HDR(HDRS, DEFAULT),
string:to_lower(lhttpc_lib:header_value("connection", HDRS, DEFAULT))).
-request_with_timeout(Timeout, [ReqId, StreamTo, _Host, _Port, _Ssl, _Path, _Method, _Hdrs, _Body, _Options] = Args) ->
- TimerRef = erlang:send_after(Timeout, lhttpc_manager, {kill_client_after_timeout, ReqId, self(), StreamTo}),
- ok = apply(?MODULE, request, Args),
- erlang:cancel_timer(TimerRef).
-
--spec request(tuple(), pid(), string(), 1..65535, true | false, string(),
+-spec request(term(), pid(), string(), 1..65535, true | false, string(),
string() | atom(), headers(), iolist(), [option()]) -> no_return().
-%% @spec (ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, RequestBody, Options) -> ok
-%% ReqId = tuple()
+%% @spec (From, Host, Port, Ssl, Path, Method, Hdrs, RequestBody, Options) -> ok
%% From = pid()
%% Host = string()
%% Port = integer()
@@ -108,14 +100,16 @@ execute(ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
PartialUpload = proplists:is_defined(partial_upload, Options),
PartialDownload = proplists:is_defined(partial_download, Options),
PartialDownloadOptions = proplists:get_value(partial_download, Options, []),
- ConnectOptions = proplists:get_value(connect_options, Options, []),
NormalizedMethod = lhttpc_lib:normalize_method(Method),
MaxConnections = proplists:get_value(max_connections, Options, 10),
ConnectionTimeout = proplists:get_value(connection_timeout, Options, infinity),
{ChunkedUpload, Request} = lhttpc_lib:format_request(Path, NormalizedMethod,
Hdrs, Host, Port, Body, PartialUpload),
- LbRequest = {lb, Host, Port, Ssl, MaxConnections, ConnectionTimeout},
- {ok, Lb} = gen_server:call(lhttpc_manager, LbRequest, infinity),
+ Socket = case lhttpc_lb:checkout(Host, Port, Ssl, MaxConnections, ConnectionTimeout) of
+ {ok, S} -> S; % Re-using HTTP/1.1 connections
+ retry_later -> throw(retry_later);
+ no_socket -> undefined % Opening a new HTTP/1.1 connection
+ end,
State = #client_state{
req_id = ReqId,
host = Host,
@@ -125,10 +119,10 @@ execute(ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
request = Request,
requester = From,
request_headers = Hdrs,
- load_balancer = Lb,
+ socket = Socket,
connect_timeout = proplists:get_value(connect_timeout, Options,
infinity),
- connect_options = ConnectOptions,
+ connect_options = proplists:get_value(connect_options, Options, []),
attempts = 1 + proplists:get_value(send_retry, Options, 1),
partial_upload = PartialUpload,
upload_window = UploadWindowSize,
@@ -143,56 +137,59 @@ execute(ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
{R, undefined} ->
{ok, R};
{R, NewSocket} ->
- case lhttpc_sock:controlling_process(NewSocket, Lb, Ssl) of
- ok ->
- gen_server:cast(Lb, {store, NewSocket});
- _ ->
- ok
- end,
+ % The socket we ended up doing the request over is returned
+ % here, it might be the same as Socket, but we don't know.
+ lhttpc_lb:checkin(Host, Port, Ssl, NewSocket),
{ok, R}
end,
{response, ReqId, self(), Response}.
send_request(#client_state{attempts = 0}) ->
+ % Don't try again if the number of allowed attempts is 0.
throw(connection_closed);
send_request(#client_state{socket = undefined} = State) ->
+ Host = State#client_state.host,
+ Port = State#client_state.port,
+ Ssl = State#client_state.ssl,
+ Timeout = State#client_state.connect_timeout,
ConnectOptions = State#client_state.connect_options,
- ConnectTimeout = State#client_state.connect_timeout,
- Lb = State#client_state.load_balancer,
- SocketRequest = {socket, self(), ConnectOptions, ConnectTimeout},
- case gen_server:call(Lb, SocketRequest, infinity) of
+ SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
+ case lhttpc_sock:connect(Host, Port, SocketOptions, Timeout, Ssl) of
{ok, Socket} ->
- lhttpc_sock:setopts(Socket, [{active, false}], State#client_state.ssl),
send_request(State#client_state{socket = Socket});
+ {error, etimedout} ->
+ % TCP stack decided to give up
+ throw(connect_timeout);
+ {error, timeout} ->
+ throw(connect_timeout);
{error, Reason} ->
- throw(Reason)
+ erlang:error(Reason)
end;
send_request(State) ->
- Lb = State#client_state.load_balancer,
Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
Request = State#client_state.request,
case lhttpc_sock:send(Socket, Request, Ssl) of
ok ->
if
- State#client_state.partial_upload -> partial_upload(State);
+ State#client_state.partial_upload -> partial_upload(State);
not State#client_state.partial_upload -> read_response(State)
end;
{error, closed} ->
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
NewState = State#client_state{
socket = undefined,
attempts = State#client_state.attempts - 1
},
send_request(NewState);
{error, Reason} ->
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
erlang:error(Reason)
end.
partial_upload(State) ->
Response = {ok, {self(), State#client_state.upload_window}},
- State#client_state.requester ! {response, State#client_state.req_id, self(), Response},
+ State#client_state.requester ! {response,State#client_state.req_id, self(), Response},
partial_upload_loop(State#client_state{attempts = 1, request = undefined}).
partial_upload_loop(State = #client_state{requester = Pid}) ->
@@ -233,16 +230,15 @@ encode_body_part(#client_state{chunked_upload = false}, Data) ->
check_send_result(_State, ok) ->
ok;
-check_send_result(#client_state{socket = Socket, load_balancer = Lb}, {error, Reason}) ->
- gen_server:cast(Lb, {remove, Socket}),
+check_send_result(#client_state{socket = Sock, ssl = Ssl}, {error, Reason}) ->
+ lhttpc_sock:close(Sock, Ssl),
throw(Reason).
read_response(#client_state{socket = Socket, ssl = Ssl} = State) ->
lhttpc_sock:setopts(Socket, [{packet, http}], Ssl),
read_response(State, nil, {nil, nil}, []).
read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) ->
- Lb = State#client_state.load_balancer,
Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
case lhttpc_sock:recv(Socket, Ssl) of
@@ -265,22 +261,22 @@ read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) ->
Response = handle_response_body(State, Vsn, Status, Hdrs),
NewHdrs = element(2, Response),
ReqHdrs = State#client_state.request_headers,
- NewSocket = maybe_close_socket(Lb, Socket, Vsn, ReqHdrs, NewHdrs),
+ NewSocket = maybe_close_socket(Socket, Ssl, Vsn, ReqHdrs, NewHdrs),
{Response, NewSocket};
{error, closed} ->
% Either we only noticed that the socket was closed after we
% sent the request, the server closed it just after we put
% the request on the wire or the server has some issues and is
% closing connections without sending responses.
% If this the first attempt to send the request, we will try again.
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
NewState = State#client_state{
socket = undefined,
attempts = State#client_state.attempts - 1
},
send_request(NewState);
{error, timeout} ->
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
NewState = State#client_state{
socket = undefined,
attempts = 0
@@ -622,22 +618,22 @@ read_until_closed(Socket, Acc, Hdrs, Ssl) ->
erlang:error(Reason)
end.
-maybe_close_socket(Lb, Socket, {1, Minor}, ReqHdrs, RespHdrs) when Minor >= 1->
+maybe_close_socket(Socket, Ssl, {1, Minor}, ReqHdrs, RespHdrs) when Minor >= 1->
ClientConnection = ?CONNECTION_HDR(ReqHdrs, "keep-alive"),
ServerConnection = ?CONNECTION_HDR(RespHdrs, "keep-alive"),
if
ClientConnection =:= "close"; ServerConnection =:= "close" ->
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
undefined;
ClientConnection =/= "close", ServerConnection =/= "close" ->
Socket
end;
-maybe_close_socket(Lb, Socket, _, ReqHdrs, RespHdrs) ->
+maybe_close_socket(Socket, Ssl, _, ReqHdrs, RespHdrs) ->
ClientConnection = ?CONNECTION_HDR(ReqHdrs, "keep-alive"),
ServerConnection = ?CONNECTION_HDR(RespHdrs, "close"),
if
ClientConnection =:= "close"; ServerConnection =/= "keep-alive" ->
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
undefined;
ClientConnection =/= "close", ServerConnection =:= "keep-alive" ->
Socket
View
387 src/lhttpc_lb.erl
@@ -1,195 +1,250 @@
+%%% Load balancer for lhttpc, replacing the older lhttpc_manager.
+%%% Takes a similar stance of storing used-but-not-closed sockets.
+%%% Also adds functionality to limit the number of simultaneous
+%%% connection attempts from clients.
-module(lhttpc_lb).
+-behaviour(gen_server).
+-export([start_link/5, checkout/5, checkin/3, checkin/4]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ code_change/3, terminate/2]).
--export([
- start_link/1
- ]).
--export([
- init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- code_change/3,
- terminate/2
- ]).
+-define(SHUTDOWN_DELAY, 10000).
--behaviour(gen_server).
+%% TODO: transfert_socket, in case of checkout+give_away
--record(httpc_man, {
- host :: string(),
- port = 80 :: integer(),
- ssl = false :: true | false,
- max_connections = 10 :: non_neg_integer(),
- connection_timeout = 300000 :: non_neg_integer(),
- sockets,
- available_sockets = []
- }).
-
-%% @spec (any()) -> {ok, pid()}
-%% @doc Starts and link to the gen server.
-%% This is normally called by a supervisor.
-%% @end
--spec start_link(any()) -> {ok, pid()}.
-start_link([Dest, Opts]) ->
- gen_server:start_link(?MODULE, [Dest, Opts], []).
-
-%% @hidden
--spec init(any()) -> {ok, #httpc_man{}}.
-init([{Host, Port, Ssl}, {MaxConnections, ConnectionTimeout}]) ->
- State = #httpc_man{
- host = Host,
- port = Port,
- ssl = Ssl,
- max_connections = MaxConnections,
- connection_timeout = ConnectionTimeout,
- sockets = ets:new(sockets, [set])
- },
- {ok, State}.
+-record(state, {host :: host(),
+ port :: port(),
+ ssl :: boolean(),
+ max_conn :: max_connections(),
+ timeout :: timeout(),
+ clients :: ets:tid(),
+ free=[] :: list()}).
+
+-export_types([host/0, port/0, max_connections/0, connection_timeout/0]).
+-type host() :: inet:ip_address()|string().
+-type port_number() :: 1..65535.
+-type max_connections() :: pos_integer().
+-type connection_timeout() :: timeout().
+
+
+-spec start_link(host(), port_number(), SSL::boolean(),
+ max_connections(), connection_timeout()) -> {ok, pid()}.
+start_link(Host, Port, Ssl, MaxConn, ConnTimeout) ->
+ gen_server:start_link(?MODULE, {Host, Port, Ssl, MaxConn, ConnTimeout}, []).
+
+-spec checkout(host(), port_number(), SSL::boolean(),
+ max_connections(), connection_timeout()) ->
+ {ok, port()} | retry_later | no_socket.
+checkout(Host, Port, Ssl, MaxConn, ConnTimeout) ->
+ Lb = find_lb({Host,Port,Ssl}, {MaxConn, ConnTimeout}),
+ gen_server:call(Lb, {checkout, self()}, infinity).
+
+%% Called when the socket has died and we're done
+-spec checkin(host(), port_number(), SSL::boolean()) -> ok.
+checkin(Host, Port, Ssl) ->
+ case find_lb({Host,Port,Ssl}) of
+ {error, undefined} -> ok; % LB is dead. Pretend it's fine -- we don't care
+ {ok, Pid} -> gen_server:cast(Pid, {checkin, self()})
+ end.
+
+%% Called when we're done and the socket can still be reused
+-spec checkin(host(), port_number(), SSL::boolean(), Socket::port()) -> ok.
+checkin(Host, Port, Ssl, Socket) ->
+ case find_lb({Host,Port,Ssl}) of
+ {error, undefined} ->
+ %% should we close the socket? We're not keeping it! There are no
+ %% Lbs available!
+ ok;
+ {ok, Pid} ->
+ %% Give ownership back to the server ASAP. The client has to have
+ %% kept the socket passive. We rely on its good behaviour.
+ %% If the transfer doesn't work, we don't notify.
+ case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of
+ ok -> gen_server:cast(Pid, {checkin, self(), Socket});
+ _ -> ok
+ end
+ end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% GEN_SERVER CALLBACKS %%%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+init({Host,Port,Ssl,MaxConn,ConnTimeout}) ->
+ %% we must use insert_new because it is possible a concurrent request is
+ %% starting such a server at exactly the same time.
+ case ets:insert_new(?MODULE, {{Host,Port,Ssl}, self()}) of
+ true ->
+ {ok, #state{host=Host,
+ port=Port,
+ ssl=Ssl,
+ max_conn=MaxConn,
+ timeout=ConnTimeout,
+ clients=ets:new(clients, [set, private])}};
+ false ->
+ ignore
+ end.
+
+handle_call({checkout,Pid}, _From, S = #state{free=[], max_conn=Max, clients=Tid}) ->
+ Size = ets:info(Tid, size),
+ case Max > Size of
+ true ->
+ Ref = erlang:monitor(process, Pid),
+ ets:insert(Tid, {Pid, Ref}),
+ {reply, no_socket, S};
+ false ->
+ {reply, retry_later, S}
+ end;
+handle_call({checkout,Pid}, _From, S = #state{free=[{Taken,Timer}|Free], clients=Tid, ssl=Ssl}) ->
+ lhttpc_sock:setopts(Taken, [{active,false}], Ssl),
+ case lhttpc_sock:controlling_process(Taken, Pid, Ssl) of
+ ok ->
+ cancel_timer(Timer, Taken),
+ add_client(Tid,Pid),
+ {reply, {ok, Taken}, S#state{free=Free}};
+ {error, badarg} ->
+ %% The caller died.
+ lhttpc_sock:setopts(Taken, [{active, once}], Ssl),
+ {noreply, S};
+ {error, _Reason} -> % socket is closed or something
+ cancel_timer(Timer, Taken),
+ handle_call({checkout,Pid}, _From, S#state{free=Free})
+ end;
+handle_call(_Msg, _From, S) ->
+ {noreply, S}.
-%% @hidden
--spec handle_call(any(), any(), #httpc_man{}) ->
- {reply, any(), #httpc_man{}}.
-handle_call({socket, Pid, ConnectOptions, ConnectTimeout}, _, State) ->
- {Reply, NewState} = find_socket(Pid, ConnectOptions, ConnectTimeout, State),
- {reply, Reply, NewState};
-handle_call(_, _, State) ->
- {reply, {error, unknown_request}, State}.
-
-%% @hidden
--spec handle_cast(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
-handle_cast({store, Socket}, State) ->
- NewState = store_socket(Socket, State),
- {noreply, NewState};
-handle_cast({remove, Socket}, State) ->
- NewState = remove_socket(Socket, State),
- {noreply, NewState};
-handle_cast({terminate}, State) ->
- terminate(undefined, State),
- {noreply, State};
-handle_cast(_, State) ->
+handle_cast({checkin, Pid}, S = #state{clients=Tid}) ->
+ remove_client(Tid, Pid),
+ noreply_maybe_shutdown(S);
+handle_cast({checkin, Pid, Socket}, S = #state{ssl=Ssl, clients=Tid, free=Free, timeout=T}) ->
+ remove_client(Tid, Pid),
+ %% the client cast function took care of giving us ownership
+ case lhttpc_sock:setopts(Socket, [{active, once}], Ssl) of
+ ok ->
+ Timer = start_timer(Socket,T),
+ {noreply, S#state{free=[{Socket,Timer}|Free]}};
+ {error, _E} -> % socket closed or failed
+ noreply_maybe_shutdown(S)
+ end;
+handle_cast(_Msg, State) ->
{noreply, State}.
-%% @hidden
--spec handle_info(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
+handle_info({'DOWN', _Ref, process, Pid, _Reason}, S=#state{clients=Tid}) ->
+ %% Client died
+ remove_client(Tid,Pid),
+ noreply_maybe_shutdown(S);
handle_info({tcp_closed, Socket}, State) ->
- {noreply, remove_socket(Socket, State)};
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({ssl_closed, Socket}, State) ->
- {noreply, remove_socket(Socket, State)};
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({timeout, Socket}, State) ->
- {noreply, remove_socket(Socket, State)};
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({tcp_error, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)};
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({ssl_error, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)};
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({tcp, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)}; % got garbage
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({ssl, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)}; % got garbage
-handle_info(_, State) ->
+ noreply_maybe_shutdown(remove_socket(Socket,State));
+handle_info(timeout, State) ->
+ {stop, normal, State};
+handle_info(_Info, State) ->
{noreply, State}.
-%% @hidden
--spec terminate(any(), #httpc_man{}) -> ok.
-terminate(_, State) ->
- close_sockets(State#httpc_man.sockets, State#httpc_man.ssl).
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
-%% @hidden
--spec code_change(any(), #httpc_man{}, any()) -> #httpc_man{}.
-code_change(_, State, _) ->
- State.
+terminate(_Reason, #state{host=H, port=P, ssl=Ssl, free=Free, clients=Tid}) ->
+ ets:delete(Tid),
+ ets:delete(?MODULE,{H,P,Ssl}),
+ [lhttpc_sock:close(Socket,Ssl) || Socket <- Free],
+ ok.
-find_socket(Pid, ConnectOptions, ConnectTimeout, State = #httpc_man{host=Host, port=Port, ssl=Ssl, sockets=Tid}) ->
- case State#httpc_man.available_sockets of
- [Socket|Available] ->
- case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of
- ok ->
- [{Socket,Timer}] = ets:lookup(Tid, Socket),
- cancel_timer(Timer, Socket),
- NewState = State#httpc_man{available_sockets = Available},
- {{ok, Socket}, NewState};
- {error, badarg} ->
- lhttpc_sock:setopts(Socket, [{active, once}], Ssl),
- {{error, no_pid}, State};
- {error, _Reason} ->
- NewState = State#httpc_man{available_sockets = Available},
- find_socket(Pid, ConnectOptions, ConnectTimeout, remove_socket(Socket, NewState))
- end;
+%%%%%%%%%%%%%%%
+%%% PRIVATE %%%
+%%%%%%%%%%%%%%%
+
+%% Potential race condition: if the lb shuts itself down after a while, it
+%% might happen between a read and the use of the pid. A busy load balancer
+%% should not have this problem.
+-spec find_lb(Name::{host(),port_number(),boolean()}, {max_connections(), connection_timeout()}) -> pid().
+find_lb(Name = {Host,Port,Ssl}, Args={MaxConn, ConnTimeout}) ->
+ case ets:lookup(?MODULE, Name) of
[] ->
- MaxSockets = State#httpc_man.max_connections,
- Size = ets:info(Tid, size),
- Failures = case get('#fail') of
- undefined -> 0;
- Fail -> Fail
- end,
- case Failures > MaxSockets of
+ case supervisor:start_child(lhttpc_sup, [Host,Port,Ssl,MaxConn,ConnTimeout]) of
+ {ok, undefined} -> find_lb(Name,Args);
+ {ok, Pid} -> Pid
+ end;
+ [{_Name, Pid}] ->
+ case is_process_alive(Pid) of % lb died, stale entry
+ true -> Pid;
false ->
- case MaxSockets > Size andalso Size =/= undefined of
- true ->
- SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
- case lhttpc_sock:connect(Host, Port, SocketOptions, ConnectTimeout, Ssl) of
- {ok, Socket} ->
- put('#fail', 0),
- find_socket(Pid, ConnectOptions, ConnectTimeout, store_socket(Socket, State));
- {error, etimedout} ->
- {{error, sys_timeout}, State};
- {error, timeout} ->
- {{error, timeout}, State};
- %% client not answering
- {error, econnrefused} ->
- if Failures < (MaxSockets*2) -> put('#fail', Failures+1);
- true -> ok
- end,
- {{error, econnrefused}, State};
- {error, Reason} ->
- {{error, Reason}, State}
- end;
- false ->
- {{error, retry_later}, State}
- end;
- true ->
- put('#fail', Failures-2),
- {{error, offline}, State}
+ ets:delete(?MODULE, Name),
+ find_lb(Name,Args)
end
end.
-remove_socket(Socket, State = #httpc_man{sockets=Tid, ssl=Ssl}) ->
- case ets:lookup(Tid, Socket) of
- [{_,Timer}] ->
- cancel_timer(Timer, Socket),
- lhttpc_sock:close(Socket, Ssl),
- ets:delete(Tid, Socket);
- [] ->
- ok
- end,
- State.
-
-store_socket(Socket, State = #httpc_man{connection_timeout=Timeout, ssl=Ssl, sockets=Tid}) ->
- Timer = case Timeout of
- infinity -> undefined;
- _Other -> erlang:send_after(Timeout, self(), {timeout, Socket})
- end,
- lhttpc_sock:setopts(Socket, [{active, once}], Ssl),
- ets:insert(Tid, {Socket, Timer}),
- State#httpc_man{available_sockets = [Socket|State#httpc_man.available_sockets]}.
-
-close_sockets(Sockets, Ssl) ->
- ets:foldl(
- fun({Socket, undefined}, _) ->
- lhttpc_sock:close(Socket, Ssl);
- ({Socket, Timer}, _) ->
- erlang:cancel_timer(Timer),
- lhttpc_sock:close(Socket, Ssl)
- end, ok, Sockets
- ).
-
-cancel_timer(undefined, _Socket) ->
- ok;
-cancel_timer(Timer, Socket) ->
- case erlang:cancel_timer(Timer) of
+%% Version of the function to be used when we don't want to start a load balancer
+%% if none is found
+-spec find_lb(Name::{host(),port_number(),boolean()}) -> {error,undefined} | {ok,pid()}.
+find_lb(Name={_Host,_Port,_Ssl}) ->
+ case ets:lookup(?MODULE, Name) of
+ [] -> {error, undefined};
+ [{_Name, Pid}] ->
+ case erlang:is_process_alive(Pid) of
+ true -> {ok, Pid};
+ false -> % lb died, stale entry
+ ets:delete(?MODULE,Name),
+ {error, undefined}
+ end
+ end.
+
+-spec add_client(ets:tid(), pid()) -> true.
+add_client(Tid, Pid) ->
+ Ref = erlang:monitor(process, Pid),
+ ets:insert(Tid, {Pid, Ref}).
+
+-spec remove_client(ets:tid(), pid()) -> true.
+remove_client(Tid, Pid) ->
+ case ets:lookup(Tid, Pid) of
+ [] -> ok; % client already removed
+ [{_Pid, Ref}] ->
+ erlang:demonitor(Ref, [flush]),
+ ets:delete(Tid, Pid)
+ end.
+
+-spec remove_socket(port(), #state{}) -> #state{}.
+remove_socket(Socket, S = #state{ssl=Ssl, free=Free}) ->
+ lhttpc_sock:close(Socket, Ssl),
+ S#state{free=drop_and_cancel(Socket,Free)}.
+
+-spec drop_and_cancel(port(), [{port(), reference()}]) -> [{port(), reference()}].
+drop_and_cancel(_, []) -> [];
+drop_and_cancel(Socket, [{Socket, TimerRef} | Rest]) ->
+ cancel_timer(TimerRef, Socket),
+ Rest;
+drop_and_cancel(Socket, [H|T]) ->
+ [H | drop_and_cancel(Socket, T)].
+
+-spec cancel_timer(reference(), port()) -> ok.
+cancel_timer(TimerRef, Socket) ->
+ case erlang:cancel_timer(TimerRef) of
false ->
receive
{timeout, Socket} -> ok
- after
- 0 -> ok
+ after 0 -> ok
end;
_ -> ok
end.
+
+-spec start_timer(port(), connection_timeout()) -> reference().
+start_timer(_, infinity) -> make_ref(); % dummy timer
+start_timer(Socket, Timeout) ->
+ erlang:send_after(Timeout, self(), {timeout,Socket}).
+
+noreply_maybe_shutdown(S=#state{clients=Tid, free=Free}) ->
+ case Free =:= [] andalso ets:info(Tid, size) =:= 0 of
+ true -> % we're done for
+ {noreply,S,?SHUTDOWN_DELAY};
+ false ->
+ {noreply, S}
+ end.
View
3 src/lhttpc_lib.erl
@@ -28,7 +28,6 @@
%%% @author Oscar Hellström <oscar@hellstrom.st>
%%% @doc
%%% This module implements various library functions used in lhttpc.
-%%% @end
-module(lhttpc_lib).
-export([
@@ -177,7 +176,7 @@ format_hdrs(Headers) ->
format_hdrs([{Hdr, Value} | T], Acc) ->
NewAcc = [
- maybe_atom_to_list(Hdr), ":", maybe_atom_to_list(Value), "\r\n" | Acc
+ maybe_atom_to_list(Hdr), ": ", maybe_atom_to_list(Value), "\r\n" | Acc
],
format_hdrs(T, NewAcc);
format_hdrs([], Acc) ->
View
125 src/lhttpc_manager.erl
@@ -1,125 +0,0 @@
-%%% ----------------------------------------------------------------------------
-%%% Copyright (c) 2009, Erlang Training and Consulting Ltd.
-%%% All rights reserved.
-%%%
-%%% Redistribution and use in source and binary forms, with or without
-%%% modification, are permitted provided that the following conditions are met:
-%%% * Redistributions of source code must retain the above copyright
-%%% notice, this list of conditions and the following disclaimer.
-%%% * Redistributions in binary form must reproduce the above copyright
-%%% notice, this list of conditions and the following disclaimer in the
-%%% documentation and/or other materials provided with the distribution.
-%%% * Neither the name of Erlang Training and Consulting Ltd. nor the
-%%% names of its contributors may be used to endorse or promote products
-%%% derived from this software without specific prior written permission.
-%%%
-%%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS''
-%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-%%% ARE DISCLAIMED. IN NO EVENT SHALL Erlang Training and Consulting Ltd. BE
-%%% LIABLE SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
-%%% BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
-%%% WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
-%%% OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
-%%% ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-%%% ----------------------------------------------------------------------------
-
-%%% @author Oscar Hellström <oscar@hellstrom.st>
-%%% @doc Connection manager for the HTTP client.
-%%% This gen_server is responsible for keeping track of persistent
-%%% connections to HTTP servers. The only interesting API is
-%%% `connection_count/0' and `connection_count/1'.
-%%% The gen_server is supposed to be started by a supervisor, which is
-%%% normally {@link lhttpc_sup}.
-%%% @end
--module(lhttpc_manager).
-
--export([
- start_link/0
- ]).
--export([
- init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- code_change/3,
- terminate/2
- ]).
-
--behaviour(gen_server).
-
--record(httpc_man, {
- destinations = dict:new()
- }).
-
-%% @spec () -> {ok, pid()}
-%% @doc Starts and link to the gen server.
-%% This is normally called by a supervisor.
-%% @end
--spec start_link() -> {ok, pid()} | {error, allready_started}.
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
-
-%% @hidden
--spec init(any()) -> {ok, #httpc_man{}}.
-init(_) ->
- process_flag(priority, high),
- {ok, #httpc_man{}}.
-
-%% @hidden
--spec handle_call(any(), any(), #httpc_man{}) ->
- {reply, any(), #httpc_man{}}.
-handle_call({lb, Host, Port, Ssl, MaxConnections, ConnectionTimeout}, _, State) ->
- {Reply, NewState} = find_lb({Host, Port, Ssl}, {MaxConnections, ConnectionTimeout}, State),
- {reply, Reply, NewState};
-handle_call(_, _, State) ->
- {reply, {error, unknown_request}, State}.
-
-%% @hidden
--spec handle_cast(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
-handle_cast(_, State) ->
- {noreply, State}.
-
-%% @hidden
--spec handle_info(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
-handle_info({kill_client_after_timeout, ReqId, Pid, StreamTo}, State) ->
- case erlang:is_process_alive(Pid) of
- true ->
- exit(Pid, kill),
- StreamTo ! {response, ReqId, Pid, {error, timeout}};
- false -> ok
- end,
- {noreply, State};
-handle_info(_, State) ->
- {noreply, State}.
-
-%% @hidden
--spec terminate(any(), #httpc_man{}) -> ok.
-terminate(_, State) ->
- close_lbs(State#httpc_man.destinations).
-
-%% @hidden
--spec code_change(any(), #httpc_man{}, any()) -> #httpc_man{}.
-code_change(_, State, _) ->
- State.
-
-find_lb(Dest, Options, State) ->
- Dests = State#httpc_man.destinations,
- case dict:find(Dest, Dests) of
- {ok, Lb} ->
- {{ok, Lb}, State};
- error ->
- {ok, Pid} = lhttpc_lb:start_link([Dest, Options]),
- NewState = State#httpc_man{
- destinations = update_dest(Dest, Pid, Dests)
- },
- {{ok, Pid}, NewState}
- end.
-
-update_dest(Destination, Lb, Destinations) ->
- dict:store(Destination, Lb, Destinations).
-
-close_lbs(Destinations) ->
- lists:foreach(fun({_Dest, Lb}) ->
- gen_server:cast(Lb, {terminate})
- end, dict:to_list(Destinations)).
View
3 src/lhttpc_sock.erl
@@ -29,7 +29,6 @@
%%% @doc
%%% This module implements wrappers for socket operations.
%%% Makes it possible to have the same interface to ssl and tcp sockets.
-%%% @end
-module(lhttpc_sock).
-export([
@@ -80,7 +79,7 @@ connect(Host, Port, Options, Timeout, false) ->
recv(Socket, true) ->
ssl:recv(Socket, 0);
recv(Socket, false) ->
- inet_tcp:recv(Socket, 0).
+ gen_tcp:recv(Socket, 0).
%% @spec (Socket, Length, SslFlag) -> {ok, Data} | {error, Reason}
%% Socket = socket()
View
25 src/lhttpc_sup.erl
@@ -32,7 +32,7 @@
-module(lhttpc_sup).
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/0, start_link/1]).
-export([init/1]).
-type child() :: {atom(), {atom(), atom(), list(any)},
@@ -46,12 +46,23 @@
%% @end
-spec start_link() -> {ok, pid()} | {error, atom()}.
start_link() ->
- supervisor:start_link(?MODULE, nil).
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_link(Args) ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, Args).
%% @hidden
-spec init(any()) -> {ok, {{atom(), integer(), integer()}, [child()]}}.
-init(_) ->
- LHTTPCManager = {lhttpc_manager, {lhttpc_manager, start_link, []},
- permanent, 10000, worker, [lhttpc_manager]
- },
- {ok, {{one_for_one, 10, 1}, [LHTTPCManager]}}.
+init(Opts) ->
+ init_ets(Opts),
+ {ok, {{simple_one_for_one, 10, 1}, [
+ {load_balancer,
+ {lhttpc_lb, start_link, []},
+ transient, 10000, worker, [lhttpc_lb]}
+ ]}}.
+
+init_ets(Opts) ->
+ ETSOpts = proplists:get_value(ets, Opts, []),
+ %% Only option supported so far -- others do not really make sense at this point
+ ReadConc = {read_concurrency, proplists:get_value(read_concurrency, ETSOpts, false)},
+ ets:new(lhttpc_lb, [named_table, set, public, ReadConc]).
View
29 test/lhttpc_tests.erl
@@ -134,6 +134,7 @@ tcp_test_() ->
?_test(bad_url()),
?_test(persistent_connection()),
?_test(request_timeout()),
+ ?_test(connection_timeout()),
?_test(suspended_manager()),
?_test(chunked_encoding()),
?_test(partial_upload_identity()),
@@ -151,7 +152,8 @@ tcp_test_() ->
?_test(partial_download_smallish_chunks()),
?_test(partial_download_slow_chunks()),
?_test(close_connection()),
- ?_test(message_queue())
+ ?_test(message_queue()),
+ ?_test(connection_count()) % just check that it's 0 (last)
]}
}.
@@ -160,7 +162,8 @@ ssl_test_() ->
{setup, fun start_app/0, fun stop_app/1, [
?_test(ssl_get()),
?_test(ssl_post()),
- ?_test(ssl_chunked())
+ ?_test(ssl_chunked()),
+ ?_test(connection_count()) % just check that it's 0 (last)
]}
}.
@@ -376,6 +379,18 @@ request_timeout() ->
URL = url(Port, "/slow"),
?assertEqual({error, timeout}, lhttpc:request(URL, get, [], 50)).
+connection_timeout() ->
+ Port = start(gen_tcp, [fun simple_response/5, fun simple_response/5]),
+ URL = url(Port, "/close_conn"),
+ lhttpc_manager:update_connection_timeout(50), % very short keep alive
+ {ok, Response} = lhttpc:request(URL, get, [], 100),
+ ?assertEqual({200, "OK"}, status(Response)),
+ ?assertEqual(<<?DEFAULT_STRING>>, body(Response)),
+ timer:sleep(100),
+ ?assertEqual(0,
+ lhttpc_manager:connection_count({"localhost", Port, false})),
+ lhttpc_manager:update_connection_timeout(300000). % set back
+
suspended_manager() ->
Port = start(gen_tcp, [fun simple_response/5, fun simple_response/5]),
URL = url(Port, "/persistent"),
@@ -386,6 +401,8 @@ suspended_manager() ->
true = erlang:suspend_process(Pid),
?assertEqual({error, timeout}, lhttpc:request(URL, get, [], 50)),
true = erlang:resume_process(Pid),
+ ?assertEqual(1,
+ lhttpc_manager:connection_count({"localhost", Port, false})),
{ok, SecondResponse} = lhttpc:request(URL, get, [], 50),
?assertEqual({200, "OK"}, status(SecondResponse)),
?assertEqual(<<?DEFAULT_STRING>>, body(SecondResponse)).
@@ -467,7 +484,7 @@ partial_upload_chunked() ->
?assertEqual(<<?DEFAULT_STRING>>, body(Response1)),
?assertEqual("This is chunky stuff!",
lhttpc_lib:header_value("x-test-orig-body", headers(Response1))),
- ?assertEqual(element(2, Trailer),
+ ?assertEqual(element(2, Trailer),
lhttpc_lib:header_value("x-test-orig-trailer-1", headers(Response1))),
% Make sure it works with no body part in the original request as well
Headers = [{"Transfer-Encoding", "chunked"}],
@@ -480,7 +497,7 @@ partial_upload_chunked() ->
?assertEqual(<<?DEFAULT_STRING>>, body(Response2)),
?assertEqual("This is chunky stuff!",
lhttpc_lib:header_value("x-test-orig-body", headers(Response2))),
- ?assertEqual(element(2, Trailer),
+ ?assertEqual(element(2, Trailer),
lhttpc_lib:header_value("x-test-orig-trailer-1", headers(Response2))).
partial_upload_chunked_no_trailer() ->
@@ -664,6 +681,10 @@ ssl_chunked() ->
?assertEqual("2", lhttpc_lib:header_value("Trailer-2",
headers(SecondResponse))).
+connection_count() ->
+ timer:sleep(50), % give the TCP stack time to deliver messages
+ ?assertEqual(0, lhttpc_manager:connection_count()).
+
invalid_options() ->
?assertError({bad_options, [{foo, bar}, bad_option]},
lhttpc:request("http://localhost/", get, [], <<>>, 1000,

0 comments on commit 68ab97c

Please sign in to comment.