Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge remote branch 'origin/jdm-sync-forward-shutdown' into jdm-jdb-a…

…sync-handoff
  • Loading branch information...
commit caa7351f36b5e71e7b1b81ae3f78ee06d11184ae 2 parents 183517f + 23ddb13
@jonmeredith jonmeredith authored
View
2  src/riak_core_gossip.erl
@@ -77,7 +77,7 @@ stop() ->
gen_server:cast(?MODULE, stop).
finish_handoff(Idx, Prev, New, Mod) ->
- gen_server:call(?MODULE, {finish_handoff, Idx, Prev, New, Mod}).
+ gen_server:call(?MODULE, {finish_handoff, Idx, Prev, New, Mod}, infinity).
rejoin(Node, Ring) ->
gen_server:cast({?MODULE, Node}, {rejoin, Ring}).
View
97 src/riak_core_vnode.erl
@@ -171,6 +171,11 @@ continue(State, NewModState) ->
%% In the forwarding state, all vnode commands and coverage commands are
%% forwarded to the new owner for processing.
+update_forwarding_mode(_Ring, State=#state{modstate={deleted, _ModState}}) ->
+ %% awaiting unregistered message from the vnode master. The
+ %% vnode has been deleted so cannot handle messages even if
+ %% we wanted to.
+ State;
update_forwarding_mode(Ring, State=#state{index=Index, mod=Mod}) ->
Node = node(),
case riak_core_ring:next_owner(Ring, Index, Mod) of
@@ -277,8 +282,8 @@ active(timeout, State) ->
maybe_handoff(State);
active(?COVERAGE_REQ{keyspaces=KeySpaces,
request=Request,
- sender=Sender},
- State=#state{handoff_node=HN}) when HN =:= none ->
+ sender=Sender}, State) ->
+ %% Coverage request handled in handoff and non-handoff. Will be forwarded if set.
vnode_coverage(Sender, Request, KeySpaces, State);
active(?VNODE_REQ{sender=Sender, request=Request},
State=#state{handoff_node=HN}) when HN =:= none ->
@@ -306,7 +311,16 @@ active({update_forwarding, Ring}, State) ->
NewState = update_forwarding_mode(Ring, State),
continue(NewState);
active(trigger_handoff, State) ->
- maybe_handoff(State).
+ maybe_handoff(State);
+active(unregistered, State=#state{mod=Mod, index=Index}) ->
+ %% Add exclusion so the ring handler will not try to spin this vnode
+ %% up until it receives traffic.
+ riak_core_handoff_manager:add_exclusion(Mod, Index),
+ lager:debug("~p ~p vnode excluded and unregistered.",
+ [Index, Mod]),
+ {stop, normal, State#state{handoff_node=none,
+ pool_pid=undefined,
+ handoff_pid=undefined}}.
active(_Event, _From, State) ->
Reply = ok,
@@ -315,37 +329,31 @@ active(_Event, _From, State) ->
finish_handoff(State=#state{mod=Mod,
modstate=ModState,
index=Idx,
- pool_pid=Pool,
- handoff_node=HN}) ->
+ handoff_node=HN,
+ pool_pid=Pool}) ->
case riak_core_gossip:finish_handoff(Idx, node(), HN, Mod) of
- forward ->
- case is_pid(Pool) of
- true ->
- riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000);
- _ ->
- ok
- end,
- {ok, NewModState} = Mod:delete(ModState),
- {stop, normal, State#state{modstate=NewModState,
- handoff_node=none,
- pool_pid=undefined,
- handoff_pid=undefined}};
continue ->
- continue(State#state{handoff_node=none,
- handoff_pid=undefined});
- shutdown ->
+ continue(State#state{handoff_node=none});
+ Res when Res == forward; Res == shutdown ->
+ %% Have to issue the delete now. Once unregistered the
+ %% vnode master will spin up a new vnode on demand.
+ %% Shutdown the async pool beforehand, don't want callbacks
+ %% running on non-existant data.
case is_pid(Pool) of
true ->
+ %% state.pool_pid will be cleaned up by handle_info message.
riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000);
_ ->
ok
end,
{ok, NewModState} = Mod:delete(ModState),
- riak_core_handoff_manager:add_exclusion(Mod, Idx),
- {stop, normal, State#state{modstate=NewModState,
- handoff_node=none,
- pool_pid=undefined,
- handoff_pid=undefined}}
+ lager:debug("~p ~p vnode finished handoff and deleted.",
+ [Idx, Mod]),
+ riak_core_vnode_master:unregister_vnode(Idx, Mod),
+ riak_core_vnode_manager:set_not_forwarding(self(), false),
+ continue(State#state{modstate={deleted,NewModState}, % like to fail if used
+ handoff_node=none,
+ forward=HN})
end.
handle_event(R={update_forwarding, _Ring}, _StateName, State) ->
@@ -373,8 +381,24 @@ handle_sync_event({handoff_data,BinObj}, _From, StateName,
State#state.inactivity_timeout}
end.
-handle_info({'EXIT', Pid, _Reason}, _StateName, State=#state{handoff_pid=Pid}) ->
+handle_info({'EXIT', Pid, Reason}, _StateName,
+ State=#state{mod=Mod, index=Index, handoff_pid=Pid}) ->
+ case Reason of
+ normal ->
+ ok;
+ _ ->
+ lager:error("~p ~p handoff crashed ~p\n", [Index, Mod, Reason])
+ end,
continue(State#state{handoff_pid=undefined});
+handle_info({'EXIT', Pid, Reason}, _StateName,
+ State=#state{mod=Mod, index=Index, pool_pid=Pid}) ->
+ case Reason of
+ Reason when Reason == normal; Reason == shutdown ->
+ ok;
+ _ ->
+ lager:error("~p ~p worker pool crashed ~p\n", [Index, Mod, Reason])
+ end,
+ continue(State#state{pool_pid=undefined});
handle_info({'EXIT', Pid, Reason}, StateName, State=#state{mod=Mod,modstate=ModState}) ->
%% A linked processes has died so use the
@@ -387,8 +411,8 @@ handle_info({'EXIT', Pid, Reason}, StateName, State=#state{mod=Mod,modstate=ModS
{noreply,NewModState} ->
{next_state, StateName, State#state{modstate=NewModState},
State#state.inactivity_timeout};
- {stop, Reason, NewModState} ->
- {stop, Reason, State#state{modstate=NewModState}}
+ {stop, Reason1, NewModState} ->
+ {stop, Reason1, State#state{modstate=NewModState}}
end
catch
_ErrorType:undef ->
@@ -406,16 +430,23 @@ handle_info(Info, StateName, State=#state{mod=Mod,modstate=ModState}) ->
end.
terminate(Reason, _StateName, #state{mod=Mod, modstate=ModState,
- pool_pid=Pool,index=Index}) ->
- riak_core_vnode_master:unregister_vnode(Index,
- riak_core_vnode_master:reg_name(Mod)),
- case is_pid(Pool) of
+ pool_pid=Pool}) ->
+ %% Shutdown if the pool is still alive - there could be a race on
+ %% delivery of the unregistered event and successfully shutting
+ %% down the pool.
+ case is_pid(Pool) andalso is_process_alive(Pool) of
true ->
riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000);
_ ->
ok
end,
- Mod:terminate(Reason, ModState).
+ case ModState of
+ %% Handoff completed, Mod:delete has been called, now terminate.
+ {deleted, ModState1} ->
+ Mod:terminate(Reason, ModState1);
+ _ ->
+ Mod:terminate(Reason, ModState)
+ end.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
View
15 src/riak_core_vnode_master.erl
@@ -32,7 +32,7 @@
sync_command/4,
sync_spawn_command/3, make_request/3,
make_coverage_request/4,
- unregister_vnode/2,
+ unregister_vnode/2, unregister_vnode/3,
all_nodes/1, reg_name/1, all_index_pid/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -136,8 +136,12 @@ make_coverage_request(Request, KeySpaces, Sender, Index) ->
sender=Sender,
request=Request}.
-unregister_vnode(Index, VMaster) ->
- gen_server:cast(VMaster, {unregister, Index}).
+unregister_vnode(Index, VNodeMod) ->
+ unregister_vnode(Index, self(), VNodeMod).
+
+unregister_vnode(Index, Pid, VNodeMod) ->
+ RegName = reg_name(VNodeMod),
+ gen_server:cast(RegName, {unregister, Index, Pid}).
%% Request a list of Pids for all vnodes
all_nodes(VNodeMod) ->
@@ -192,8 +196,9 @@ handle_cast(Req=?COVERAGE_REQ{index=Idx}, State) ->
Pid = get_vnode(Idx, State),
gen_fsm:send_event(Pid, Req),
{noreply, State};
-handle_cast({unregister, Index}, #state{idxtab=T} = State) ->
- ets:match_delete(T, {idxrec, Index, '_', '_'}),
+handle_cast({unregister, Index, Pid}, #state{idxtab=T} = State) ->
+ ets:match_delete(T, {idxrec, Index, Pid, '_'}),
+ gen_fsm:send_event(Pid, unregistered),
{noreply, State};
handle_cast(Other, State=#state{legacy=Legacy}) when Legacy =/= undefined ->
case catch Legacy:rewrite_cast(Other) of
Please sign in to comment.
Something went wrong with that request. Please try again.