Merge branch 'jdm-jdb-async-handoff' into 1.0
Jon Meredith committed Sep 27, 2011
2 parents 7cef0a8 + caa7351 commit 9af7c8c
Showing 6 changed files with 95 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/riak_core_gossip.erl
@@ -77,7 +77,7 @@ stop() ->
gen_server:cast(?MODULE, stop).

finish_handoff(Idx, Prev, New, Mod) ->
gen_server:call(?MODULE, {finish_handoff, Idx, Prev, New, Mod}).
gen_server:call(?MODULE, {finish_handoff, Idx, Prev, New, Mod}, infinity).

rejoin(Node, Ring) ->
gen_server:cast({?MODULE, Node}, {rejoin, Ring}).
10 changes: 10 additions & 0 deletions src/riak_core_handoff_sender.erl
@@ -70,6 +70,16 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
foldfun=fun visit_item/3,
acc0={Socket,ParentPid,Module,TcpMod,0,0,ok}},
VMaster, infinity),
%% One last sync to make sure the message has been received.
%% post-0.14 vnodes switch from handoff to forwarding immediately
%% so handoff_complete can only be sent once all of the data is
%% written. handle_handoff_data is a sync call, so once
%% we receive the sync the remote side will be up to date.
lager:debug("~p ~p Sending final sync", [Partition, Module]),
ok = TcpMod:send(Socket, <<?PT_MSG_SYNC:8>>),
{ok,[?PT_MSG_SYNC|<<"sync">>]} = TcpMod:recv(Socket, 0),
lager:debug("~p ~p Final sync received", [Partition, Module]),

EndFoldTime = now(),
FoldTimeDiff = timer:now_diff(EndFoldTime, StartFoldTime) / 1000000,
case ErrStatus of
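The comment added above describes the final sync as a request/reply barrier over the handoff socket: because the receiver applies handoff data synchronously, getting the sync echo back means every object sent before it has been handled. A minimal, self-contained sketch of that barrier is shown below; it assumes a passive, binary-mode gen_tcp socket and uses a made-up value for ?PT_MSG_SYNC, so it illustrates the shape of the handshake rather than the actual riak_core wire protocol.

%% Sketch only: sender-side sync barrier over a raw TCP socket.
%% Assumes a passive, binary-mode socket; 16#05 is a placeholder for
%% the real ?PT_MSG_SYNC value, which this diff does not show.
-module(handoff_sync_sketch).
-export([final_sync/1]).

-define(PT_MSG_SYNC, 16#05).

final_sync(Socket) ->
    %% Send a one-byte SYNC marker after the last handoff object.
    ok = gen_tcp:send(Socket, <<?PT_MSG_SYNC:8>>),
    %% Block until the receiver echoes the marker back; the echo
    %% implies all previously sent objects have been processed.
    {ok, <<?PT_MSG_SYNC:8, "sync">>} = gen_tcp:recv(Socket, 0),
    ok.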
7 changes: 4 additions & 3 deletions src/riak_core_ring.erl
@@ -597,7 +597,7 @@ ring_ready(State0) ->
Owner = owner_node(State0),
State = update_seen(Owner, State0),
Seen = State?CHSTATE.seen,
Members = get_members(State?CHSTATE.members, [valid, leaving]),
Members = get_members(State?CHSTATE.members, [valid, leaving, exiting]),
VClock = State?CHSTATE.vclock,
R = [begin
case orddict:find(Node, Seen) of
@@ -618,7 +618,7 @@ ring_ready_info(State0) ->
Owner = owner_node(State0),
State = update_seen(Owner, State0),
Seen = State?CHSTATE.seen,
Members = get_members(State?CHSTATE.members, [valid, leaving]),
Members = get_members(State?CHSTATE.members, [valid, leaving, exiting]),
RecentVC =
orddict:fold(fun(_, VC, Recent) ->
case vclock:descends(VC, Recent) of
@@ -889,7 +889,8 @@ maybe_remove_exiting(Node, CState) ->
Claimant = CState?CHSTATE.claimant,
case Claimant of
Node ->
Exiting = get_members(CState?CHSTATE.members, [exiting]),
%% Change exiting nodes to invalid, skipping this node.
Exiting = get_members(CState?CHSTATE.members, [exiting]) -- [Node],
Changed = (Exiting /= []),
CState2 =
lists:foldl(fun(ENode, CState0) ->
97 changes: 64 additions & 33 deletions src/riak_core_vnode.erl
@@ -171,6 +171,11 @@ continue(State, NewModState) ->
%% In the forwarding state, all vnode commands and coverage commands are
%% forwarded to the new owner for processing.

update_forwarding_mode(_Ring, State=#state{modstate={deleted, _ModState}}) ->
%% awaiting unregistered message from the vnode master. The
%% vnode has been deleted so cannot handle messages even if
%% we wanted to.
State;
update_forwarding_mode(Ring, State=#state{index=Index, mod=Mod}) ->
Node = node(),
case riak_core_ring:next_owner(Ring, Index, Mod) of
@@ -277,8 +282,8 @@ active(timeout, State) ->
maybe_handoff(State);
active(?COVERAGE_REQ{keyspaces=KeySpaces,
request=Request,
sender=Sender},
State=#state{handoff_node=HN}) when HN =:= none ->
sender=Sender}, State) ->
%% Coverage requests are handled both during and outside handoff; they are forwarded if forwarding is set.
vnode_coverage(Sender, Request, KeySpaces, State);
active(?VNODE_REQ{sender=Sender, request=Request},
State=#state{handoff_node=HN}) when HN =:= none ->
@@ -306,7 +311,16 @@ active({update_forwarding, Ring}, State) ->
NewState = update_forwarding_mode(Ring, State),
continue(NewState);
active(trigger_handoff, State) ->
maybe_handoff(State).
maybe_handoff(State);
active(unregistered, State=#state{mod=Mod, index=Index}) ->
%% Add exclusion so the ring handler will not try to spin this vnode
%% up until it receives traffic.
riak_core_handoff_manager:add_exclusion(Mod, Index),
lager:debug("~p ~p vnode excluded and unregistered.",
[Index, Mod]),
{stop, normal, State#state{handoff_node=none,
pool_pid=undefined,
handoff_pid=undefined}}.

active(_Event, _From, State) ->
Reply = ok,
@@ -315,37 +329,31 @@ active(_Event, _From, State) ->
finish_handoff(State=#state{mod=Mod,
modstate=ModState,
index=Idx,
pool_pid=Pool,
handoff_node=HN}) ->
handoff_node=HN,
pool_pid=Pool}) ->
case riak_core_gossip:finish_handoff(Idx, node(), HN, Mod) of
forward ->
case is_pid(Pool) of
true ->
riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000);
_ ->
ok
end,
{ok, NewModState} = Mod:delete(ModState),
{stop, normal, State#state{modstate=NewModState,
handoff_node=none,
pool_pid=undefined,
handoff_pid=undefined}};
continue ->
continue(State#state{handoff_node=none,
handoff_pid=undefined});
shutdown ->
continue(State#state{handoff_node=none});
Res when Res == forward; Res == shutdown ->
%% Have to issue the delete now. Once unregistered the
%% vnode master will spin up a new vnode on demand.
%% Shut down the async pool beforehand; we don't want callbacks
%% running on non-existent data.
case is_pid(Pool) of
true ->
%% state.pool_pid will be cleaned up by handle_info message.
riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000);
_ ->
ok
end,
{ok, NewModState} = Mod:delete(ModState),
riak_core_handoff_manager:add_exclusion(Mod, Idx),
{stop, normal, State#state{modstate=NewModState,
handoff_node=none,
pool_pid=undefined,
handoff_pid=undefined}}
lager:debug("~p ~p vnode finished handoff and deleted.",
[Idx, Mod]),
riak_core_vnode_master:unregister_vnode(Idx, Mod),
riak_core_vnode_manager:set_not_forwarding(self(), false),
continue(State#state{modstate={deleted,NewModState}, % like to fail if used
handoff_node=none,
forward=HN})
end.

handle_event(R={update_forwarding, _Ring}, _StateName, State) ->
@@ -373,8 +381,24 @@ handle_sync_event({handoff_data,BinObj}, _From, StateName,
State#state.inactivity_timeout}
end.

handle_info({'EXIT', Pid, _Reason}, _StateName, State=#state{handoff_pid=Pid}) ->
handle_info({'EXIT', Pid, Reason}, _StateName,
State=#state{mod=Mod, index=Index, handoff_pid=Pid}) ->
case Reason of
normal ->
ok;
_ ->
lager:error("~p ~p handoff crashed ~p\n", [Index, Mod, Reason])
end,
continue(State#state{handoff_pid=undefined});
handle_info({'EXIT', Pid, Reason}, _StateName,
State=#state{mod=Mod, index=Index, pool_pid=Pid}) ->
case Reason of
Reason when Reason == normal; Reason == shutdown ->
ok;
_ ->
lager:error("~p ~p worker pool crashed ~p\n", [Index, Mod, Reason])
end,
continue(State#state{pool_pid=undefined});

handle_info({'EXIT', Pid, Reason}, StateName, State=#state{mod=Mod,modstate=ModState}) ->
%% A linked processes has died so use the
@@ -387,8 +411,8 @@ handle_info({'EXIT', Pid, Reason}, StateName, State=#state{mod=Mod,modstate=ModS
{noreply,NewModState} ->
{next_state, StateName, State#state{modstate=NewModState},
State#state.inactivity_timeout};
{stop, Reason, NewModState} ->
{stop, Reason, State#state{modstate=NewModState}}
{stop, Reason1, NewModState} ->
{stop, Reason1, State#state{modstate=NewModState}}
end
catch
_ErrorType:undef ->
@@ -406,16 +430,23 @@ handle_info(Info, StateName, State=#state{mod=Mod,modstate=ModState}) ->
end.

terminate(Reason, _StateName, #state{mod=Mod, modstate=ModState,
pool_pid=Pool,index=Index}) ->
riak_core_vnode_master:unregister_vnode(Index,
riak_core_vnode_master:reg_name(Mod)),
case is_pid(Pool) of
pool_pid=Pool}) ->
%% Shut down the pool if it is still alive - there is a race between
%% delivery of the unregistered event and shutting down the pool.
case is_pid(Pool) andalso is_process_alive(Pool) of
true ->
riak_core_vnode_worker_pool:shutdown_pool(Pool, 60000);
_ ->
ok
end,
Mod:terminate(Reason, ModState).
case ModState of
%% Handoff completed, Mod:delete has been called, now terminate.
{deleted, ModState1} ->
Mod:terminate(Reason, ModState1);
_ ->
Mod:terminate(Reason, ModState)
end.

code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
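The riak_core_vnode.erl changes leave a vnode running with modstate={deleted, ModState} after its data is deleted, until the master's unregistered event arrives, so terminate/3 now unwraps that marker before calling the behaviour module's terminate callback. The sketch below isolates just that unwrapping step; the module and function names are hypothetical stand-ins, not part of the diff.

-module(vnode_terminate_sketch).
-export([terminate_modstate/3]).

%% Illustrative only: unwrap the post-handoff {deleted, ModState}
%% marker before invoking the behaviour module's terminate callback,
%% mirroring the guard added to terminate/3 above.
terminate_modstate(Mod, Reason, {deleted, ModState}) ->
    %% Handoff already called Mod:delete/1, so pass the inner state
    %% rather than the wrapper tuple.
    Mod:terminate(Reason, ModState);
terminate_modstate(Mod, Reason, ModState) ->
    Mod:terminate(Reason, ModState).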
15 changes: 10 additions & 5 deletions src/riak_core_vnode_master.erl
@@ -32,7 +32,7 @@
sync_command/4,
sync_spawn_command/3, make_request/3,
make_coverage_request/4,
unregister_vnode/2,
unregister_vnode/2, unregister_vnode/3,
all_nodes/1, reg_name/1, all_index_pid/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -136,8 +136,12 @@ make_coverage_request(Request, KeySpaces, Sender, Index) ->
sender=Sender,
request=Request}.

unregister_vnode(Index, VMaster) ->
gen_server:cast(VMaster, {unregister, Index}).
unregister_vnode(Index, VNodeMod) ->
unregister_vnode(Index, self(), VNodeMod).

unregister_vnode(Index, Pid, VNodeMod) ->
RegName = reg_name(VNodeMod),
gen_server:cast(RegName, {unregister, Index, Pid}).

%% Request a list of Pids for all vnodes
all_nodes(VNodeMod) ->
@@ -192,8 +196,9 @@ handle_cast(Req=?COVERAGE_REQ{index=Idx}, State) ->
Pid = get_vnode(Idx, State),
gen_fsm:send_event(Pid, Req),
{noreply, State};
handle_cast({unregister, Index}, #state{idxtab=T} = State) ->
ets:match_delete(T, {idxrec, Index, '_', '_'}),
handle_cast({unregister, Index, Pid}, #state{idxtab=T} = State) ->
ets:match_delete(T, {idxrec, Index, Pid, '_'}),
gen_fsm:send_event(Pid, unregistered),
{noreply, State};
handle_cast(Other, State=#state{legacy=Legacy}) when Legacy =/= undefined ->
case catch Legacy:rewrite_cast(Other) of
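unregister_vnode now carries the caller's pid, so the master removes only that vnode's index entry and then tells the same process it has been unregistered, which triggers the new active(unregistered, ...) clause. A stripped-down sketch of that cast handler is below; it uses a bare ets table in place of the master's state, and the {idxrec, Index, Pid, _} row shape is taken from the match pattern in the diff.

-module(unregister_sketch).
-export([unregister/3]).

%% Sketch of the pid-aware unregister step: delete only the row owned
%% by the calling vnode, then notify that vnode so it can stop itself.
unregister(Table, Index, Pid) ->
    true = ets:match_delete(Table, {idxrec, Index, Pid, '_'}),
    gen_fsm:send_event(Pid, unregistered),
    ok.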
12 changes: 6 additions & 6 deletions test/new_cluster_membership_model_eqc.erl
@@ -134,9 +134,9 @@ handoff_all(State) ->
State3.

do_maybe(State, Cmd, Args) ->
case precondition(State, {call,join_eqc,Cmd,Args}) of
case precondition(State, {call,?MODULE,Cmd,Args}) of
true ->
run(State, {call,join_eqc,Cmd,Args});
run(State, {call,?MODULE,Cmd,Args});
false ->
State
end.
@@ -155,9 +155,9 @@ test_ring_convergence(State) ->
end.

do_gossip(State, N2, N1) ->
case precondition(State, {call,join_eqc,random_gossip,[N2,N1]}) of
case precondition(State, {call,?MODULE,random_gossip,[N2,N1]}) of
true ->
{true, run(State, {call,join_eqc,random_gossip,[N2,N1]})};
{true, run(State, {call,?MODULE,random_gossip,[N2,N1]})};
false ->
{false, State}
end.
@@ -1355,7 +1355,7 @@ ring_ready(CState0) ->
Seen = CState?CHSTATE.seen,
%% TODO: Should we add joining here?
%%Members = get_members(CState?CHSTATE.members, [joining, valid, leaving, exiting]),
Members = get_members(CState?CHSTATE.members, [valid, leaving]),
Members = get_members(CState?CHSTATE.members, [valid, leaving, exiting]),
VClock = CState?CHSTATE.vclock,
R = [begin
case orddict:find(Node, Seen) of
@@ -1454,7 +1454,7 @@ maybe_remove_exiting(State, Node, CState) ->
Claimant = CState?CHSTATE.claimant,
case Claimant of
Node ->
Exiting = get_members(CState?CHSTATE.members, [exiting]),
Exiting = get_members(CState?CHSTATE.members, [exiting]) -- [Node],
%%io:format("Claimant ~p removing exiting ~p~n", [Node, Exiting]),
Changed = (Exiting /= []),
{State2, CState2} =
