Permalink
Browse files

bz://716 : graceful riak_core_handoff_sender failure on crash of receiving node
  • Loading branch information...
1 parent 8ff2769 commit 83dd57b78250d2d61369371325b649f9eb2fbc9e @argv0 argv0 committed with Vagabond Apr 5, 2011
Showing with 67 additions and 19 deletions.
  1. +50 −16 src/riak_core_handoff_sender.erl
  2. +17 −3 src/riak_core_vnode.erl
@@ -58,35 +58,69 @@ start_fold(TargetNode, Module, Partition, ParentPid) ->
M = <<?PT_MSG_INIT:8,Partition:160/integer>>,
ok = gen_tcp:send(Socket, M),
StartFoldTime = now(),
- {Socket,ParentPid,Module,_Ack,SentCount} =
+ {Socket,ParentPid,Module,_Ack,SentCount,ErrStatus} =
riak_core_vnode_master:sync_command({Partition, node()},
?FOLD_REQ{
foldfun=fun visit_item/3,
- acc0={Socket,ParentPid,Module,0,0}},
+ acc0={Socket,ParentPid,Module,0,0,ok}},
VMaster, infinity),
EndFoldTime = now(),
- error_logger:info_msg("Handoff of partition ~p ~p to ~p completed: sent ~p objects in ~.2f seconds",
- [Module, Partition, TargetNode, SentCount,
- timer:now_diff(EndFoldTime, StartFoldTime) / 1000000]),
- gen_fsm:send_event(ParentPid, handoff_complete)
- %% Socket will be closed when this process exits
+ case ErrStatus of
+ ok ->
+ error_logger:info_msg("Handoff of partition ~p ~p to ~p "
+ "completed: sent ~p objects in ~.2f "
+ "seconds\n",
+ [Module, Partition, TargetNode,
+ SentCount,
+ timer:now_diff(
+ EndFoldTime,
+ StartFoldTime) / 1000000]),
+ gen_fsm:send_event(ParentPid, handoff_complete);
+ {error, ErrReason} ->
+ error_logger:error_msg("Handoff of partition ~p ~p to ~p "
+ "FAILED: (~p) after sending ~p objects "
+ "in ~.2f seconds\n",
+ [Module, Partition, TargetNode,
+ ErrReason, SentCount,
+ timer:now_diff(
+ EndFoldTime,
+ StartFoldTime) / 1000000]),
+ gen_fsm:send_event(ParentPid, {handoff_error,
+ fold_error, ErrReason})
+ end
catch
Err:Reason ->
error_logger:error_msg("Handoff sender ~p ~p failed ~p:~p\n",
- [Module, Partition, Err,Reason])
+ [Module, Partition, Err,Reason]),
+ gen_fsm:send_event(ParentPid, {handoff_error, Err, Reason})
end.
-visit_item(K, V, {Socket, ParentPid, Module, ?ACK_COUNT, Total}) ->
+%% When a tcp error occurs, the ErrStatus argument is set to {error, Reason}.
+%% Since we can't abort the fold, this clause is just a no-op.
+visit_item(_K, _V, {Socket, ParentPid, Module, Ack, Total, {error, Reason}}) ->
+ {Socket, ParentPid, Module, Ack, Total, {error, Reason}};
+visit_item(K, V, {Socket, ParentPid, Module, ?ACK_COUNT, Total, _Err}) ->
M = <<?PT_MSG_OLDSYNC:8,"sync">>,
- ok = gen_tcp:send(Socket, M),
- {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} = gen_tcp:recv(Socket, 0),
- visit_item(K, V, {Socket, ParentPid, Module, 0, Total});
-visit_item(K, V, {Socket, ParentPid, Module, Ack, Total}) ->
+ case gen_tcp:send(Socket, M) of
+ ok ->
+ case gen_tcp:recv(Socket, 0) of
+ {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} ->
+ visit_item(K, V, {Socket, ParentPid, Module, 0, Total, ok});
+ {error, Reason} ->
+ {Socket, ParentPid, Module, 0, Total, {error, Reason}}
+ end;
+ {error, Reason} ->
+ {Socket, ParentPid, Module, 0, Total, {error, Reason}}
+ end;
+visit_item(K, V, {Socket, ParentPid, Module, Ack, Total, _ErrStatus}) ->
BinObj = Module:encode_handoff_item(K, V),
M = <<?PT_MSG_OBJ:8,BinObj/binary>>,
- ok = gen_tcp:send(Socket, M),
- {Socket, ParentPid, Module, Ack+1, Total+1}.
-
+ case gen_tcp:send(Socket, M) of
+ ok ->
+ {Socket, ParentPid, Module, Ack+1, Total+1, ok};
+ {error, Reason} ->
+ {Socket, ParentPid, Module, Ack, Total, {error, Reason}}
+ end.
get_handoff_port(Node) when is_atom(Node) ->
case catch(gen_server2:call({riak_core_handoff_listener, Node}, handoff_port, infinity)) of
@@ -77,6 +77,7 @@ behaviour_info(_Other) ->
modstate :: term(),
handoff_token :: non_neg_integer(),
handoff_node=none :: none | node(),
+ handoff_pid :: pid(),
inactivity_timeout}).
start_link(Mod, Index) ->
@@ -174,7 +175,18 @@ active(handoff_complete, State=#state{mod=Mod,
Mod:handoff_finished(HN, ModState),
{ok, NewModState} = Mod:delete(ModState),
riak_core_handoff_manager:add_exclusion(Mod, Idx),
- {stop, normal, State#state{modstate=NewModState, handoff_node=none}}.
+ {stop, normal, State#state{modstate=NewModState,
+ handoff_node=none,
+ handoff_pid=undefined}};
+active({handoff_error, _Err, _Reason}, State=#state{mod=Mod,
+ modstate=ModState,
+ index=Idx,
+ handoff_token=HT}) ->
+ riak_core_handoff_manager:release_handoff_lock({Mod, Idx}, HT),
+ %% it would be nice to pass {Err, Reason} to the vnode but the
+ %% API doesn't currently allow for that.
+ Mod:handoff_cancelled(ModState),
+ continue(State#state{handoff_node=none}).
active(_Event, _From, State) ->
Reply = ok,
@@ -198,6 +210,8 @@ handle_sync_event({handoff_data,BinObj}, _From, StateName,
State#state.inactivity_timeout}
end.
+handle_info({'EXIT', Pid, _Reason}, _StateName, State=#state{handoff_pid=Pid}) ->
+ continue(State#state{handoff_pid=undefined});
handle_info({'EXIT', Pid, Reason}, StateName, State=#state{mod=Mod}) ->
%% A linked process has died so use the
%% handle_exit callback to allow the vnode
@@ -254,8 +268,8 @@ start_handoff(State=#state{index=Idx, mod=Mod, modstate=ModState}, TargetNode) -
NewState = State#state{modstate=NewModState,
handoff_token=HandoffToken,
handoff_node=TargetNode},
- riak_core_handoff_sender:start_link(TargetNode, Mod, Idx),
- continue(NewState)
+ {ok, HandoffPid} = riak_core_handoff_sender:start_link(TargetNode, Mod, Idx),
+ continue(NewState#state{handoff_pid=HandoffPid})
end
end.

0 comments on commit 83dd57b

Please sign in to comment.