Skip to content
This repository

Intermittent hang with handoff sender #153

Merged
merged 2 commits into from about 2 years ago

3 participants

Jon Meredith Russell Brown Ryan Zezeski
Jon Meredith
Owner

On a 1.1.1 cluster we've seen multiple cases of handoff sender processes hanging when the TCP socket has gone away.

Polling riak_core_handoff_manager:status() on all the nodes in a cluster we saw this - with no TCP connections alive

[{'riak@1.2.3.195',[]},
 {'riak@1.2.3.197',[]},
 {'riak@1.2.3.199',[{{riak_kv_vnode,713623846352979940529142984724747568191373312000},
                               'riak@1.2.3.197',outbound,active,[]}]},
 {'riak@1.2.3.201',[{{riak_kv_vnode,359666418561901890026688064301272774368452149248},
                               'riak@1.2.3.197',outbound,active,[]}]},
 {'riak@1.2.3.203',[{{riak_kv_vnode,536645132457440915277915524513010171279912730624},
                               'riak@1.2.3.197',outbound,active,[]}]},
 {'riak@1.2.3.205',[]},
 {'riak@1.2.3.207',[]},
 {'riak@1.2.3.209',[]},
 {'riak@1.2.3.211',[{{riak_kv_vnode,108470824645652950960429733678161630365088743424},
                               'riak@1.2.4.214',outbound,active,[]}]},
 {'riak@1.2.3.213',[]},
 {'riak@1.2.3.215',[]},
 {'riak@1.2.3.217',[{{riak_kv_vnode,171269723124715185726994316333939416365929594880},
                               'riak@1.2.4.254',outbound,active,[]}]},
 {'riak@1.2.3.219',[]},
 {'riak@1.2.4.208',[{{riak_kv_vnode,804967698686161372916873286769515256919869095936},
                              'riak@1.2.4.214',outbound,active,[]}]},
 {'riak@1.2.4.210',[{{riak_kv_vnode,753586781748746817198774991869333432010090217472},
                              'riak@1.2.4.208',outbound,active,[]}]},
 {'riak@1.2.4.212',[{{riak_kv_vnode,576608067853207791947547531657596035098629636096},
                              'riak@1.2.4.208',outbound,active,[]}]},
 {'riak@1.2.4.214',[]},
 {'riak@1.2.4.216',[]},
 {'riak@1.2.4.218',[]},
 {'riak@1.2.4.220',[]},
 {'riak@1.2.4.222',[]},
 {'riak@1.2.4.224',[]},
 {'riak@1.2.4.226',[]},
 {'riak@1.2.4.228',[]},
 {'riak@1.2.4.230',[]},
 {'riak@1.2.4.232',[]},
 {'riak@1.2.4.234',[{{riak_kv_vnode,639406966332270026714112114313373821099470487552},
                              'riak@1.2.3.199',outbound,active,[]}]},
 {'riak@1.2.4.236',[]},
 {'riak@1.2.4.238',[]},
 {'riak@1.2.4.240',[]},
 {'riak@1.2.4.242',[]},
 {'riak@1.2.4.244',[]},
 {'riak@1.2.4.246',[]},
 {'riak@1.2.4.248',[]},
 {'riak@1.2.4.250',[]},
 {'riak@1.2.4.252',[]},
 {'riak@1.2.4.254',[]}]
(riaksearch@10.28.60.208)2> io:format("~s\n", [element(2, erlang:process_info(pid(0,15806,280), backtrace))]).  
Program counter: 0x00007f96f70c2bf8 (prim_inet:recv0/3 + 224)
CP: 0x0000000000000000 (invalid)
arity = 0

0x00007f912bd180a0 Return addr 0x00007f96e933cdb0 (riak_core_handoff_sender:start_fold/5 + 1936)
y(0)     4394
y(1)     #Port<0.142646101>

0x00007f912bd180b8 Return addr 0x0000000000870018 (<terminate process normally>)
y(0)     []
y(1)     []
y(2)     []
y(3)     riak_kv_vnode_master
y(4)     #Port<0.142646101>
y(5)     gen_tcp
y(6)     <0.31027.245>
y(7)     804967698686161372916873286769515256919869095936
y(8)     riak_kv_vnode
y(9)     'riaksearch@10.28.60.214'
y(10)    Catch 0x00007f96e933e2e0 (riak_core_handoff_sender:start_fold/5 + 7360)

Digging the TCP port information

(riaksearch@10.28.60.208)10> erlang:port_info(Port).
[{name,"tcp_inet"},
 {links,[<0.15806.280>]},
 {id,142646101},
 {connected,<0.15806.280>},
 {input,0},
 {output,14}]

Looks like it only output 14 bytes which would match with being stuck at the gen_tcp:recv here https://github.com/basho/riak_core/blob/1.1/src/riak_core_handoff_sender.erl#L75

11> size(<<1:8,(atom_to_binary(riak_kv_vnode, utf8))/binary>>).
14
Jon Meredith
Owner

If handoff seems stuck on a cluster, you can verify if the issue is present by running this from the Riak console (after riak attach, press ^D to disconnect)

f(Members).
Members = riak_core_ring:all_members(element(2, riak_core_ring_manager:get_raw_ring())).
[{N, rpc:call(N, riak_core_handoff_manager, status, [])} || N <- Members].

If the output has more outbound than inbound connections then some are probably stuck. While we work on a patch for this issue, you can unstick the handoffs by disabling and re-enabling handoffs across the cluster (this example shows 2 handoffs per-node).

f(Members).
Members = riak_core_ring:all_members(element(2, riak_core_ring_manager:get_raw_ring())).
rp(rpc:multicall(Members, riak_core_handoff_manager, set_concurrency, [0])). 
rp(rpc:multicall(Members, riak_core_handoff_manager, set_concurrency, [2])). 

Any stuck handoff should resume within about a minute and you should be able to verify with the status call

f(Members).
Members = riak_core_ring:all_members(element(2, riak_core_ring_manager:get_raw_ring())).
[{N, rpc:call(N, riak_core_handoff_manager, status, [])} || N <- Members].
Jon Meredith
Owner

See also customer issues zd://1091 and zd://1081

Russell Brown Add timeout to all handoff sender's receives
Don't bother sending the final 'sync' message if handoff failed
5247f79
Russell Brown
Collaborator

Note: also fixes #152

Verified by manually testing as follows:

Lowered the riak_core_handoff_timeout to 1000, added a 2000 timer:sleep() to handoff_receiver. First at the initial sync, then in the final sync.

Finally to test visit_item timeout I reduced the ?ACK count to 5 in handoff_sender and added a call_count to the receiver state, when the appropriate sync call was processed I increment that count, and added a case block that adds a timer:sleep(2000) after the first sync all.

Can probably provide code as diffs if it helps.

Ryan Zezeski
Collaborator

What about the send calls? Reading the gen_tcp docs send also waits for an indefinite amount of time by default.

Jon Meredith
Owner

The send calls only block if the TCP buffer is filled. Repl has a back pressure system that should prevent any permanent deadlocking, so I'm not too worried about it for this PR.

Jon Meredith
Owner

The issue this is really guarding against observed behavior where the socket has gone away at the kernel level but erlang doesn't notice for some reason, so we sit receiving forever. Each time we've seen this (on repl and now in handoff) it has only been stuck in a recv call.

Russell Brown
Collaborator

Repl?

That's a shame, cos I went ahead and made the changes…is up to you if we keep them or not.

Russell Brown Correct env var name
Fix typo in timeout var name
cd33460
Ryan Zezeski
Collaborator

+1 to merge

I've verified timeouts for initial start, finish and during handoff. All three worked. I saw no processes leaks. During testing I did find an issue but it is orthogonal to this change: #154.

Russell Brown russelldb merged commit 6617c2a into from March 22, 2012
Russell Brown russelldb closed this March 22, 2012
Jon Meredith
Owner

Also applied commits again 1.1 branch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 2 unique commits by 1 author.

Mar 16, 2012
Russell Brown Add timeout to all handoff sender's receives
Don't bother sending the final 'sync' message if handoff failed
5247f79
Mar 21, 2012
Russell Brown Correct env var name
Fix typo in timeout var name
cd33460
This page is out of date. Refresh to see the latest.
61  src/riak_core_handoff_sender.erl
@@ -27,6 +27,8 @@
27 27
 -include("riak_core_vnode.hrl").
28 28
 -include("riak_core_handoff.hrl").
29 29
 -define(ACK_COUNT, 1000).
  30
+%% can be set with env riak_core, handoff_timeout
  31
+-define(TCP_TIMEOUT, 60000).
30 32
 
31 33
 start_link(TargetNode, Module, Partition, VnodePid) ->
32 34
     SslOpts = get_handoff_ssl_options(),
@@ -40,8 +42,6 @@ start_link(TargetNode, Module, Partition, VnodePid) ->
40 42
 
41 43
 start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
42 44
      try
43  
-         lager:info("Starting handoff of partition ~p ~p from ~p to ~p",
44  
-                               [Module, Partition, node(), TargetNode]),
45 45
          [_Name,Host] = string:tokens(atom_to_list(TargetNode), "@"),
46 46
          {ok, Port} = get_handoff_port(TargetNode),
47 47
          SockOpts = [binary, {packet, 4}, {header,1}, {active, false}],
@@ -65,6 +65,8 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
65 65
          Msg = <<?PT_MSG_OLDSYNC:8,ModBin/binary>>,
66 66
          ok = TcpMod:send(Socket, Msg),
67 67
 
  68
+         RecvTimeout = get_handoff_receive_timeout(),
  69
+
68 70
          %% Now that handoff_concurrency applies to both outbound and
69 71
          %% inbound conns there is a chance that the receiver may
70 72
          %% decide to reject the senders attempt to start a handoff.
@@ -72,11 +74,15 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
72 74
          %% protocol but for now the sender must assume that a closed
73 75
          %% socket at this point is a rejection by the receiver to
74 76
          %% enforce handoff_concurrency.
75  
-         case TcpMod:recv(Socket, 0) of
  77
+         case TcpMod:recv(Socket, 0, RecvTimeout) of
76 78
              {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} -> ok;
  79
+             {error, timeout} -> exit({shutdown, timeout});
77 80
              {error, closed} -> exit({shutdown, max_concurrency})
78 81
          end,
79 82
 
  83
+         lager:info("Starting handoff of partition ~p ~p from ~p to ~p",
  84
+                    [Module, Partition, node(), TargetNode]),
  85
+
80 86
          M = <<?PT_MSG_INIT:8,Partition:160/integer>>,
81 87
          ok = TcpMod:send(Socket, M),
82 88
          StartFoldTime = now(),
@@ -117,20 +123,24 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
117 123
 
118 124
              {MRef, {Socket,ParentPid,Module,TcpMod,_Ack,SentCount,ErrStatus}} ->
119 125
 
120  
-                 %% One last sync to make sure the message has been received.
121  
-                 %% post-0.14 vnodes switch to handoff to forwarding immediately
122  
-                 %% so handoff_complete can only be sent once all of the data is
123  
-                 %% written.  handle_handoff_data is a sync call, so once
124  
-                 %% we receive the sync the remote side will be up to date.
125  
-                 lager:debug("~p ~p Sending final sync", [Partition, Module]),
126  
-                 ok = TcpMod:send(Socket, <<?PT_MSG_SYNC:8>>),
127  
-                 {ok,[?PT_MSG_SYNC|<<"sync">>]} = TcpMod:recv(Socket, 0),
128  
-                 lager:debug("~p ~p Final sync received", [Partition, Module]),
129  
-
130  
-                 EndFoldTime = now(),
131  
-                 FoldTimeDiff = timer:now_diff(EndFoldTime, StartFoldTime) / 1000000,
132 126
                  case ErrStatus of
133 127
                      ok ->
  128
+                         %% One last sync to make sure the message has been received.
  129
+                         %% post-0.14 vnodes switch to handoff to forwarding immediately
  130
+                         %% so handoff_complete can only be sent once all of the data is
  131
+                         %% written.  handle_handoff_data is a sync call, so once
  132
+                         %% we receive the sync the remote side will be up to date.
  133
+                         lager:debug("~p ~p Sending final sync", [Partition, Module]),
  134
+                         ok = TcpMod:send(Socket, <<?PT_MSG_SYNC:8>>),
  135
+
  136
+                         case TcpMod:recv(Socket, 0, RecvTimeout) of
  137
+                             {ok,[?PT_MSG_SYNC|<<"sync">>]} ->
  138
+                                 lager:debug("~p ~p Final sync received", [Partition, Module]);
  139
+                             {error, timeout} -> exit({shutdown, timeout})
  140
+                         end,
  141
+
  142
+                         FoldTimeDiff = end_fold_time(StartFoldTime),
  143
+
134 144
                          lager:info("Handoff of partition ~p ~p from ~p to ~p "
135 145
                                     "completed: sent ~p objects in ~.2f "
136 146
                                     "seconds",
@@ -138,11 +148,16 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
138 148
                                      SentCount, FoldTimeDiff]),
139 149
                          gen_fsm:send_event(ParentPid, handoff_complete);
140 150
                      {error, ErrReason} ->
  151
+                         FoldTimeDiff = end_fold_time(StartFoldTime),
141 152
                          lager:error("Handoff of partition ~p ~p from ~p to ~p "
142 153
                                      "FAILED after sending ~p objects "
143 154
                                      "in ~.2f seconds: ~p",
144 155
                                      [Module, Partition, node(), TargetNode,
145 156
                                       SentCount, FoldTimeDiff, ErrReason]),
  157
+                         if ErrReason == timeout ->
  158
+                                 riak_core_stat:update(handoff_timeouts);
  159
+                            true -> ok
  160
+                         end,
146 161
                          gen_fsm:send_event(ParentPid, {handoff_error,
147 162
                                                         fold_error, ErrReason})
148 163
                  end
@@ -153,6 +168,12 @@ start_fold(TargetNode, Module, Partition, ParentPid, SslOpts) ->
153 168
              %% of handoff_concurrency.  You don't want to log
154 169
              %% anything because this is normal.
155 170
              ok;
  171
+         exit:{shutdown, timeout} ->
  172
+             %% A receive timeout during handoff
  173
+             riak_core_stat:update(handoff_timeouts),
  174
+             lager:warning(
  175
+               "TCP recv timeout in handoff of partition ~p ~p from ~p to ~p",
  176
+               [Module, Partition, node(), TargetNode]);
156 177
          Err:Reason ->
157 178
              lager:error("Handoff of partition ~p ~p from ~p to ~p failed ~p:~p",
158 179
                          [Module, Partition, node(), TargetNode,
@@ -166,10 +187,11 @@ visit_item(_K, _V, {Socket, ParentPid, Module, TcpMod, Ack, Total,
166 187
                     {error, Reason}}) ->
167 188
     {Socket, ParentPid, Module, TcpMod, Ack, Total, {error, Reason}};
168 189
 visit_item(K, V, {Socket, ParentPid, Module, TcpMod, ?ACK_COUNT, Total, _Err}) ->
  190
+    RecvTimeout = get_handoff_receive_timeout(),
169 191
     M = <<?PT_MSG_OLDSYNC:8,"sync">>,
170 192
     case TcpMod:send(Socket, M) of
171 193
         ok ->
172  
-            case TcpMod:recv(Socket, 0) of
  194
+            case TcpMod:recv(Socket, 0, RecvTimeout) of
173 195
                 {ok,[?PT_MSG_OLDSYNC|<<"sync">>]} ->
174 196
                     visit_item(K, V, {Socket, ParentPid, Module, TcpMod, 0, Total, ok});
175 197
                 {error, Reason} ->
@@ -223,3 +245,10 @@ get_handoff_ssl_options() ->
223 245
                     []
224 246
             end
225 247
     end.
  248
+
  249
+get_handoff_receive_timeout() ->
  250
+    app_helper:get_env(riak_core, handoff_timeout, ?TCP_TIMEOUT).
  251
+
  252
+end_fold_time(StartFoldTime) ->
  253
+    EndFoldTime = now(),
  254
+    timer:now_diff(EndFoldTime, StartFoldTime) / 1000000.
6  src/riak_core_stat.erl
@@ -42,6 +42,7 @@
42 42
           ignored_gossip_total   :: integer(),
43 43
           rings_reconciled_total :: integer(),
44 44
           rejected_handoffs      :: integer(),
  45
+          handoff_timeouts       :: integer(),
45 46
           gossip_received        :: spiraltime:spiral(),
46 47
           rings_reconciled       :: spiraltime:spiral(),
47 48
           converge_epoch         :: calendar:t_now(),
@@ -79,6 +80,7 @@ init([]) ->
79 80
     {ok, #state{ignored_gossip_total=0,
80 81
                 rings_reconciled_total=0,
81 82
                 rejected_handoffs=0,
  83
+                handoff_timeouts=0,
82 84
                 gossip_received=spiraltime:fresh(),
83 85
                 rings_reconciled=spiraltime:fresh(),
84 86
                 converge_delay=#cuml{},
@@ -137,6 +139,9 @@ update(rebalance_timer_end, _Moment, State=#state{rebalance_epoch=T0}) ->
137 139
 update(rejected_handoffs, _Moment, State) ->
138 140
     int_incr(#state.rejected_handoffs, State);
139 141
 
  142
+update(handoff_timeouts, _Moment, State) ->
  143
+    int_incr(#state.handoff_timeouts, State);
  144
+
140 145
 update(ignored_gossip, _Moment, State) ->
141 146
     int_incr(#state.ignored_gossip_total, State);
142 147
 
@@ -195,6 +200,7 @@ gossip_stats(Moment, State=#state{converge_delay=CDelay,
195 200
      {rings_reconciled_total, State#state.rings_reconciled_total},
196 201
      {rings_reconciled, spiral_minute(Moment, #state.rings_reconciled, State)},
197 202
      {gossip_received, spiral_minute(Moment, #state.gossip_received, State)},
  203
+     {handoff_timeouts, State#state.handoff_timeouts},
198 204
      {converge_delay_min,  CDelay#cuml.min},
199 205
      {converge_delay_max,  CDelay#cuml.max},
200 206
      {converge_delay_mean, CDelay#cuml.mean},
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.