Skip to content

Commit

Permalink
Changed pipeline algorithm to smallest-pipeline-first
Browse files Browse the repository at this point in the history
Big commit. Switched algorithm to one which will favor
the connection with the smallest pipeline first
(deciding ties by timestamp of last finished request,
and then by pid as ultimate tie breaker).

Note: this also drastically changes the internal
representation of the connection in ets and is dependent
on specific order of operations when changing key values
to limit the risk of race conditions between the load balancer
and a given connection.

Also removed connection reporting of start of request
as this was no longer necessary since the load balancer
tees up the entry into ets with a 1.
  • Loading branch information
benjaminplee committed Nov 20, 2014
1 parent 3061aa2 commit 9d0b7e3
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 60 deletions.
44 changes: 17 additions & 27 deletions src/ibrowse_http_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -762,11 +762,10 @@ send_req_1(From,
{ok, _Sent_body} ->
trace_request_body(Body_1),
_ = active_once(State_1),
State_1_1 = inc_pipeline_counter(State_1),
State_2 = State_1_1#state{status = get_header,
cur_req = NewReq,
proxy_tunnel_setup = in_progress,
tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
State_2 = State_1#state{status = get_header,
cur_req = NewReq,
proxy_tunnel_setup = in_progress,
tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
State_3 = set_inac_timer(State_2),
{noreply, State_3};
Err ->
Expand Down Expand Up @@ -853,15 +852,14 @@ send_req_1(From,
Raw_req = list_to_binary([Req, Sent_body]),
NewReq_1 = NewReq#request{raw_req = Raw_req},
State_1 = State#state{reqs=queue:in(NewReq_1, State#state.reqs)},
State_2 = inc_pipeline_counter(State_1),
_ = active_once(State_2),
State_3 = case Status of
_ = active_once(State_1),
State_2 = case Status of
idle ->
State_2#state{
State_1#state{
status = get_header,
cur_req = NewReq_1};
_ ->
State_2
State_1
end,
case StreamTo of
undefined ->
Expand All @@ -875,8 +873,8 @@ send_req_1(From,
catch StreamTo ! {ibrowse_async_raw_req, Raw_req}
end
end,
State_4 = set_inac_timer(State_3),
{noreply, State_4};
State_3 = set_inac_timer(State_2),
{noreply, State_3};
Err ->
shutting_down(State),
do_trace("Send failed... Reason: ~p~n", [Err]),
Expand Down Expand Up @@ -1815,13 +1813,13 @@ format_response_data(Resp_format, Body) ->
do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) ->
Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)},
gen_server:reply(From, Msg_1),
dec_pipeline_counter(State);
report_request_complete(State);
do_reply(State, From, undefined, _, _, Msg) ->
gen_server:reply(From, Msg),
dec_pipeline_counter(State);
report_request_complete(State);
do_reply(#state{prev_req_id = Prev_req_id} = State,
_From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
State_1 = dec_pipeline_counter(State),
State_1 = report_request_complete(State),
case Body of
[] ->
ok;
Expand All @@ -1843,7 +1841,7 @@ do_reply(#state{prev_req_id = Prev_req_id} = State,
ets:delete(?STREAM_TABLE, {req_id_pid, Prev_req_id}),
State_1#state{prev_req_id = ReqId};
do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
State_1 = dec_pipeline_counter(State),
State_1 = report_request_complete(State),
Msg_1 = format_response_data(Resp_format, Msg),
catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1},
State_1.
Expand Down Expand Up @@ -1946,19 +1944,11 @@ shutting_down(#state{lb_ets_tid = undefined}) ->
shutting_down(#state{lb_ets_tid = Tid}) ->
ibrowse_lb:report_connection_down(Tid).

inc_pipeline_counter(#state{is_closing = true} = State) ->
State;
inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
State;
inc_pipeline_counter(#state{lb_ets_tid = Tid} = State) ->
ibrowse_lb:report_request_underway(Tid),
State.

dec_pipeline_counter(#state{is_closing = true} = State) ->
report_request_complete(#state{is_closing = true} = State) ->
State;
dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
report_request_complete(#state{lb_ets_tid = undefined} = State) ->
State;
dec_pipeline_counter(#state{lb_ets_tid = Tid} = State) ->
report_request_complete(#state{lb_ets_tid = Tid} = State) ->
ibrowse_lb:report_request_complete(Tid),
State.

Expand Down
71 changes: 46 additions & 25 deletions src/ibrowse_lb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
spawn_connection/6,
stop/1,
report_connection_down/1,
report_request_underway/1,
report_request_complete/1
]).

Expand All @@ -39,6 +38,9 @@
proc_state}).

-define(PIPELINE_MAX, 99999).
-define(KEY_MATCHSPEC_BY_PID(Pid), [{{{'_', '_', Pid}, '_'}, [], ['$_']}]).
-define(KEY_MATCHSPEC(Key), [{{Key, '_'}, [], ['$_']}]).
-define(KEY_MATCHSPEC_FOR_DELETE(Key), [{{Key, '_'}, [], [true]}]).

-include("ibrowse.hrl").

Expand Down Expand Up @@ -74,13 +76,23 @@ stop(Lb_pid) ->
end.

report_connection_down(Tid) ->
catch ets:delete(Tid, self()).

report_request_underway(Tid) ->
catch ets:update_counter(Tid, self(), {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}).
%% Don't cascade errors since Tid is really managed by other process
catch ets:select_delete(Tid, ?KEY_MATCHSPEC_BY_PID(self())).

report_request_complete(Tid) ->
catch ets:update_counter(Tid, self(), {2, -1, 0, 0}).
%% Don't cascade errors since Tid is really managed by other process
catch case ets:select(Tid, ?KEY_MATCHSPEC_BY_PID(self())) of
[MatchKey] ->
case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(MatchKey)) of
1 ->
ets:insert(Tid, {decremented(MatchKey), undefined}),
true;
_ ->
false
end;
_ ->
false
end.

%%====================================================================
%% Server functions
Expand Down Expand Up @@ -210,23 +222,17 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
find_best_connection(Tid, Max_pipe) ->
find_best_connection(ets:first(Tid), Tid, Max_pipe).

find_best_connection('$end_of_table', _, _) ->
{error, retry_later};
find_best_connection(Pid, Tid, Max_pipe) ->
case ets:lookup(Tid, Pid) of
[{Pid, Cur_sz}] when Cur_sz < Max_pipe ->
case record_request_for_connection(Tid, Pid) of
{'EXIT', _} ->
%% The selected process has shutdown
find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe);
_ ->
{ok, Pid}
find_best_connection(Tid, Max_pipeline_size) ->
case ets:first(Tid) of
{Size, _Timestamp, Pid} = Key when Size < Max_pipeline_size ->
case record_request_for_connection(Tid, Key) of
true ->
{ok, Pid};
false ->
find_best_connection(Tid, Max_pipeline_size)
end;
_ ->
find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe)
_ ->
{error, retry_later}
end.

maybe_create_ets(#state{ets_tid = undefined} = State) ->
Expand All @@ -240,10 +246,25 @@ num_current_connections(Tid) ->
catch ets:info(Tid, size).

record_new_connection(Tid, Pid) ->
catch ets:insert(Tid, {Pid, 0}).
catch ets:insert(Tid, {new_key(Pid), undefined}).

record_request_for_connection(Tid, Key) ->
case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(Key)) of
1 ->
ets:insert(Tid, {incremented(Key), undefined}),
true;
_ ->
false
end.

new_key(Pid) ->
{1, os:timestamp(), Pid}.

incremented({Size, Timestamp, Pid}) ->
{Size + 1, Timestamp, Pid}.

record_request_for_connection(Tid, Pid) ->
catch ets:update_counter(Tid, Pid, {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}).
decremented({Size, _Timestamp, Pid}) ->
{Size - 1, os:timestamp(), Pid}.

for_each_connection_pid(Tid, Fun) ->
catch ets:foldl(fun({Pid, _}, _) -> Fun(Pid) end, undefined, Tid),
Expand Down
11 changes: 3 additions & 8 deletions test/ibrowse_functional_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,10 @@ balanced_connections() ->

timer:sleep(1000),

Diffs = [Count - BalancedNumberOfRequestsPerConnection || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Diffs)),
Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Counts)),

lists:foreach(fun(X) -> ?assertEqual(yep, close_to_zero(X)) end, Diffs).

close_to_zero(0) -> yep;
close_to_zero(-1) -> yep;
close_to_zero(1) -> yep;
close_to_zero(X) -> {nope, X}.
?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts).

times(0, _) ->
ok;
Expand Down

0 comments on commit 9d0b7e3

Please sign in to comment.