Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

Already on GitHub? Sign in to your account

Intermittent hang with handoff sender #153

Merged
merged 2 commits into from Mar 22, 2012
Jump to file or symbol
Failed to load files and symbols.
+51 −16
Split
@@ -27,6 +27,8 @@
-include("riak_core_vnode.hrl").
-include("riak_core_handoff.hrl").
-define(ACK_COUNT, 1000).
+%% Default handoff receive timeout; override with the riak_core
+%% application env setting handoff_timeout
+-define(TCP_TIMEOUT, 60000).
start_link(TargetNode, Module, Partition, VnodePid) ->
SslOpts = get_handoff_ssl_options(),
@@ -40,8 +42,6 @@ start_link(TargetNode, Module, Partition, VnodePid) ->
start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
try
- lager:info("Starting handoff of partition ~p ~p from ~p to ~p",
- [Module, Partition, node(), TargetNode]),
[_Name,Host] = string:tokens(atom_to_list(TargetNode), "@"),
{ok, Port} = get_handoff_port(TargetNode),
SockOpts = [binary, {packet, 4}, {header,1}, {active, false}],
@@ -65,18 +65,24 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
Msg = <<?PT_MSG_OLDSYNC:8,ModBin/binary>>,
ok = TcpMod:send(Socket, Msg),
+ RecvTimeout = get_handoff_receive_timeout(),
+
%% Now that handoff_concurrency applies to both outbound and
%% inbound conns there is a chance that the receiver may
%% decide to reject the sender's attempt to start a handoff.
%% In the future this will be part of the actual wire
%% protocol but for now the sender must assume that a closed
%% socket at this point is a rejection by the receiver to
%% enforce handoff_concurrency.
- case TcpMod:recv(Socket, 0) of
+ case TcpMod:recv(Socket, 0, RecvTimeout) of
{ok,[?PT_MSG_OLDSYNC|<<"sync">>]} -> ok;
+ {error, timeout} -> exit({shutdown, timeout});
{error, closed} -> exit({shutdown, max_concurrency})
end,
+ lager:info("Starting handoff of partition ~p ~p from ~p to ~p",
+ [Module, Partition, node(), TargetNode]),
+
M = <<?PT_MSG_INIT:8,Partition:160/integer>>,
ok = TcpMod:send(Socket, M),
StartFoldTime = now(),
@@ -117,32 +123,41 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
{MRef, {Socket,ParentPid,Module,TcpMod,_Ack,SentCount,ErrStatus}} ->
- %% One last sync to make sure the message has been received.
- %% post-0.14 vnodes switch to handoff to forwarding immediately
- %% so handoff_complete can only be sent once all of the data is
- %% written. handle_handoff_data is a sync call, so once
- %% we receive the sync the remote side will be up to date.
- lager:debug("~p ~p Sending final sync", [Partition, Module]),
- ok = TcpMod:send(Socket, <<?PT_MSG_SYNC:8>>),
- {ok,[?PT_MSG_SYNC|<<"sync">>]} = TcpMod:recv(Socket, 0),
- lager:debug("~p ~p Final sync received", [Partition, Module]),
-
- EndFoldTime = now(),
- FoldTimeDiff = timer:now_diff(EndFoldTime, StartFoldTime) / 1000000,
case ErrStatus of
ok ->
+ %% One last sync to make sure the message has been received.
+ %% post-0.14 vnodes switch to handoff to forwarding immediately
+ %% so handoff_complete can only be sent once all of the data is
+ %% written. handle_handoff_data is a sync call, so once
+ %% we receive the sync the remote side will be up to date.
+ lager:debug("~p ~p Sending final sync", [Partition, Module]),
+ ok = TcpMod:send(Socket, <<?PT_MSG_SYNC:8>>),
+
+ case TcpMod:recv(Socket, 0, RecvTimeout) of
+ {ok,[?PT_MSG_SYNC|<<"sync">>]} ->
+ lager:debug("~p ~p Final sync received", [Partition, Module]);
+ {error, timeout} -> exit({shutdown, timeout})
+ end,
+
+ FoldTimeDiff = end_fold_time(StartFoldTime),
+
lager:info("Handoff of partition ~p ~p from ~p to ~p "
"completed: sent ~p objects in ~.2f "
"seconds",
[Module, Partition, node(), TargetNode,
SentCount, FoldTimeDiff]),
gen_fsm:send_event(ParentPid, handoff_complete);
{error, ErrReason} ->
+ FoldTimeDiff = end_fold_time(StartFoldTime),
lager:error("Handoff of partition ~p ~p from ~p to ~p "
"FAILED after sending ~p objects "
"in ~.2f seconds: ~p",
[Module, Partition, node(), TargetNode,
SentCount, FoldTimeDiff, ErrReason]),
+ if ErrReason == timeout ->
+ riak_core_stat:update(handoff_timeouts);
+ true -> ok
+ end,
gen_fsm:send_event(ParentPid, {handoff_error,
fold_error, ErrReason})
end
@@ -153,6 +168,12 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
%% of handoff_concurrency. You don't want to log
%% anything because this is normal.
ok;
+ exit:{shutdown, timeout} ->
+ %% A receive timeout during handoff
+ riak_core_stat:update(handoff_timeouts),
+ lager:warning(
+ "TCP recv timeout in handoff of partition ~p ~p from ~p to ~p",
+ [Module, Partition, node(), TargetNode]);
Err:Reason ->
lager:error("Handoff of partition ~p ~p from ~p to ~p failed ~p:~p",
[Module, Partition, node(), TargetNode,
@@ -166,10 +187,11 @@ visit_item(_K, _V, {Socket, ParentPid, Module, TcpMod, Ack, Total,
{error, Reason}}) ->
{Socket, ParentPid, Module, TcpMod, Ack, Total, {error, Reason}};
visit_item(K, V, {Socket, ParentPid, Module, TcpMod, ?ACK_COUNT, Total, _Err}) ->
+ RecvTimeout = get_handoff_receive_timeout(),
M = <<?PT_MSG_OLDSYNC:8,"sync">>,
case TcpMod:send(Socket, M) of
ok ->
- case TcpMod:recv(Socket, 0) of
+ case TcpMod:recv(Socket, 0, RecvTimeout) of
{ok,[?PT_MSG_OLDSYNC|<<"sync">>]} ->
visit_item(K, V, {Socket, ParentPid, Module, TcpMod, 0, Total, ok});
{error, Reason} ->
@@ -223,3 +245,10 @@ get_handoff_ssl_options() ->
[]
end
end.
+
+get_handoff_receive_timeout() ->
+ app_helper:get_env(riak_core, handoff_timeout, ?TCP_TIMEOUT).
+
+end_fold_time(StartFoldTime) ->
+ EndFoldTime = now(),
+ timer:now_diff(EndFoldTime, StartFoldTime) / 1000000.
View
@@ -42,6 +42,7 @@
ignored_gossip_total :: integer(),
rings_reconciled_total :: integer(),
rejected_handoffs :: integer(),
+ handoff_timeouts :: integer(),
gossip_received :: spiraltime:spiral(),
rings_reconciled :: spiraltime:spiral(),
converge_epoch :: calendar:t_now(),
@@ -79,6 +80,7 @@ init([]) ->
{ok, #state{ignored_gossip_total=0,
rings_reconciled_total=0,
rejected_handoffs=0,
+ handoff_timeouts=0,
gossip_received=spiraltime:fresh(),
rings_reconciled=spiraltime:fresh(),
converge_delay=#cuml{},
@@ -137,6 +139,9 @@ update(rebalance_timer_end, _Moment, State=#state{rebalance_epoch=T0}) ->
update(rejected_handoffs, _Moment, State) ->
int_incr(#state.rejected_handoffs, State);
+update(handoff_timeouts, _Moment, State) ->
+ int_incr(#state.handoff_timeouts, State);
+
update(ignored_gossip, _Moment, State) ->
int_incr(#state.ignored_gossip_total, State);
@@ -195,6 +200,7 @@ gossip_stats(Moment, State=#state{converge_delay=CDelay,
{rings_reconciled_total, State#state.rings_reconciled_total},
{rings_reconciled, spiral_minute(Moment, #state.rings_reconciled, State)},
{gossip_received, spiral_minute(Moment, #state.gossip_received, State)},
+ {handoff_timeouts, State#state.handoff_timeouts},
{converge_delay_min, CDelay#cuml.min},
{converge_delay_max, CDelay#cuml.max},
{converge_delay_mean, CDelay#cuml.mean},