Skip to content

Commit

Permalink
Merge branch 'az724-reduce-batch-handoff' into 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
beerriot committed Oct 6, 2011
2 parents 427ef68 + a992e2e commit 5c4ed64
Showing 1 changed file with 107 additions and 21 deletions.
128 changes: 107 additions & 21 deletions src/riak_kv_w_reduce.erl
Expand Up @@ -84,10 +84,6 @@
%% Mochijson2 conversion will fail on the bare proplist, but will
%% succeed at encoding this form.
%%
%% The exception to the batching controls is handoff. Whenever a
%% worker receives handoff from another worker, it immediately reduces
%% the concatenation of the two inputs.
%%
%% If no inputs are received before eoi, this fitting evaluated the
%% function once, with an empty list as `Inputs'.
%%
Expand All @@ -108,6 +104,10 @@
-export([reduce_compat/1]).
-export([no_input_run_reduce_once/0]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-include_lib("riak_pipe/include/riak_pipe.hrl").
-include_lib("riak_pipe/include/riak_pipe_log.hrl").

Expand Down Expand Up @@ -144,14 +144,20 @@ init(Partition, #fitting_details{options=Options} = FittingDetails) ->

%% @doc Evaluate the function if the batch is ready.
-spec process(term(), boolean(), state()) -> {ok, state()}.
process(Input, _Last,
#state{acc=PrevAcc, inacc=OldInAcc, delay=Delay, delay_max=DelayMax}=State) ->
process(Input, _Last, #state{inacc=OldInAcc, delay=Delay}=State) ->
InAcc = [Input|OldInAcc],
if Delay + 1 >= DelayMax ->
OutAcc = reduce(PrevAcc ++ lists:reverse(InAcc), State, "reducing"),
maybe_reduce(State#state{inacc=InAcc, delay=Delay+1}, "reducing").

%% @doc Reduce if the accumulated inputs trip the batch size trigger.
-spec maybe_reduce(state(), string()) -> {ok, state()}.
maybe_reduce(#state{acc=PrevAcc, inacc=InAcc,
delay=Delay, delay_max=DelayMax}=State,
Message) ->
if Delay >= DelayMax ->
OutAcc = reduce(PrevAcc ++ lists:reverse(InAcc), State, Message),
{ok, State#state{acc=OutAcc, inacc=[], delay=0}};
true ->
{ok, State#state{inacc=InAcc, delay=Delay + 1}}
{ok, State}
end.

%% @doc Reduce any unreduced inputs, and then send on the outputs.
Expand All @@ -172,19 +178,25 @@ archive(#state{acc=Acc, inacc=InAcc}) ->
%% just send state of reduce so far
{ok, Acc ++ lists:reverse(InAcc)}.

%% @doc Handoff simply concatenates the accumulator from the remote
%% worker with the accumulator from this worker, and immediately
%% reduces the list.
%% @doc Handoff simply concatenates the accumulators from the remote
%% worker with the accumulator from this worker, and then reduces if
%% the resulting accumulator crosses the batch size threshold.
-spec handoff(list(), state()) -> {ok, state()}.
handoff(HandoffAcc, #state{acc=Acc}=State) ->
%% for each Acc, add to local accs;
NewAcc = handoff_acc(HandoffAcc, Acc, State),
{ok, State#state{acc=NewAcc}}.

-spec handoff_acc([term()], [term()], state()) -> [term()].
handoff_acc(HandoffAcc, LocalAcc, State) ->
InAcc = LocalAcc++HandoffAcc,
reduce(InAcc, State, "reducing handoff").
handoff(HandoffAcc, #state{inacc=OldInAcc}=State) ->
%% assume that inputs received by the vnode that was archived were
%% meant to arrive before any inputs received here (because the
%% typical handoff case is that this is a new node taking over)
%%
%% put all incoming unreduced inputs after all local inputs, and
%% then put all reduced inputs reversed after that. this has the
%% best chance of producing the correct order for a reduce phase
%% that was sorting inputs
%% Example: HandoffAcc = Acc ++ InAcc = [1,2,3] ++ [4,5,6]
%% OldInAcc = [9,8,7]
%% InAcc = [9,8,7] ++ [6,5,4] ++ [3,2,1]
InAcc = OldInAcc ++ lists:reverse(HandoffAcc),
maybe_reduce(State#state{inacc=InAcc, delay=length(InAcc)},
"reducing handoff").

%% @doc Actually evaluate the aggregation function.
-spec reduce([term()], state(), string()) ->
Expand Down Expand Up @@ -333,3 +345,77 @@ extract_json_prop (Key, JsonProps) ->
false ->
[]
end.

-ifdef(TEST).

%% This test should check that the reduce function is not called more
%% often than reduce_phase_batch_size or reduce_phase_only_1 request.
batch_size_during_handoff_test() ->
Fun = fun riak_kv_mapreduce:reduce_count_inputs/2,

ReduceEvery5 = [{reduce_phase_batch_size, 5}],
AInputs = [a,b,c],
{ok, StateUnreduced} =
handoff_test_helper(Fun, ReduceEvery5, AInputs, []),
%% handing off three unprocessed inputs to a fresh worker with
%% zero unprocessed inputs should not immediately process the
%% inputs if the batch size is greater than 3
?assertEqual(length(AInputs), StateUnreduced#state.delay),
?assertEqual(lists:reverse(AInputs), StateUnreduced#state.inacc),

{ok, StateFinally} =
test_helper({ok, StateUnreduced}, [e,f]),
%% just two more inputs should still trigger the reduce
?assertEqual(0, StateFinally#state.delay),
?assertEqual([], StateFinally#state.inacc),

BInputs = [e,f,g],
{ok, StateReduced} =
handoff_test_helper(Fun, ReduceEvery5, AInputs, BInputs),
%% handing off three unprocessed inputs to a worker that has three
%% more unprocessed inputs should immediately process the inputs
%% if the batch size is less than or equal to 6
?assertEqual(0, StateReduced#state.delay),
?assertEqual([], StateReduced#state.inacc),

ReduceOnce = [reduce_phase_only_1],
{ok, StateNever} =
handoff_test_helper(Fun, ReduceOnce, AInputs, BInputs),
%% handing off unprocessed inputs when reduce_phase_only_1 is set
%% should never immediately reduce them
?assertEqual(length(AInputs++BInputs), StateNever#state.delay),
?assertEqual(lists:reverse(AInputs++BInputs), StateNever#state.inacc).

%% Start reducer A and reducer B. Feed AInputs to A and BInputs to B,
%% then archive A and handoff its data to B. Returns B's resulting
%% state.
handoff_test_helper(Fun, Arg, AInputs, BInputs) ->
{ok, StateA} = test_helper(Fun, Arg, AInputs),
{ok, StateB} = test_helper(Fun, Arg, BInputs),
{ok, Archive} = archive(StateA),
handoff(Archive, StateB).

%% Initialize a reducer with the given fun and arg, then pass it the
%% list of inputs.
test_helper(Fun, Arg, Inputs) ->
Fitting = #fitting{pid=self(),
ref=make_ref(),
chashfun=fun() -> <<0:160/integer>> end,
nval=1},
Details = #fitting_details{fitting=Fitting,
name=batch_size_during_handoff_test,
module=?MODULE,
arg={rct, Fun, Arg},
output=Fitting,
options=[],
q_limit=64},
test_helper(init(0, Details), Inputs).

%% Pass the list of inputs to the given reducer.
test_helper({ok, State}, Inputs) ->
lists:foldl(
fun(I, {ok, S}) -> process(I, true, S) end,
{ok, State},
Inputs).

-endif.

0 comments on commit 5c4ed64

Please sign in to comment.