Browse files

Refactor riak_core vnode management (part 2)

Move forwarding and handoff decisions from individual vnode processes
and into the vnode manager. The vnode manager makes handoff and
forwarding decisions whenever the ring changes, and triggers vnode
state changes as appropriate.

Rewrite the logic by which per vnode handoff is marked as complete in
the ring. In particular, move the logic from riak_core_gossip and into
riak_core_vnode. The underlying ring changes are still serialized by
riak_core_ring_manager through the ring_trans function.

Change gossip throttling logic to trigger gossip whenever gossip tokens
reset, replacing the gossip interval approach.

Perform various tuning and additional minor changes that improve cluster
operation during a heavy gossip spike.
  • Loading branch information...
1 parent 4b20874 commit 0223283105605253be66ae826fd77a4ec6bcb953 @jtuple jtuple committed Dec 2, 2011
View
55 src/riak_core_gossip.erl
@@ -37,7 +37,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export ([distribute_ring/1, send_ring/1, send_ring/2, remove_from_cluster/2,
- finish_handoff/4, claim_until_balanced/2, random_gossip/1,
+ claim_until_balanced/2, random_gossip/1,
recursive_gossip/1, random_recursive_gossip/1, rejoin/2,
gossip_version/0, legacy_gossip/0, legacy_gossip/1]).
@@ -80,9 +80,6 @@ start_link() ->
stop() ->
gen_server:cast(?MODULE, stop).
-finish_handoff(Idx, Prev, New, Mod) ->
- gen_server:call(?MODULE, {finish_handoff, Idx, Prev, New, Mod}, infinity).
-
rejoin(Node, Ring) ->
gen_server:cast({?MODULE, Node}, {rejoin, Ring}).
@@ -137,7 +134,6 @@ random_recursive_gossip(Ring) ->
%% @private
init(_State) ->
- schedule_next_gossip(),
schedule_next_reset(),
{ok, Ring} = riak_core_ring_manager:get_raw_ring(),
{Tokens, _} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
@@ -146,35 +142,6 @@ init(_State) ->
gossip_tokens=Tokens}),
{ok, State}.
-
-%% @private
-handle_call({finish_handoff, Idx, Prev, New, Mod}, _From, State) ->
- {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
- Owner = riak_core_ring:index_owner(Ring, Idx),
- {_, NextOwner, Status} = riak_core_ring:next_owner(Ring, Idx, Mod),
- NewStatus = riak_core_ring:member_status(Ring, New),
-
- case {Owner, NextOwner, NewStatus, Status} of
- {_, _, invalid, _} ->
- %% Handing off to invalid node, don't give-up data.
- {reply, continue, State};
- {Prev, New, _, awaiting} ->
- riak_core_ring_manager:ring_trans(
- fun(Ring2, _) ->
- Ring3 = riak_core_ring:handoff_complete(Ring2, Idx, Mod),
- {new_ring, Ring3}
- end, []),
- {reply, forward, State};
- {Prev, New, _, complete} ->
- %% Do nothing
- {reply, continue, State};
- {Prev, _, _, _} ->
- %% Handoff wasn't to node that is scheduled in next, so no change.
- {reply, continue, State};
- {_, _, _, _} ->
- {reply, shutdown, State}
- end;
-
handle_call(legacy_gossip, _From, State) ->
{ok, Ring} = riak_core_ring_manager:get_raw_ring(),
Reply = check_legacy_gossip(Ring, State),
@@ -201,7 +168,6 @@ update_gossip_version(Ring) ->
Ring2
end.
-
known_legacy_gossip(Node, State) ->
case orddict:find(Node, State#state.gossip_versions) of
error ->
@@ -272,9 +238,7 @@ rpc_gossip_version(Ring, Node) ->
handle_cast({send_ring_to, _Node}, State=#state{gossip_tokens=0}) ->
%% Out of gossip tokens, ignore the send request
{noreply, State};
-
handle_cast({send_ring_to, Node}, State) ->
- lager:debug("Sending ring to ~p~n", [Node]),
{ok, MyRing0} = riak_core_ring_manager:get_raw_ring(),
MyRing = update_gossip_version(MyRing0),
GossipVsn = case gossip_version() of
@@ -324,24 +288,14 @@ handle_cast({reconcile_ring, RingIn}, State) ->
handle_cast(reset_tokens, State) ->
schedule_next_reset(),
+ gen_server:cast(?MODULE, gossip_ring),
{Tokens, _} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
{noreply, State#state{gossip_tokens=Tokens}};
handle_cast(gossip_ring, State) ->
- % First, schedule the next round of gossip...
- schedule_next_gossip(),
-
% Gossip the ring to some random other node...
{ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
- %% Ensure vnodes necessary for ownership change are running
- case riak_core_ring:disowning_indices(MyRing, node()) of
- [] ->
- ok;
- _ ->
- riak_core_ring_events:force_update()
- end,
-
random_gossip(MyRing),
{noreply, State};
@@ -379,11 +333,6 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%% ===================================================================
-schedule_next_gossip() ->
- MaxInterval = app_helper:get_env(riak_core, gossip_interval),
- Interval = random:uniform(MaxInterval),
- timer:apply_after(Interval, gen_server, cast, [?MODULE, gossip_ring]).
-
schedule_next_reset() ->
{_, Reset} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
timer:apply_after(Reset, gen_server, cast, [?MODULE, reset_tokens]).
View
8 src/riak_core_handoff_manager.erl
@@ -89,7 +89,13 @@ handle_cast({del_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
{noreply, State#state{excl=ordsets:del_element({Mod, Idx}, Excl)}};
handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
{ok, Ring} = riak_core_ring_manager:get_raw_ring(),
- riak_core_ring_events:ring_update(Ring),
+ case riak_core_ring:my_indices(Ring) of
+ [] ->
+ %% Trigger a ring update to ensure the node shuts down
+ riak_core_ring_events:ring_update(Ring);
+ _ ->
+ ok
+ end,
{noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}}.
handle_info(_Info, State) ->
View
10 src/riak_core_ring_manager.erl
@@ -247,6 +247,9 @@ handle_call({ring_trans, Fun, Args}, _From, State=#state{raw_ring=Ring}) ->
prune_write_notify_ring(NewRing),
riak_core_gossip:random_recursive_gossip(NewRing),
{reply, {ok, NewRing}, State#state{raw_ring=NewRing}};
+ {set_only, NewRing} ->
+ prune_write_ring(NewRing),
+ {reply, {ok, NewRing}, State#state{raw_ring=NewRing}};
{reconciled_ring, NewRing} ->
prune_write_notify_ring(NewRing),
riak_core_gossip:recursive_gossip(NewRing),
@@ -377,11 +380,14 @@ set_ring_global(Ring) ->
%% Persist a new ring file, set the global value and notify any listeners
prune_write_notify_ring(Ring) ->
+ prune_write_ring(Ring),
+ riak_core_ring_events:ring_update(Ring).
+
+prune_write_ring(Ring) ->
riak_core_ring:check_tainted(Ring, "Error: Persisting tainted ring"),
riak_core_ring_manager:prune_ringfiles(),
do_write_ringfile(Ring),
- set_ring_global(Ring),
- riak_core_ring_events:ring_update(Ring).
+ set_ring_global(Ring).
%% ===================================================================
%% Unit tests
View
274 src/riak_core_vnode.erl
@@ -20,8 +20,8 @@
-behaviour(gen_fsm).
-include_lib("riak_core_vnode.hrl").
-export([behaviour_info/1]).
--export([start_link/2,
- start_link/3,
+-export([start_link/3,
+ start_link/4,
send_command/2,
send_command_after/2]).
-export([init/1,
@@ -34,8 +34,8 @@
code_change/4]).
-export([reply/2]).
-export([get_mod_index/1,
- update_forwarding/2,
- trigger_handoff/1]).
+ set_forwarding/2,
+ trigger_handoff/2]).
-spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}].
behaviour_info(callbacks) ->
@@ -90,13 +90,15 @@ behaviour_info(_Other) ->
handoff_node=none :: none | node(),
handoff_pid :: pid(),
pool_pid :: pid() | undefined,
+ manager_event_timer :: reference(),
inactivity_timeout}).
-start_link(Mod, Index) ->
- start_link(Mod, Index, 0).
+start_link(Mod, Index, Forward) ->
+ start_link(Mod, Index, 0, Forward).
-start_link(Mod, Index, InitialInactivityTimeout) ->
- gen_fsm:start_link(?MODULE, [Mod, Index, InitialInactivityTimeout], []).
+start_link(Mod, Index, InitialInactivityTimeout, Forward) ->
+ gen_fsm:start_link(?MODULE,
+ [Mod, Index, InitialInactivityTimeout, Forward], []).
%% Send a command message for the vnode module by Pid -
%% typically to do some deferred processing after returning yourself
@@ -111,8 +113,7 @@ send_command_after(Time, Request) ->
gen_fsm:send_event_after(Time, ?VNODE_REQ{request=Request}).
-init([Mod, Index, InitialInactivityTimeout]) ->
- %%TODO: Should init args really be an array if it just gets Init?
+init([Mod, Index, InitialInactivityTimeout, Forward]) ->
process_flag(trap_exit, true),
{ModState, Props} = case Mod:init([Index]) of
{ok, MS} -> {MS, []};
@@ -129,20 +130,19 @@ init([Mod, Index, InitialInactivityTimeout]) ->
end,
riak_core_handoff_manager:remove_exclusion(Mod, Index),
Timeout = app_helper:get_env(riak_core, vnode_inactivity_timeout, ?DEFAULT_TIMEOUT),
- {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
- State = #state{index=Index, mod=Mod, modstate=ModState,
+ State = #state{index=Index, mod=Mod, modstate=ModState, forward=Forward,
inactivity_timeout=Timeout, pool_pid=PoolPid},
- State2 = update_forwarding_mode(Ring, State),
- {ok, active, State2, InitialInactivityTimeout}.
+ lager:debug("vnode :: ~p/~p :: ~p~n", [Mod, Index, Forward]),
+ {ok, active, State, InitialInactivityTimeout}.
get_mod_index(VNode) ->
gen_fsm:sync_send_all_state_event(VNode, get_mod_index).
-update_forwarding(VNode, Ring) ->
- gen_fsm:send_all_state_event(VNode, {update_forwarding, Ring}).
+set_forwarding(VNode, ForwardTo) ->
+ gen_fsm:send_all_state_event(VNode, {set_forwarding, ForwardTo}).
-trigger_handoff(VNode) ->
- gen_fsm:send_all_state_event(VNode, trigger_handoff).
+trigger_handoff(VNode, TargetNode) ->
+ gen_fsm:send_all_state_event(VNode, {trigger_handoff, TargetNode}).
continue(State) ->
{next_state, active, State, State#state.inactivity_timeout}.
@@ -171,22 +171,6 @@ continue(State, NewModState) ->
%% In the forwarding state, all vnode commands and coverage commands are
%% forwarded to the new owner for processing.
-update_forwarding_mode(_Ring, State=#state{modstate={deleted, _ModState}}) ->
- %% awaiting unregistered message from the vnode master. The
- %% vnode has been deleted so cannot handle messages even if
- %% we wanted to.
- State;
-update_forwarding_mode(Ring, State=#state{index=Index, mod=Mod}) ->
- Node = node(),
- case riak_core_ring:next_owner(Ring, Index, Mod) of
- {Node, NextOwner, complete} ->
- riak_core_vnode_manager:set_not_forwarding(self(), false),
- State#state{forward=NextOwner};
- _ ->
- riak_core_vnode_manager:set_not_forwarding(self(), true),
- State#state{forward=undefined}
- end.
-
vnode_command(Sender, Request, State=#state{index=Index,
mod=Mod,
modstate=ModState,
@@ -278,8 +262,9 @@ vnode_handoff_command(Sender, Request, State=#state{index=Index,
{stop, Reason, State#state{modstate=NewModState}}
end.
-active(timeout, State) ->
- maybe_handoff(State);
+active(timeout, State=#state{mod=Mod, index=Idx}) ->
+ riak_core_vnode_manager:vnode_event(Mod, Idx, self(), inactive),
+ continue(State);
active(?COVERAGE_REQ{keyspaces=KeySpaces,
request=Request,
sender=Sender}, State) ->
@@ -290,28 +275,23 @@ active(?VNODE_REQ{sender=Sender, request=Request},
vnode_command(Sender, Request, State);
active(?VNODE_REQ{sender=Sender, request=Request},State) ->
vnode_handoff_command(Sender, Request, State);
-active(handoff_complete, State=#state{mod=Mod,
- modstate=ModState,
- index=Idx,
- handoff_node=HN,
+active(handoff_complete, State=#state{mod=Mod,
+ index=Idx,
handoff_token=HT}) ->
riak_core_handoff_manager:release_handoff_lock({Mod, Idx}, HT),
- Mod:handoff_finished(HN, ModState),
- finish_handoff(State);
-active({handoff_error, _Err, _Reason}, State=#state{mod=Mod,
- modstate=ModState,
- index=Idx,
+ State2 = start_manager_event_timer(handoff_complete, State),
+ continue(State2);
+active({handoff_error, _Err, _Reason}, State=#state{mod=Mod,
+ index=Idx,
handoff_token=HT}) ->
riak_core_handoff_manager:release_handoff_lock({Mod, Idx}, HT),
- %% it would be nice to pass {Err, Reason} to the vnode but the
- %% API doesn't currently allow for that.
- Mod:handoff_cancelled(ModState),
- continue(State#state{handoff_node=none});
-active({update_forwarding, Ring}, State) ->
- NewState = update_forwarding_mode(Ring, State),
- continue(NewState);
-active(trigger_handoff, State) ->
- maybe_handoff(State);
+ State2 = start_manager_event_timer(handoff_error, State),
+ continue(State2);
+active({send_manager_event, Event}, State) ->
+ State2 = start_manager_event_timer(Event, State),
+ continue(State2);
+active({trigger_handoff, TargetNode}, State) ->
+ maybe_handoff(State, TargetNode);
active(unregistered, State=#state{mod=Mod, index=Index}) ->
%% Add exclusion so the ring handler will not try to spin this vnode
%% up until it receives traffic.
@@ -326,12 +306,64 @@ active(_Event, _From, State) ->
Reply = ok,
{reply, Reply, active, State, State#state.inactivity_timeout}.
+%% This code lives in riak_core_vnode rather than riak_core_vnode_manager
+%% because the ring_trans call is a synchronous call to the ring manager,
+%% and it is better to block an individual vnode rather than the vnode
+%% manager. Blocking the manager can impact all vnodes. This code is safe
+%% to execute on multiple parallel vnodes because of the synchronization
+%% afforded by having all ring changes go through the single ring manager.
+mark_handoff_complete(Idx, Prev, New, Mod) ->
+ Result = riak_core_ring_manager:ring_trans(
+ fun(Ring, _) ->
+ Owner = riak_core_ring:index_owner(Ring, Idx),
+ {_, NextOwner, Status} = riak_core_ring:next_owner(Ring, Idx, Mod),
+ NewStatus = riak_core_ring:member_status(Ring, New),
+
+ case {Owner, NextOwner, NewStatus, Status} of
+ {Prev, New, _, awaiting} ->
+ Ring2 = riak_core_ring:handoff_complete(Ring, Idx, Mod),
+ %% Optimization. Only alter the local ring without
+ %% triggering a gossip, thus implicitly coalescing
+ %% multiple vnode handoff completion events. In the
+ %% future we should decouple vnode handoff state from
+ %% the ring structure in order to make gossip independent
+ %% of ring size.
+ {set_only, Ring2};
+ _ ->
+ ignore
+ end
+ end, []),
+
+ case Result of
+ {ok, NewRing} ->
+ NewRing = NewRing;
+ _ ->
+ {ok, NewRing} = riak_core_ring_manager:get_my_ring()
+ end,
+
+ Owner = riak_core_ring:index_owner(NewRing, Idx),
+ {_, NextOwner, Status} = riak_core_ring:next_owner(NewRing, Idx, Mod),
+ NewStatus = riak_core_ring:member_status(NewRing, New),
+
+ case {Owner, NextOwner, NewStatus, Status} of
+ {_, _, invalid, _} ->
+            %% Handing off to invalid node, don't give up data.
+ continue;
+ {Prev, New, _, _} ->
+ forward;
+ {Prev, _, _, _} ->
+ %% Handoff wasn't to node that is scheduled in next, so no change.
+ continue;
+ {_, _, _, _} ->
+ shutdown
+ end.
+
finish_handoff(State=#state{mod=Mod,
modstate=ModState,
index=Idx,
handoff_node=HN,
pool_pid=Pool}) ->
- case riak_core_gossip:finish_handoff(Idx, node(), HN, Mod) of
+ case mark_handoff_complete(Idx, node(), HN, Mod) of
continue ->
continue(State#state{handoff_node=none});
Res when Res == forward; Res == shutdown ->
@@ -350,16 +382,47 @@ finish_handoff(State=#state{mod=Mod,
lager:debug("~p ~p vnode finished handoff and deleted.",
[Idx, Mod]),
riak_core_handoff_manager:remove_handoff(Mod, Idx),
- riak_core_vnode_master:unregister_vnode(Idx, Mod),
- riak_core_vnode_manager:set_not_forwarding(self(), false),
+ riak_core_vnode_manager:unregister_vnode(Idx, Mod),
+ lager:debug("vnode hn/fwd :: ~p/~p :: ~p -> ~p~n",
+ [State#state.mod, State#state.index, State#state.forward, HN]),
continue(State#state{modstate={deleted,NewModState}, % like to fail if used
handoff_node=none,
forward=HN})
end.
-handle_event(R={update_forwarding, _Ring}, _StateName, State) ->
- active(R, State);
-handle_event(R=trigger_handoff, _StateName, State) ->
+handle_event({set_forwarding, undefined}, _StateName,
+ State=#state{modstate={deleted, _ModState}}) ->
+ %% The vnode must forward requests when in the deleted state, therefore
+ %% ignore requests to stop forwarding.
+ continue(State);
+handle_event({set_forwarding, ForwardTo}, _StateName, State) ->
+ lager:debug("vnode fwd :: ~p/~p :: ~p -> ~p~n",
+ [State#state.mod, State#state.index, State#state.forward, ForwardTo]),
+ continue(State#state{forward=ForwardTo});
+handle_event(finish_handoff, _StateName, State=#state{mod=Mod,
+ modstate=ModState,
+ handoff_node=HN}) ->
+ stop_manager_event_timer(State),
+ case HN of
+ none ->
+ continue(State);
+ _ ->
+ Mod:handoff_finished(HN, ModState),
+ finish_handoff(State)
+ end;
+handle_event(cancel_handoff, _StateName, State=#state{mod=Mod,
+ modstate=ModState}) ->
+ %% it would be nice to pass {Err, Reason} to the vnode but the
+ %% API doesn't currently allow for that.
+ stop_manager_event_timer(State),
+ case State#state.handoff_node of
+ none ->
+ continue(State);
+ _ ->
+ Mod:handoff_cancelled(ModState),
+ continue(State#state{handoff_node=none})
+ end;
+handle_event(R={trigger_handoff, _TargetNode}, _StateName, State) ->
active(R, State);
handle_event(R=?VNODE_REQ{}, _StateName, State) ->
active(R, State);
@@ -461,56 +524,26 @@ terminate(Reason, _StateName, #state{mod=Mod, modstate=ModState,
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
-maybe_handoff(State=#state{modstate={deleted, _}}) ->
+maybe_handoff(State=#state{modstate={deleted, _}}, _TargetNode) ->
%% Modstate has been deleted, waiting for unregistered. No handoff.
continue(State);
-maybe_handoff(State=#state{mod=Mod, modstate=ModState}) ->
- case should_handoff(State) of
- {true, TargetNode} ->
- case Mod:handoff_starting(TargetNode, ModState) of
- {true, NewModState} ->
- start_handoff(State#state{modstate=NewModState}, TargetNode);
- {false, NewModState} ->
- continue(State, NewModState)
- end;
- false ->
- continue(State)
- end.
-
-should_handoff(#state{handoff_node=HN}) when HN /= none ->
+maybe_handoff(State=#state{index=Idx, mod=Mod, handoff_node=HN},
+ TargetNode) when HN /= none ->
%% Already handing off
- false;
-should_handoff(#state{index=Idx, mod=Mod}) ->
- {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
- Me = node(),
- {_, NextOwner, _} = riak_core_ring:next_owner(Ring, Idx),
- Owner = riak_core_ring:index_owner(Ring, Idx),
- Ready = riak_core_ring:ring_ready(Ring),
- TargetNode = case {Ready, Owner, NextOwner} of
- {_, _, Me} ->
- Me;
- {_, Me, undefined} ->
- Me;
- {true, Me, _} ->
- NextOwner;
- {_, _, undefined} ->
- Owner;
- {_, _, _} ->
- Me
- end,
- case TargetNode of
- Me ->
- false;
+ case HN of
+ TargetNode ->
+ ok;
_ ->
- case app_for_vnode_module(Mod) of
- undefined -> false;
- {ok, App} ->
- case lists:member(TargetNode,
- riak_core_node_watcher:nodes(App)) of
- false -> false;
- true -> {true, TargetNode}
- end
- end
+ lager:info("~s/~b: ignoring handoff request to ~p, already "
+ "handing off to ~p", [Mod, Idx, TargetNode, HN])
+ end,
+ continue(State);
+maybe_handoff(State=#state{mod=Mod, modstate=ModState}, TargetNode) ->
+ case Mod:handoff_starting(TargetNode, ModState) of
+ {true, NewModState} ->
+ start_handoff(State#state{modstate=NewModState}, TargetNode);
+ {false, NewModState} ->
+ continue(State, NewModState)
end.
start_handoff(State=#state{index=Idx, mod=Mod, modstate=ModState}, TargetNode) ->
@@ -555,14 +588,21 @@ reply({raw, Ref, From}, Reply) ->
reply(ignore, _Reply) ->
ok.
-app_for_vnode_module(Mod) when is_atom(Mod) ->
- case application:get_env(riak_core, vnode_modules) of
- {ok, Mods} ->
- case lists:keysearch(Mod, 2, Mods) of
- {value, {App, Mod}} ->
- {ok, App};
- false ->
- undefined
- end;
- undefined -> undefined
- end.
+%% Individual vnode processes and the vnode manager are tightly coupled. When
+%% vnode events occur, the vnode must ensure that the events are forwarded to
+%% the vnode manager, which will make a state change decision and send an
+%% appropriate message back to the vnode. To minimize blocking, asynchronous
+%% messaging is used. It is possible for the vnode manager to crash and miss
+%% messages sent by the vnode. Therefore, the vnode periodically resends event
+%% messages until an appropriate message is received back from the vnode
+%% manager. The event timer functions below implement this logic.
+start_manager_event_timer(Event, State=#state{mod=Mod, index=Idx}) ->
+ riak_core_vnode_manager:vnode_event(Mod, Idx, self(), Event),
+ stop_manager_event_timer(State),
+ T2 = gen_fsm:send_event_after(30000, {send_manager_event, Event}),
+ State#state{manager_event_timer=T2}.
+
+stop_manager_event_timer(#state{manager_event_timer=undefined}) ->
+ ok;
+stop_manager_event_timer(#state{manager_event_timer=T}) ->
+ gen_fsm:cancel_timer(T).
View
291 src/riak_core_vnode_manager.erl
@@ -27,18 +27,19 @@
-export([start_link/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([all_vnodes/0, all_vnodes/1, ring_changed/1, set_not_forwarding/2,
+-export([all_vnodes/0, all_vnodes/1, all_vnodes_status/0, ring_changed/1,
force_handoffs/0]).
--export([all_nodes/1, all_index_pid/1, get_vnode_pid/2, start_vnode/2,
- unregister_vnode/2, unregister_vnode/3]).
+-export([all_index_pid/1, get_vnode_pid/2, start_vnode/2,
+ unregister_vnode/2, unregister_vnode/3, vnode_event/4]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-record(idxrec, {key, idx, mod, pid, monref}).
-record(state, {idxtab,
- not_forwarding :: [pid()]
+ forwarding :: [pid()],
+ handoff :: [{term(), integer(), pid(), node()}]
}).
-define(DEFAULT_OWNERSHIP_TRIGGER, 8).
@@ -59,29 +60,28 @@ all_vnodes() ->
all_vnodes(Mod) ->
gen_server:call(?MODULE, {all_vnodes, Mod}).
-ring_changed(Ring) ->
- gen_server:cast(?MODULE, {ring_changed, Ring}).
+all_vnodes_status() ->
+ gen_server:call(?MODULE, all_vnodes_status).
-set_not_forwarding(Pid, Value) ->
- gen_server:cast(?MODULE, {set_not_forwarding, Pid, Value}).
+ring_changed(_TaintedRing) ->
+ %% The ring passed into ring events is the locally modified tainted ring.
+ %% Since the vnode manager uses operations that cannot work on the
+    %% tainted ring, we must retrieve the raw ring directly.
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ gen_server:cast(?MODULE, {ring_changed, Ring}).
%% @doc Provided for support/debug purposes. Forces all running vnodes to start
%% handoff. Limited by handoff_concurrency setting and therefore may need
%% to be called multiple times to force all handoffs to complete.
-force_handoffs() ->
- [riak_core_vnode:trigger_handoff(Pid) || {_, _, Pid} <- all_vnodes()],
- ok.
+force_handoffs() ->
+ gen_server:cast(?MODULE, force_handoffs).
unregister_vnode(Index, VNodeMod) ->
unregister_vnode(Index, self(), VNodeMod).
unregister_vnode(Index, Pid, VNodeMod) ->
gen_server:cast(?MODULE, {unregister, Index, VNodeMod, Pid}).
-%% Request a list of Pids for all vnodes
-all_nodes(VNodeMod) ->
- gen_server:call(?MODULE, {all_nodes, VNodeMod}, infinity).
-
all_index_pid(VNodeMod) ->
gen_server:call(?MODULE, {all_index_pid, VNodeMod}, infinity).
@@ -91,23 +91,32 @@ get_vnode_pid(Index, VNodeMod) ->
start_vnode(Index, VNodeMod) ->
gen_server:cast(?MODULE, {Index, VNodeMod, start_vnode}).
+vnode_event(Mod, Idx, Pid, Event) ->
+ gen_server:cast(?MODULE, {vnode_event, Mod, Idx, Pid, Event}).
+
%% ===================================================================
%% gen_server behaviour
%% ===================================================================
%% @private
init(_State) ->
- State = #state{not_forwarding=[]},
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ Mods = [Mod || {_, Mod} <- riak_core:vnode_modules()],
+ State = #state{forwarding=[], handoff=[]},
State2 = find_vnodes(State),
- {ok, State2}.
+ AllVNodes = get_all_vnodes(Mods, State2),
+ State3 = update_forwarding(AllVNodes, Mods, Ring, State2),
+ State4 = update_handoff(AllVNodes, Ring, State3),
+ {ok, State4}.
%% @private
find_vnodes(State) ->
%% Get the current list of vnodes running in the supervisor. We use this
%% to rebuild our ETS table for routing messages to the appropriate
%% vnode.
VnodePids = [Pid || {_, Pid, worker, _}
- <- supervisor:which_children(riak_core_vnode_sup)],
+ <- supervisor:which_children(riak_core_vnode_sup),
+ is_pid(Pid) andalso is_process_alive(Pid)],
IdxTable = ets:new(ets_vnodes, [{keypos, 2}]),
%% If the vnode manager is being restarted, scan the existing
@@ -135,8 +144,9 @@ find_vnodes(State) ->
State#state{idxtab=IdxTable}.
%% @private
-handle_call({all_nodes, Mod}, _From, State) ->
- {reply, lists:flatten(ets:match(State#state.idxtab, {idxrec, '_', '_', Mod, '$1', '_'})), State};
+handle_call(all_vnodes_status, _From, State) ->
+ Reply = get_all_vnodes_status(State),
+ {reply, Reply, State};
handle_call(all_vnodes, _From, State) ->
Reply = get_all_vnodes(State),
{reply, Reply, State};
@@ -157,51 +167,49 @@ handle_cast({Partition, Mod, start_vnode}, State) ->
get_vnode(Partition, Mod, State),
{noreply, State};
handle_cast({unregister, Index, Mod, Pid}, #state{idxtab=T} = State) ->
+ %% Update forwarding state to ensure vnode is not restarted in
+ %% incorrect forwarding state if next request arrives before next
+ %% ring event.
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ State2 = update_forwarding({Mod, Index}, Ring, State),
ets:match_delete(T, {idxrec, {Index, Mod}, Index, Mod, Pid, '_'}),
- riak_core_vnode_proxy:unregister_vnode(Mod, Index),
- gen_fsm:send_event(Pid, unregistered),
- {noreply, State};
-handle_cast({ring_changed, Ring}, State=#state{not_forwarding=NF}) ->
- %% Inform vnodes that the ring has changed so they can update their
- %% forwarding state.
+ riak_core_vnode_proxy:unregister_vnode(Mod, Index, Pid),
+ {noreply, State2};
+handle_cast({vnode_event, Mod, Idx, Pid, Event}, State) ->
+ handle_vnode_event(Event, Mod, Idx, Pid, State);
+handle_cast(force_handoffs, State) ->
+ AllVNodes = get_all_vnodes(State),
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ State2 = update_handoff(AllVNodes, Ring, State),
+
+ [maybe_trigger_handoff(Mod, Idx, Pid, State2)
+ || {Mod, Idx, Pid} <- AllVNodes],
+
+ {noreply, State2};
+handle_cast({ring_changed, Ring}, State) ->
+ %% Update vnode forwarding state
AllVNodes = get_all_vnodes(State),
- AllPids = [Pid || {_Mod, _Idx, Pid} <- AllVNodes],
- Notify = AllPids -- NF,
- [riak_core_vnode:update_forwarding(Pid, Ring) || Pid <- Notify],
+ Mods = [Mod || {_, Mod} <- riak_core:vnode_modules()],
+ State2 = update_forwarding(AllVNodes, Mods, Ring, State),
+
+ %% Update handoff state
+ State3 = update_handoff(AllVNodes, Ring, State2),
%% Trigger ownership transfers.
Transfers = riak_core_ring:pending_changes(Ring),
Limit = app_helper:get_env(riak_core,
forced_ownership_handoff,
?DEFAULT_OWNERSHIP_TRIGGER),
Throttle = lists:sublist(Transfers, Limit),
- Mods = [Mod || {_, Mod} <- riak_core:vnode_modules()],
Awaiting = [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Throttle,
Mod <- Mods,
S =:= awaiting,
Node =:= node(),
not lists:member(Mod, CMods)],
- [riak_core_vnode:trigger_handoff(Pid) || {ModA, IdxA, Pid} <- AllVNodes,
- {ModB, IdxB} <- Awaiting,
- ModA =:= ModB,
- IdxA =:= IdxB],
-
- %% Filter out dead pids from the not_forwarding list.
- NF2 = lists:filter(fun(Pid) ->
- is_pid(Pid) andalso is_process_alive(Pid)
- end, NF),
- {noreply, State#state{not_forwarding=NF2}};
-
-handle_cast({set_not_forwarding, Pid, Value},
- State=#state{not_forwarding=NF}) ->
- NF2 = case Value of
- true ->
- lists:usort([Pid | NF]);
- false ->
- NF -- [Pid]
- end,
- {noreply, State#state{not_forwarding=NF2}};
+ [maybe_trigger_handoff(Mod, Idx, State3) || {Mod, Idx} <- Awaiting],
+
+ {noreply, State3};
handle_cast(_, State) ->
{noreply, State}.
@@ -211,6 +219,19 @@ handle_info({'DOWN', MonRef, process, _P, _I}, State) ->
{noreply, State}.
%% @private
+handle_vnode_event(inactive, Mod, Idx, Pid, State) ->
+ maybe_trigger_handoff(Mod, Idx, Pid, State),
+ {noreply, State};
+handle_vnode_event(handoff_complete, Mod, Idx, Pid, State) ->
+ NewHO = orddict:erase({Mod, Idx}, State#state.handoff),
+ gen_fsm:send_all_state_event(Pid, finish_handoff),
+ {noreply, State#state{handoff=NewHO}};
+handle_vnode_event(handoff_error, Mod, Idx, Pid, State) ->
+ NewHO = orddict:erase({Mod, Idx}, State#state.handoff),
+ gen_fsm:send_all_state_event(Pid, cancel_handoff),
+ {noreply, State#state{handoff=NewHO}}.
+
+%% @private
terminate(_Reason, _State) ->
ok.
@@ -229,8 +250,10 @@ get_all_index_pid(Mod, State) ->
get_all_vnodes(State) ->
Mods = [Mod || {_App, Mod} <- riak_core:vnode_modules()],
- lists:flatmap(fun(Mod) -> get_all_vnodes(Mod, State) end, Mods).
+ get_all_vnodes(Mods, State).
+get_all_vnodes(Mods, State) when is_list(Mods) ->
+ lists:flatmap(fun(Mod) -> get_all_vnodes(Mod, State) end, Mods);
get_all_vnodes(Mod, State) ->
try get_all_index_pid(Mod, State) of
IdxPids ->
@@ -258,10 +281,174 @@ add_vnode_rec(I, _State=#state{idxtab=T}) -> ets:insert(T,I).
get_vnode(Idx, Mod, State) ->
case idx2vnode(Idx, Mod, State) of
no_match ->
- {ok, Pid} = riak_core_vnode_sup:start_vnode(Mod, Idx),
+ ForwardTo = get_forward(Mod, Idx, State),
+ {ok, Pid} = riak_core_vnode_sup:start_vnode(Mod, Idx, ForwardTo),
MonRef = erlang:monitor(process, Pid),
add_vnode_rec(#idxrec{key={Idx,Mod},idx=Idx,mod=Mod,pid=Pid,
monref=MonRef}, State),
Pid;
X -> X
end.
+
+get_forward(Mod, Idx, #state{forwarding=Fwd}) ->
+ case orddict:find({Mod, Idx}, Fwd) of
+ {ok, ForwardTo} ->
+ ForwardTo;
+ _ ->
+ undefined
+ end.
+
+check_forward(Ring, Mod, Index) ->
+ Node = node(),
+ case riak_core_ring:next_owner(Ring, Index, Mod) of
+ {Node, NextOwner, complete} ->
+ {{Mod, Index}, NextOwner};
+ _ ->
+ {{Mod, Index}, undefined}
+ end.
+
+compute_forwarding(Mods, Ring) ->
+ {AllIndices, _} = lists:unzip(riak_core_ring:all_owners(Ring)),
+ Forwarding = [check_forward(Ring, Mod, Index) || Index <- AllIndices,
+ Mod <- Mods],
+ orddict:from_list(Forwarding).
+
+update_forwarding(AllVNodes, Mods, Ring,
+ State=#state{forwarding=Forwarding}) ->
+ NewForwarding = compute_forwarding(Mods, Ring),
+
+ %% Inform vnodes that have changed forwarding status
+ VNodes = lists:sort([{{Mod, Idx}, Pid} || {Mod, Idx, Pid} <- AllVNodes]),
+ Diff = NewForwarding -- Forwarding,
+ [change_forward(VNodes, Mod, Idx, ForwardTo)
+ || {{Mod, Idx}, ForwardTo} <- Diff],
+
+ State#state{forwarding=NewForwarding}.
+
+update_forwarding({Mod, Idx}, Ring, State=#state{forwarding=Forwarding}) ->
+ {_, ForwardTo} = check_forward(Ring, Mod, Idx),
+ NewForwarding = orddict:store({Mod, Idx}, ForwardTo, Forwarding),
+ State#state{forwarding=NewForwarding}.
+
+change_forward(VNodes, Mod, Idx, ForwardTo) ->
+ case orddict:find({Mod, Idx}, VNodes) of
+ error ->
+ ok;
+ {ok, Pid} ->
+ riak_core_vnode:set_forwarding(Pid, ForwardTo),
+ ok
+ end.
+
+update_handoff(AllVNodes, Ring, State) ->
+ case riak_core_ring:ring_ready(Ring) of
+ false ->
+ State;
+ true ->
+ NewHO = lists:flatten([case should_handoff(Ring, Mod, Idx) of
+ false ->
+ [];
+ {true, TargetNode} ->
+ [{{Mod, Idx}, TargetNode}]
+ end || {Mod, Idx, _Pid} <- AllVNodes]),
+ State#state{handoff=orddict:from_list(NewHO)}
+ end.
+
+should_handoff(Ring, Mod, Idx) ->
+ {_, NextOwner, _} = riak_core_ring:next_owner(Ring, Idx),
+ Owner = riak_core_ring:index_owner(Ring, Idx),
+ Ready = riak_core_ring:ring_ready(Ring),
+ case determine_handoff_target(Ready, Owner, NextOwner) of
+ undefined ->
+ false;
+ TargetNode ->
+ case app_for_vnode_module(Mod) of
+ undefined -> false;
+ {ok, App} ->
+ case lists:member(TargetNode,
+ riak_core_node_watcher:nodes(App)) of
+ false -> false;
+ true -> {true, TargetNode}
+ end
+ end
+ end.
+
+determine_handoff_target(Ready, Owner, NextOwner) ->
+ Me = node(),
+ TargetNode = case {Ready, Owner, NextOwner} of
+ {_, _, Me} ->
+ Me;
+ {_, Me, undefined} ->
+ Me;
+ {true, Me, _} ->
+ NextOwner;
+ {_, _, undefined} ->
+ Owner;
+ {_, _, _} ->
+ Me
+ end,
+ case TargetNode of
+ Me ->
+ undefined;
+ _ ->
+ TargetNode
+ end.
+
+app_for_vnode_module(Mod) when is_atom(Mod) ->
+ case application:get_env(riak_core, vnode_modules) of
+ {ok, Mods} ->
+ case lists:keysearch(Mod, 2, Mods) of
+ {value, {App, Mod}} ->
+ {ok, App};
+ false ->
+ undefined
+ end;
+ undefined -> undefined
+ end.
+
+maybe_trigger_handoff(Mod, Idx, State) ->
+ Pid = get_vnode(Idx, Mod, State),
+ maybe_trigger_handoff(Mod, Idx, Pid, State).
+
+maybe_trigger_handoff(Mod, Idx, Pid, _State=#state{handoff=HO}) ->
+ case orddict:find({Mod, Idx}, HO) of
+ {ok, TargetNode} ->
+ riak_core_vnode:trigger_handoff(Pid, TargetNode),
+ ok;
+ error ->
+ ok
+ end.
+
+get_all_vnodes_status(State=#state{forwarding=Forwarding, handoff=HO}) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ Owners = riak_core_ring:all_owners(Ring),
+ VNodes = get_all_vnodes(State),
+ Mods = [Mod || {_App, Mod} <- riak_core:vnode_modules()],
+
+ ThisNode = node(),
+ Types = [case Owner of
+ ThisNode ->
+ {{Mod, Idx}, {type, primary}};
+ _ ->
+ {{Mod, Idx}, {type, secondary}}
+ end || {Idx, Owner} <- Owners,
+ Mod <- Mods],
+ Types2 = lists:keysort(1, Types),
+ Pids = [{{Mod, Idx}, {pid, Pid}} || {Mod, Idx, Pid} <- VNodes],
+ Pids2 = lists:keysort(1, Pids),
+ Forwarding2 = [{MI, {forwarding, Node}} || {MI,Node} <- Forwarding,
+ Node /= undefined],
+ Handoff2 = [{MI, {should_handoff, Node}} || {MI,Node} <- HO],
+
+ MergeFn = fun(_, V1, V2) when is_list(V1) and is_list(V2) ->
+ V1 ++ V2;
+ (_, V1, V2) when is_list(V1) ->
+ V1 ++ [V2];
+ (_, V1, V2) ->
+ [V1, V2]
+ end,
+ Status = lists:foldl(fun(B, A) ->
+ orddict:merge(MergeFn, A, B)
+ end, Types2, [Pids2, Forwarding2, Handoff2]),
+ Status.
+
+
View
11 src/riak_core_vnode_master.erl
@@ -31,7 +31,7 @@
command_return_vnode/4,
sync_command/4,
sync_spawn_command/3, make_request/3,
- make_coverage_request/4, reg_name/1]).
+ make_coverage_request/4, all_nodes/1, reg_name/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {idxtab, sup_name, vnode_mod, legacy}).
@@ -137,6 +137,15 @@ make_coverage_request(Request, KeySpaces, Sender, Index) ->
sender=Sender,
request=Request}.
+%% Request a list of Pids for all vnodes
+%% @deprecated
+%% Provided for compatibility with older vnode master API. New code should
+%% use riak_core_vnode_manager:all_vnodes/1 which returns a mod/index/pid
+%% list rather than just a pid list.
+all_nodes(VNodeMod) ->
+ VNodes = riak_core_vnode_manager:all_vnodes(VNodeMod),
+ [Pid || {_Mod, _Idx, Pid} <- VNodes].
+
%% @private
init([VNodeMod, LegacyMod, _RegName]) ->
{ok, #state{idxtab=undefined,
View
23 src/riak_core_vnode_proxy.erl
@@ -17,8 +17,8 @@
%%
%% -------------------------------------------------------------------
-module(riak_core_vnode_proxy).
--export([start_link/2, init/1, reg_name/2, reg_name/3, call/2, cast/2,
- unregister_vnode/2, command_return_vnode/2]).
+-export([start_link/2, init/1, reg_name/2, reg_name/3, call/2, call/3, cast/2,
+ unregister_vnode/3, command_return_vnode/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
-record(state, {mod, index, vnode_pid, vnode_mref}).
@@ -42,8 +42,8 @@ init([Parent, RegName, Mod, Index]) ->
State = #state{mod=Mod, index=Index},
loop(Parent, State).
-unregister_vnode(Mod, Index) ->
- call(reg_name(Mod, Index), unregister_vnode).
+unregister_vnode(Mod, Index, Pid) ->
+ cast(reg_name(Mod, Index), {unregister_vnode, Pid}).
command_return_vnode({Mod,Index,Node}, Req) ->
call(reg_name(Mod, Index, Node), {return_vnode, Req}).
@@ -52,6 +52,10 @@ call(Name, Msg) ->
{ok,Res} = (catch gen:call(Name, '$vnode_proxy_call', Msg)),
Res.
+call(Name, Msg, Timeout) ->
+ {ok,Res} = (catch gen:call(Name, '$vnode_proxy_call', Msg, Timeout)),
+ Res.
+
cast(Name, Msg) ->
catch erlang:send(Name, {'$vnode_proxy_cast', Msg}),
ok.
@@ -86,10 +90,6 @@ loop(Parent, State) ->
end.
%% @private
-handle_call(unregister_vnode, _From, State) ->
- catch demonitor(State#state.vnode_mref, [flush]),
- NewState = State#state{vnode_pid=undefined, vnode_mref=undefined},
- {reply, ok, NewState};
handle_call({return_vnode, Req}, _From, State) ->
{Pid, NewState} = get_vnode_pid(State),
gen_fsm:send_event(Pid, Req),
@@ -99,6 +99,13 @@ handle_call(_Msg, _From, State) ->
{reply, ok, State}.
%% @private
+handle_cast({unregister_vnode, Pid}, State) ->
+ %% The pid may not match the vnode_pid in the state, but we must send the
+ %% unregister event anyway -- the vnode manager requires it.
+ gen_fsm:send_event(Pid, unregistered),
+ catch demonitor(State#state.vnode_mref, [flush]),
+ NewState = State#state{vnode_pid=undefined, vnode_mref=undefined},
+ {noreply, NewState};
handle_cast(_Msg, State) ->
{noreply, State}.
View
6 src/riak_core_vnode_sup.erl
@@ -25,10 +25,10 @@
-module(riak_core_vnode_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).
--export([start_vnode/2]).
+-export([start_vnode/3]).
-start_vnode(Mod, Index) when is_integer(Index) ->
- supervisor_pre_r14b04:start_child(?MODULE, [Mod, Index]).
+start_vnode(Mod, Index, ForwardTo) when is_integer(Index) ->
+ supervisor_pre_r14b04:start_child(?MODULE, [Mod, Index, ForwardTo]).
start_link() ->
%% We use a custom copy of the supervisor module that is expected to be

0 comments on commit 0223283

Please sign in to comment.