Skip to content

Commit

Permalink
Adding backlog detection so we only send sync messages when necessary
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Smith committed Dec 17, 2010
1 parent 813c95e commit 1352ba7
Showing 1 changed file with 40 additions and 10 deletions.
50 changes: 40 additions & 10 deletions src/luke_phase.erl
Expand Up @@ -20,6 +20,9 @@

-include_lib("eunit/include/eunit.hrl").

-define(BUFFER_INPUT_CHECK, 1000).
-define(MAX_BUFFERED_INPUTS, 500).

%% API
-export([start_link/7,
complete/0,
Expand Down Expand Up @@ -49,6 +52,7 @@
partners,
next_phases,
done_count=1,
input_count=0,
flow,
flow_timeout}).

Expand Down Expand Up @@ -218,29 +222,55 @@ route_output(Output, #state{converge=true, lead_partner=Lead}=State) when is_pid
%% to the next phase. Accumulation is only true for the lead
%% process of a converging phase
route_output(Output, #state{id=Id, converge=true, accumulate=Accumulate, lead_partner=undefined,
flow=Flow, next_phases=Next}=State) ->
flow=Flow}=State) ->
if
Accumulate =:= true ->
luke_phases:send_flow_results(Flow, Id, Output);
true ->
ok
end,
RotatedNext = propagate_inputs(Next, Output),
State#state{next_phases=RotatedNext};
propagate_inputs(State, Output);

%% Route output to the next phase. Accumulate output
%% to the flow if accumulation is turned on.
route_output(Output, #state{id=Id, converge=false, accumulate=Accumulate, flow=Flow, next_phases=Next} = State) ->
route_output(Output, #state{id=Id, converge=false, accumulate=Accumulate, flow=Flow} = State) ->
if
Accumulate =:= true ->
luke_phases:send_flow_results(Flow, Id, Output);
true ->
ok
end,
RotatedNext = propagate_inputs(Next, Output),
State#state{next_phases=RotatedNext}.
propagate_inputs(State, Output).

propagate_inputs(#state{next_phases=undefined}=State, _Results) ->
State;
propagate_inputs(#state{next_phases=Next, input_count=InputCount0}=State, Results) ->
{InputCount, UseSync} = case InputCount0 of
?BUFFER_INPUT_CHECK ->
{0, needs_sync(Next)};
_ ->
{InputCount0 + 1, false}
end,
RotatedNext = case UseSync of
true ->
luke_phases:send_sync_inputs(Next, Results, infinity);
false ->
luke_phases:send_inputs(Next, Results)
end,
State#state{next_phases=RotatedNext, input_count=InputCount};
propagate_inputs(Targets, Results) ->
luke_phases:send_inputs(Targets, Results).

propagate_inputs(undefined, _Results) ->
undefined;
propagate_inputs(Next, Results) ->
luke_phases:send_sync_inputs(Next, Results, infinity).
needs_sync([]) ->
false;
needs_sync([H]) ->
{message_queue_len, Len} = erlang:process_info(H, message_queue_len),
Len > ?MAX_BUFFERED_INPUTS;
needs_sync([H|T]) ->
{message_queue_len, Len} = erlang:process_info(H, message_queue_len),
case Len > ?MAX_BUFFERED_INPUTS of
true ->
true;
false ->
needs_sync(T)
end.

0 comments on commit 1352ba7

Please sign in to comment.