Skip to content

Commit

Permalink
Support async response from handle_handoff_command.
Browse files Browse the repository at this point in the history
  • Loading branch information
kellymclaughlin committed Sep 6, 2011
1 parent d428c15 commit f64caaa
Showing 1 changed file with 37 additions and 31 deletions.
68 changes: 37 additions & 31 deletions src/riak_core_vnode.erl
Expand Up @@ -24,13 +24,13 @@
start_link/3,
send_command/2,
send_command_after/2]).
-export([init/1,
active/2,
active/3,
-export([init/1,
active/2,
active/3,
handle_event/3,
handle_sync_event/4,
handle_info/3,
terminate/3,
handle_sync_event/4,
handle_info/3,
terminate/3,
code_change/4]).
-export([reply/2]).
-export([get_mod_index/1,
Expand All @@ -55,7 +55,7 @@ behaviour_info(callbacks) ->
behaviour_info(_Other) ->
undefined.

%% handle_exit/3 is an optional behaviour callback that can be implemented.
%% handle_exit/3 is an optional behaviour callback that can be implemented.
%% It will be called in the case that a process that is linked to the vnode
%% process dies and allows the module using the behaviour to take appropriate
%% action. It is called by handle_info when it receives an {'EXIT', Pid, Reason}
Expand Down Expand Up @@ -98,18 +98,18 @@ start_link(Mod, Index) ->
start_link(Mod, Index, InitialInactivityTimeout) ->
gen_fsm:start_link(?MODULE, [Mod, Index, InitialInactivityTimeout], []).

%% Send a command message for the vnode module by Pid -
%% Send a command message for the vnode module by Pid -
%% typically to do some deferred processing after returning yourself
send_command(Pid, Request) ->
gen_fsm:send_event(Pid, ?VNODE_REQ{request=Request}).


%% Sends a command to the FSM that called it after Time
%% Sends a command to the FSM that called it after Time
%% has passed.
-spec send_command_after(integer(), term()) -> reference().
send_command_after(Time, Request) ->
gen_fsm:send_event_after(Time, ?VNODE_REQ{request=Request}).


init([Mod, Index, InitialInactivityTimeout]) ->
%%TODO: Should init args really be an array if it just gets Init?
Expand Down Expand Up @@ -248,17 +248,23 @@ vnode_coverage(Sender, Request, KeySpaces, State=#state{index=Index,
end.

vnode_handoff_command(Sender, Request, State=#state{index=Index,
mod=Mod,
modstate=ModState,
handoff_node=HN}) ->
mod=Mod,
modstate=ModState,
handoff_node=HN,
pool_pid=Pool}) ->
case Mod:handle_handoff_command(Request, Sender, ModState) of
{reply, Reply, NewModState} ->
reply(Sender, Reply),
continue(State, NewModState);
{noreply, NewModState} ->
continue(State, NewModState);
{async, Work, From, NewModState} ->
%% dispatch some work to the vnode worker pool
%% the result is sent back to 'From'
riak_core_vnode_worker_pool:handle_work(Pool, Work, From),
continue(State, NewModState);
{forward, NewModState} ->
riak_core_vnode_master:command({Index, HN}, Request, Sender,
riak_core_vnode_master:command({Index, HN}, Request, Sender,
riak_core_vnode_master:reg_name(Mod)),
continue(State, NewModState);
{drop, NewModState} ->
Expand All @@ -269,7 +275,7 @@ vnode_handoff_command(Sender, Request, State=#state{index=Index,

active(timeout, State) ->
maybe_handoff(State);
active(?COVERAGE_REQ{keyspaces=KeySpaces,
active(?COVERAGE_REQ{keyspaces=KeySpaces,
request=Request,
sender=Sender},
State=#state{handoff_node=HN}) when HN =:= none ->
Expand All @@ -279,20 +285,20 @@ active(?VNODE_REQ{sender=Sender, request=Request},
vnode_command(Sender, Request, State);
active(?VNODE_REQ{sender=Sender, request=Request},State) ->
vnode_handoff_command(Sender, Request, State);
active(handoff_complete, State=#state{mod=Mod,
active(handoff_complete, State=#state{mod=Mod,
modstate=ModState,
index=Idx,
index=Idx,
handoff_node=HN,
handoff_token=HT}) ->
riak_core_handoff_manager:release_handoff_lock({Mod, Idx}, HT),
Mod:handoff_finished(HN, ModState),
finish_handoff(State);
active({handoff_error, _Err, _Reason}, State=#state{mod=Mod,
active({handoff_error, _Err, _Reason}, State=#state{mod=Mod,
modstate=ModState,
index=Idx,
index=Idx,
handoff_token=HT}) ->
riak_core_handoff_manager:release_handoff_lock({Mod, Idx}, HT),
%% it would be nice to pass {Err, Reason} to the vnode but the
%% it would be nice to pass {Err, Reason} to the vnode but the
%% API doesn't currently allow for that.
Mod:handoff_cancelled(ModState),
continue(State#state{handoff_node=none});
Expand All @@ -306,9 +312,9 @@ active(_Event, _From, State) ->
Reply = ok,
{reply, Reply, active, State, State#state.inactivity_timeout}.

finish_handoff(State=#state{mod=Mod,
finish_handoff(State=#state{mod=Mod,
modstate=ModState,
index=Idx,
index=Idx,
handoff_node=HN}) ->
case riak_core_gossip:finish_handoff(Idx, node(), HN, Mod) of
forward ->
Expand Down Expand Up @@ -340,15 +346,15 @@ handle_event(R=?COVERAGE_REQ{}, _StateName, State) ->
handle_sync_event(get_mod_index, _From, StateName,
State=#state{index=Idx,mod=Mod}) ->
{reply, {Mod, Idx}, StateName, State, State#state.inactivity_timeout};
handle_sync_event({handoff_data,BinObj}, _From, StateName,
handle_sync_event({handoff_data,BinObj}, _From, StateName,
State=#state{mod=Mod, modstate=ModState}) ->
case Mod:handle_handoff_data(BinObj, ModState) of
{reply, ok, NewModState} ->
{reply, ok, StateName, State#state{modstate=NewModState},
State#state.inactivity_timeout};
{reply, {error, Err}, NewModState} ->
lager:error("~p failed to store handoff obj: ~p", [Mod, Err]),
{reply, {error, Err}, StateName, State#state{modstate=NewModState},
{reply, {error, Err}, StateName, State#state{modstate=NewModState},
State#state.inactivity_timeout}
end.

Expand All @@ -357,7 +363,7 @@ handle_info({'EXIT', Pid, _Reason}, _StateName, State=#state{handoff_pid=Pid}) -

handle_info({'EXIT', Pid, Reason}, StateName, State=#state{mod=Mod,modstate=ModState}) ->
%% A linked processes has died so use the
%% handle_exit callback to allow the vnode
%% handle_exit callback to allow the vnode
%% process to take appropriate action.
%% If the function is not implemented default
%% to crashing the process.
Expand Down Expand Up @@ -432,7 +438,7 @@ should_handoff(#state{index=Idx, mod=Mod}) ->
case app_for_vnode_module(Mod) of
undefined -> false;
{ok, App} ->
case lists:member(TargetNode,
case lists:member(TargetNode,
riak_core_node_watcher:nodes(App)) of
false -> false;
true -> {true, TargetNode}
Expand All @@ -445,28 +451,28 @@ start_handoff(State=#state{index=Idx, mod=Mod, modstate=ModState}, TargetNode) -
{true, NewModState} ->
finish_handoff(State#state{modstate=NewModState,
handoff_node=TargetNode});
{false, NewModState} ->
{false, NewModState} ->
case riak_core_handoff_manager:get_handoff_lock({Mod, Idx}) of
{error, max_concurrency} ->
{ok, NewModState1} = Mod:handoff_cancelled(NewModState),
NewState = State#state{modstate=NewModState1},
{next_state, active, NewState, ?LOCK_RETRY_TIMEOUT};
{ok, {handoff_token, HandoffToken}} ->
NewState = State#state{modstate=NewModState,
NewState = State#state{modstate=NewModState,
handoff_token=HandoffToken,
handoff_node=TargetNode},
{ok, HandoffPid} = riak_core_handoff_sender:start_link(TargetNode, Mod, Idx),
continue(NewState#state{handoff_pid=HandoffPid})
end
end.


%% @doc Send a reply to a vnode request. If

%% @doc Send a reply to a vnode request. If
%% the Ref is undefined just send the reply
%% for compatibility with pre-0.12 requestors.
%% If Ref is defined, send it along with the
%% reply.
%%
%%
-spec reply(sender(), term()) -> true.
reply({fsm, undefined, From}, Reply) ->
gen_fsm:send_event(From, Reply);
Expand Down

0 comments on commit f64caaa

Please sign in to comment.