Permalink
Browse files

Adding control flow on connections refused

When many connections were being refused, the load balancer
would impose no good control flow mecanism on incoming requests.
After a while, demand can overtake the process and grow the
message queue until the VM goes out of memory.

This patch adds a counter of refused connections (happens when
the server is down); if too many connections are refusd in a row
(as many as the possible sockets), some of the requests will be
blocked and will return {error, offline}.

Whenever a successful request is made, the counter is dropped.

The patch also contains a few minute optimizations for record
assignment, gaining minimal amounts of speed.
  • Loading branch information...
ferd committed Nov 21, 2011
1 parent 7d4a202 commit bbecd1d47d3bc658b40985abe2751804618a3fd4
Showing with 43 additions and 30 deletions.
  1. +43 −30 src/lhttpc_lb.erl
View
@@ -97,15 +97,12 @@ terminate(_, State) ->
code_change(_, State, _) ->
State.
-find_socket(Pid, ConnectOptions, ConnectTimeout, State) ->
- Host = State#httpc_man.host,
- Port = State#httpc_man.port,
- Ssl = State#httpc_man.ssl,
+find_socket(Pid, ConnectOptions, ConnectTimeout, State = #httpc_man{host=Host, port=Port, ssl=Ssl, sockets=Tid}) ->
case State#httpc_man.available_sockets of
[Socket|Available] ->
case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of
ok ->
- [{Socket,Timer}] = ets:lookup(State#httpc_man.sockets, Socket),
+ [{Socket,Timer}] = ets:lookup(Tid, Socket),
cancel_timer(Timer, Socket),
NewState = State#httpc_man{available_sockets = Available},
{{ok, Socket}, NewState};
@@ -118,44 +115,60 @@ find_socket(Pid, ConnectOptions, ConnectTimeout, State) ->
end;
[] ->
MaxSockets = State#httpc_man.max_connections,
- Size = ets:info(State#httpc_man.sockets, size),
- case MaxSockets > Size andalso Size =/= undefined of
- true ->
- SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
- case lhttpc_sock:connect(Host, Port, SocketOptions, ConnectTimeout, Ssl) of
- {ok, Socket} ->
- find_socket(Pid, ConnectOptions, ConnectTimeout, store_socket(Socket, State));
- {error, etimedout} ->
- {{error, sys_timeout}, State};
- {error, timeout} ->
- {{error, timeout}, State};
- {error, Reason} ->
- {{error, Reason}, State}
+ Size = ets:info(Tid, size),
+ Failures = case get('#fail') of
+ undefined -> 0;
+ Fail -> Fail
+ end,
+ case Failures > MaxSockets of
+ false ->
+ case MaxSockets > Size andalso Size =/= undefined of
+ true ->
+ SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
+ case lhttpc_sock:connect(Host, Port, SocketOptions, ConnectTimeout, Ssl) of
+ {ok, Socket} ->
+ put('#fail', 0),
+ find_socket(Pid, ConnectOptions, ConnectTimeout, store_socket(Socket, State));
+ {error, etimedout} ->
+ {{error, sys_timeout}, State};
+ {error, timeout} ->
+ {{error, timeout}, State};
+ %% client not answering
+ {error, econnrefused} ->
+ if Failures < (MaxSockets*2) -> put('#fail', Failures+1);
+ true -> ok
+ end,
+ {{error, econnrefused}, State};
+ {error, Reason} ->
+ {{error, Reason}, State}
+ end;
+ false ->
+ {{error, retry_later}, State}
end;
- false ->
- {{error, retry_later}, State}
- end
+ true ->
+ put('#fail', Failures-2),
+ {{error, offline}, State}
+ end
end.
-remove_socket(Socket, State) ->
- case ets:lookup(State#httpc_man.sockets, Socket) of
- [{Socket,Timer}] ->
+remove_socket(Socket, State = #httpc_man{sockets=Tid, ssl=Ssl}) ->
+ case ets:lookup(Tid, Socket) of
+ [{_,Timer}] ->
cancel_timer(Timer, Socket),
- lhttpc_sock:close(Socket, State#httpc_man.ssl),
- ets:delete(State#httpc_man.sockets, Socket);
+ lhttpc_sock:close(Socket, Ssl),
+ ets:delete(Tid, Socket);
[] ->
ok
end,
State.
-store_socket(Socket, State) ->
- Timeout = State#httpc_man.connection_timeout,
+store_socket(Socket, State = #httpc_man{connection_timeout=Timeout, ssl=Ssl, sockets=Tid}) ->
Timer = case Timeout of
infinity -> undefined;
_Other -> erlang:send_after(Timeout, self(), {timeout, Socket})
end,
- lhttpc_sock:setopts(Socket, [{active, once}], State#httpc_man.ssl),
- ets:insert(State#httpc_man.sockets, {Socket, Timer}),
+ lhttpc_sock:setopts(Socket, [{active, once}], Ssl),
+ ets:insert(Tid, {Socket, Timer}),
State#httpc_man{available_sockets = [Socket|State#httpc_man.available_sockets]}.
close_sockets(Sockets, Ssl) ->

0 comments on commit bbecd1d

Please sign in to comment.