Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #163 from basho/gh154-handoff-after-restart

  • Loading branch information...
commit a15029452b11fb85653412ce6b2226da453a6eee 2 parents 3119736 + e2aa4dc
@jtuple jtuple authored
Showing with 63 additions and 4 deletions.
  1. +63 −4 src/riak_core_vnode_manager.erl
View
67 src/riak_core_vnode_manager.erl
@@ -41,7 +41,10 @@
-record(idxrec, {key, idx, mod, pid, monref}).
-record(state, {idxtab,
forwarding :: [pid()],
- handoff :: [{term(), integer(), pid(), node()}]
+ handoff :: [{term(), integer(), pid(), node()}],
+ known_modules :: [term()],
+ never_started :: [{integer(), term()}],
+ vnode_start_tokens :: integer()
}).
-define(DEFAULT_OWNERSHIP_TRIGGER, 8).
@@ -107,7 +110,8 @@ get_tab() ->
init(_State) ->
{ok, Ring} = riak_core_ring_manager:get_raw_ring(),
Mods = [Mod || {_, Mod} <- riak_core:vnode_modules()],
- State = #state{forwarding=[], handoff=[]},
+ State = #state{forwarding=[], handoff=[],
+ known_modules=[], never_started=[], vnode_start_tokens=0},
State2 = find_vnodes(State),
AllVNodes = get_all_vnodes(Mods, State2),
State3 = update_forwarding(AllVNodes, Mods, Ring, State2),
@@ -215,6 +219,15 @@ handle_cast(management_tick, State) ->
AllVNodes = get_all_vnodes(Mods, State),
State2 = update_handoff(AllVNodes, Ring, State),
trigger_ownership_handoff(Mods, Ring, State2),
+
+ MaxStart = app_helper:get_env(riak_core, vnode_rolling_start, 16),
+ State3 = State2#state{vnode_start_tokens=MaxStart},
+ State4 = maybe_start_vnodes(Ring, State3),
+ {noreply, State4};
+
+handle_cast(maybe_start_vnodes, State) ->
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ State2 = maybe_start_vnodes(Ring, State),
{noreply, State2};
handle_cast(_, State) ->
@@ -251,7 +264,10 @@ code_change(_OldVsn, State, _Extra) ->
%% ===================================================================
schedule_management_timer() ->
- timer:apply_after(30000, gen_server, cast, [?MODULE, management_tick]).
+ ManagementTick = app_helper:get_env(riak_core,
+ vnode_management_timer,
+ 10000),
+ timer:apply_after(ManagementTick, gen_server, cast, [?MODULE, management_tick]).
trigger_ownership_handoff(Mods, Ring, State) ->
Transfers = riak_core_ring:pending_changes(Ring),
@@ -474,4 +490,47 @@ get_all_vnodes_status(State=#state{forwarding=Forwarding, handoff=HO}) ->
end, Types2, [Pids2, Forwarding2, Handoff2]),
Status.
-
+update_never_started(Ring, State) ->
+ {Indices, _} = lists:unzip(riak_core_ring:all_owners(Ring)),
+ lists:foldl(fun({_App, Mod}, StateAcc) ->
+ case lists:member(Mod, StateAcc#state.known_modules) of
+ false ->
+ update_never_started(Mod, Indices, StateAcc);
+ true ->
+ StateAcc
+ end
+ end, State, riak_core:vnode_modules()).
+
+update_never_started(Mod, Indices, State) ->
+ IdxPids =
+ try
+ get_all_index_pid(Mod, State)
+ catch
+ _:_ -> []
+ end,
+ AlreadyStarted = [Idx || {Idx, _Pid} <- IdxPids],
+ NeverStarted = ordsets:subtract(ordsets:from_list(Indices),
+ ordsets:from_list(AlreadyStarted)),
+ NeverStarted2 = [{Idx, Mod} || Idx <- NeverStarted],
+ NeverStarted3 = NeverStarted2 ++ State#state.never_started,
+ KnownModules = [Mod | State#state.known_modules],
+ State#state{known_modules=KnownModules, never_started=NeverStarted3}.
+
+maybe_start_vnodes(Ring, State) ->
+ State2 = update_never_started(Ring, State),
+ State3 = maybe_start_vnodes(State2),
+ State3.
+
+maybe_start_vnodes(State=#state{vnode_start_tokens=Tokens,
+ never_started=NeverStarted}) ->
+ case {Tokens, NeverStarted} of
+ {0, _} ->
+ State;
+ {_, []} ->
+ State;
+ {_, [{Idx, Mod} | NeverStarted2]} ->
+ get_vnode(Idx, Mod, State),
+ gen_server:cast(?MODULE, maybe_start_vnodes),
+ State#state{vnode_start_tokens=Tokens-1,
+ never_started=NeverStarted2}
+ end.

0 comments on commit a150294

Please sign in to comment.
Something went wrong with that request. Please try again.