Allow vnode init to happen in parallel
Added an extra state to riak_core_vnode to decouple vnode initialization
from process creation.
Added a function to block until initialization has truly finished.
The vnode start call now takes either a list of indices or a single index.
The list version creates the vnode processes first, then waits for them to
initialize, which allows initialization to happen in parallel.
Used the above in riak_core_ring_handler when starting services on the
first ring event.
Tests now show I/O saturation at bitcask startup, instead of the serialized
trickle we had before.
This code still needs some work to handle edge cases, especially around how
to handle vnode initialization failures.
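
A minimal sketch (not part of the commit) of the start-up flow this enables.
The module my_service_vnode and the function start_all/1 are hypothetical;
the only calls taken from the diff are Mod:start_vnode/1 accepting a list,
riak_core_vnode:wait_for_init/1, and riak_core_vnode_manager:all_vnodes/0.

    %% Start every partition's vnode with one call; the vnode manager creates
    %% the processes first and then waits for their initialization in parallel.
    start_all(Indices) when is_list(Indices) ->
        my_service_vnode:start_vnode(Indices),
        %% A caller holding a vnode pid can still block until init is truly
        %% done; wait_for_init/1 replies 'ok' once do_init/1 has completed.
        [ok = riak_core_vnode:wait_for_init(Pid) ||
            {my_service_vnode, _Idx, Pid} <- riak_core_vnode_manager:all_vnodes()],
        ok.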

Add concurrency control to vnode initialization
Added a new pmap utility with bounded concurrency, along with unit tests.
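
A usage sketch of the new bounded pmap (not part of the commit); the function
name, argument order, and input-ordered result follow the riak_core_util diff
below.

    %% Square 1..100 using at most 8 concurrent worker processes.
    %% Results come back in input order, so this yields [1, 4, 9, ...].
    Squares = riak_core_util:pmap(fun(X) -> X * X end, lists:seq(1, 100), 8).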

Fix cluster info to query vnode manager, not sup

With the parallel vnode change, there is even more reason not to query the
supervisor directly, as its children may not have finished initializing yet.
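
For illustration (hedged, not part of the commit), the change boils down to
swapping the query source; both lines mirror the riak_core_cinfo_core.erl diff
below.

    %% Old: ask the supervisor directly; its children may still be initializing.
    Pids = [Pid || {_, Pid, _, _} <- supervisor:which_children(riak_core_vnode_sup)],
    %% New: ask the vnode manager for its view of the running vnodes.
    Vnodes = [{Mod, Idx} || {Mod, Idx, _Pid} <- riak_core_vnode_manager:all_vnodes()].
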
engelsanchez committed Mar 8, 2013
1 parent c08df0d commit 0ae5839
Showing 5 changed files with 219 additions and 21 deletions.
3 changes: 1 addition & 2 deletions src/riak_core_cinfo_core.erl
@@ -59,8 +59,7 @@ latest_ringfile(CPid) ->
cluster_info:format(CPid, "File contents:\n~p\n", [binary_to_term(Contents)]).

active_partitions(CPid) ->
Pids = [Pid || {_,Pid,_,_} <- supervisor:which_children(riak_core_vnode_sup)],
Vnodes = [riak_core_vnode:get_mod_index(Pid) || Pid <- Pids],
Vnodes = [{Mod, Idx} || {Mod, Idx, _Pid} <- riak_core_vnode_manager:all_vnodes()],
Partitions = lists:foldl(fun({_,P}, Ps) ->
ordsets:add_element(P, Ps)
end, ordsets:new(), Vnodes),
2 changes: 1 addition & 1 deletion src/riak_core_ring_handler.erl
@@ -124,7 +124,7 @@ ensure_vnodes_started({App,Mod}, Ring) ->
case riak_core:wait_for_application(App) of
ok ->
%% Start the vnodes.
[Mod:start_vnode(I) || I <- Startable],
Mod:start_vnode(Startable),

%% Mark the service as up.
SupName = list_to_atom(atom_to_list(App) ++ "_sup"),
143 changes: 143 additions & 0 deletions src/riak_core_util.erl
@@ -41,6 +41,7 @@
rpc_every_member/4,
rpc_every_member_ann/4,
pmap/2,
pmap/3,
multi_rpc/4,
multi_rpc/5,
multi_rpc_ann/4,
@@ -51,6 +52,7 @@

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-export([counter_loop/1,incr_counter/1,decr_counter/1]).
-endif.

%% R14 Compatibility
@@ -259,6 +261,80 @@ pmap(F, L) ->
{_, L3} = lists:unzip(lists:keysort(1, L2)),
L3.

-record(pmap_acc,{
mapper,
fn,
n_pending=0,
pending=[],
n_done=0,
done=[],
max_concurrent=1
}).

%% @doc Parallel map with a cap on the number of concurrent worker processes.
%% Note: Worker processes are linked to the parent, so a crash propagates.
-spec pmap(Fun::function(), List::list(), MaxP::integer()) -> list().
pmap(Fun, List, MaxP) when MaxP < 1 ->
pmap(Fun, List, 1);
pmap(Fun, List, MaxP) when is_function(Fun), is_list(List), is_integer(MaxP) ->
Mapper = self(),
#pmap_acc{pending=Pending, done=Done} =
lists:foldl(fun pmap_worker/2,
#pmap_acc{mapper=Mapper,
fn=Fun,
max_concurrent=MaxP},
List),
% Collect pending work
Collect =
fun(Pid, Acc) ->
receive
{pmap_result, Pid, R} ->
[R|Acc]
end
end,
All = lists:foldl(Collect, Done, Pending),
% Restore input order
Sorted = lists:sort(fun({I1, _}, {I2, _}) -> I1 < I2 end, All),
lists:map(fun({_, R}) -> R end, Sorted).

%% @doc Fold function for {@link pmap/3} that spawns up to a max number of
%% workers to execute the mapping function over the input list.
pmap_worker(X, Acc = #pmap_acc{n_pending=NP,
pending=Pending,
n_done=ND,
max_concurrent=MaxP,
mapper=Mapper,
fn=Fn})
when NP < MaxP ->
Worker =
spawn_link(fun() ->
R = Fn(X),
Mapper ! {pmap_result, self(), {NP+ND, R}}
end),
Acc#pmap_acc{n_pending=NP+1, pending=[Worker|Pending]};
pmap_worker(X, Acc = #pmap_acc{n_pending=NP,
pending=Pending,
n_done=ND,
done=Done,
max_concurrent=MaxP})
when NP == MaxP ->
{Result, NewPending} = pmap_collect_one(Pending),
pmap_worker(X, Acc#pmap_acc{n_pending=NP-1, pending=NewPending,
n_done=ND+1, done=[Result|Done]}).

%% @doc Waits for one pending pmap task to finish
pmap_collect_one(Pending = [_First|_More]) ->
receive
{pmap_result, Pid, Result} ->
case lists:member(Pid, Pending) of
true ->
{Result, lists:delete(Pid, Pending)};
false ->
pmap_collect_one(Pending)
end
end.


%% @spec rpc_every_member(atom(), atom(), [term()], integer()|infinity)
%% -> {Results::[term()], BadNodes::[node()]}
%% @doc Make an RPC call to the given module and function on each
@@ -454,5 +530,72 @@ build_tree_test() ->
?assertEqual(CTree, build_tree(2, Flat, [cycles])),
ok.


counter_loop(N) ->
receive
{up, Pid} ->
N2=N+1,
Pid ! {counter_value, N2},
counter_loop(N2);
down ->
counter_loop(N-1);
exit ->
exit(normal)
end.

incr_counter(CounterPid) ->
CounterPid ! {up, self()},
receive
{counter_value, N} -> N
after
3000 ->
?assert(false)
end.

decr_counter(CounterPid) ->
CounterPid ! down.

bounded_pmap_test_() ->
Fun1 = fun(X) -> X+2 end,
Tests =
fun(CountPid) ->
GFun = fun(Max) ->
fun(X) ->
?assert(incr_counter(CountPid) =< Max),
timer:sleep(1),
decr_counter(CountPid),
Fun1(X)
end
end,
[
fun() ->
?assertEqual(lists:seq(Fun1(1), Fun1(N)),
pmap(GFun(MaxP),
lists:seq(1, N), MaxP))
end ||
MaxP <- lists:seq(1,20),
N <- lists:seq(0,10)
]
end,
{setup,
fun() ->
Pid = spawn_link(?MODULE, counter_loop, [0]),
monitor(process, Pid),
Pid
end,
fun(Pid) ->
Pid ! exit,
receive
{'DOWN', _Ref, process, Pid, _Info} -> ok
after
3000 ->
?debugMsg("pmap counter process did not go down in time"),
?assert(false)
end,
ok
end,
Tests
}.

-endif.

41 changes: 35 additions & 6 deletions src/riak_core_vnode.erl
@@ -22,9 +22,12 @@
-export([behaviour_info/1]).
-export([start_link/3,
start_link/4,
wait_for_init/1,
send_command/2,
send_command_after/2]).
-export([init/1,
started/2,
started/3,
active/2,
active/3,
handle_event/3,
@@ -95,7 +98,6 @@ behaviour_info(_Other) ->

-define(DEFAULT_TIMEOUT, 60000).
-define(LOCK_RETRY_TIMEOUT, 10000).
-define(MODSTATE, State#state{mod=Mod,modstate=ModState}).
-record(state, {
index :: partition(),
mod :: module(),
@@ -106,7 +108,8 @@ behaviour_info(_Other) ->
pool_pid :: pid() | undefined,
pool_config :: tuple() | undefined,
manager_event_timer :: reference(),
inactivity_timeout}).
inactivity_timeout :: non_neg_integer()
}).

start_link(Mod, Index, Forward) ->
start_link(Mod, Index, 0, Forward).
@@ -130,14 +133,37 @@

init([Mod, Index, InitialInactivityTimeout, Forward]) ->
process_flag(trap_exit, true),
State = #state{index=Index, mod=Mod, forward=Forward,
inactivity_timeout=InitialInactivityTimeout},
{ok, started, State, 0}.

started(timeout, State =
#state{inactivity_timeout=InitialInactivityTimeout}) ->
case do_init(State) of
{ok, State2} ->
{next_state, active, State2, InitialInactivityTimeout};
{error, Reason} ->
{stop, Reason}
end.

started(wait_for_init, _From, State =
#state{inactivity_timeout=InitialInactivityTimeout}) ->
case do_init(State) of
{ok, State2} ->
{reply, ok, active, State2, InitialInactivityTimeout};
{error, Reason} ->
{stop, Reason}
end.

do_init(State = #state{index=Index, mod=Mod, forward=Forward}) ->
{ModState, Props} = case Mod:init([Index]) of
{ok, MS} -> {MS, []};
{ok, MS, P} -> {MS, P};
{error, R} -> {error, R}
end,
case {ModState, Props} of
{error, Reason} ->
{stop, Reason};
{error, Reason};
_ ->
case lists:keyfind(pool, 1, Props) of
{pool, WorkerModule, PoolSize, WorkerArgs}=PoolConfig ->
@@ -153,12 +179,15 @@ init([Mod, Index, InitialInactivityTimeout, Forward]) ->
end,
riak_core_handoff_manager:remove_exclusion(Mod, Index),
Timeout = app_helper:get_env(riak_core, vnode_inactivity_timeout, ?DEFAULT_TIMEOUT),
State = #state{index=Index, mod=Mod, modstate=ModState, forward=Forward,
inactivity_timeout=Timeout, pool_pid=PoolPid, pool_config=PoolConfig},
State2 = State#state{modstate=ModState, inactivity_timeout=Timeout,
pool_pid=PoolPid, pool_config=PoolConfig},
lager:debug("vnode :: ~p/~p :: ~p~n", [Mod, Index, Forward]),
{ok, active, State, InitialInactivityTimeout}
{ok, State2}
end.

wait_for_init(Vnode) ->
gen_fsm:sync_send_event(Vnode, wait_for_init, infinity).

handoff_error(Vnode, Err, Reason) ->
gen_fsm:send_event(Vnode, {handoff_error, Err, Reason}).

51 changes: 39 additions & 12 deletions src/riak_core_vnode_manager.erl
@@ -73,6 +73,7 @@
-define(XFER_COMPLETE(X), X#xfer_status.status == complete).
-define(DEFAULT_OWNERSHIP_TRIGGER, 8).
-define(ETS, ets_vnode_mgr).
-define(DEFAULT_VNODE_ROLLING_START, 16).

%% ===================================================================
%% Public API
@@ -439,7 +440,8 @@ handle_info(management_tick, State) ->
State2#state{repairs=[]}
end,

MaxStart = app_helper:get_env(riak_core, vnode_rolling_start, 16),
MaxStart = app_helper:get_env(riak_core, vnode_rolling_start,
?DEFAULT_VNODE_ROLLING_START),
State4 = State3#state{vnode_start_tokens=MaxStart},
State5 = maybe_start_vnodes(Ring, State4),

@@ -510,17 +512,42 @@ delmon(MonRef, _State=#state{idxtab=T}) ->
add_vnode_rec(I, _State=#state{idxtab=T}) -> ets:insert(T,I).

%% @private
get_vnode(Idx, Mod, State) ->
case idx2vnode(Idx, Mod, State) of
no_match ->
ForwardTo = get_forward(Mod, Idx, State),
{ok, Pid} = riak_core_vnode_sup:start_vnode(Mod, Idx, ForwardTo),
MonRef = erlang:monitor(process, Pid),
add_vnode_rec(#idxrec{key={Idx,Mod},idx=Idx,mod=Mod,pid=Pid,
monref=MonRef}, State),
Pid;
X -> X
end.
-spec get_vnode(Idx::integer() | [integer()], Mod::term(), State:: #state{}) ->
pid() | [pid()].
get_vnode(Idx, Mod, State) when not is_list(Idx) ->
[Result] = get_vnode([Idx], Mod, State),
Result;
get_vnode(IdxList, Mod, State) ->
Initial =
[case idx2vnode(Idx, Mod, State) of
no_match -> Idx;
Pid -> {Idx, Pid}
end
|| Idx <- IdxList],
{NotStarted, Started} = lists:partition(fun erlang:is_integer/1, Initial),
StartFun =
fun(Idx) ->
ForwardTo = get_forward(Mod, Idx, State),
lager:debug("Will start VNode for partition ~p", [Idx]),
{ok, Pid} =
riak_core_vnode_sup:start_vnode(Mod, Idx, ForwardTo),
lager:debug("Started VNode, waiting for initialization to complete ~p, ~p ", [Pid, Idx]),
ok = riak_core_vnode:wait_for_init(Pid),
lager:debug("VNode initialization ready ~p, ~p", [Pid, Idx]),
{Idx, Pid}
end,
MaxStart = app_helper:get_env(riak_core, vnode_rolling_start,
?DEFAULT_VNODE_ROLLING_START),
Pairs = Started ++ riak_core_util:pmap(StartFun, NotStarted, MaxStart),
% Return Pids in same order as input
[begin
{_, Pid} = lists:keyfind(Idx, 1, Pairs),
MonRef = erlang:monitor(process, Pid),
add_vnode_rec(#idxrec{key={Idx,Mod},idx=Idx,mod=Mod,pid=Pid,
monref=MonRef}, State),
Pid
end || Idx <- IdxList].


get_forward(Mod, Idx, #state{forwarding=Fwd}) ->
case orddict:find({Mod, Idx}, Fwd) of
