Skip to content

Commit

Permalink
Handle node down/node up correctly.
Browse files Browse the repository at this point in the history
  • Loading branch information
Dave Parfitt committed May 20, 2013
1 parent 2ed40cd commit 135d11a
Showing 1 changed file with 94 additions and 10 deletions.
104 changes: 94 additions & 10 deletions src/riak_core_tcp_mon.erl
Expand Up @@ -23,6 +23,10 @@

-module(riak_core_tcp_mon).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-export([start_link/0, start_link/1, monitor/3, status/0, status/1, format/0, format/2]).
-export([default_status_funs/0, raw/2, diff/2, rate/2, kbps/2,
socket_status/1, format_socket_stats/2 ]).
Expand Down Expand Up @@ -95,20 +99,27 @@ format(Status, Stat) ->
format_header(Stat) ->
io_lib:format("~40w Value\n", [Stat]).

format_entry({_Socket, Status}, Stat) ->
format_entry(Status, Stat) ->
Tag = proplists:get_value(tag, Status),
Value = proplists:get_value(Stat, Status),
case Value of
Value when is_list(Value) ->
[io_lib:format("~40s [", [Tag]),
format_list(Value),
"]\n"];
[format_tag(Tag),
" ",
format_list(Value),
"\n"];
_ ->
[io_lib:format("~40s", [Tag]),
[format_tag(Tag),
" [",
format_value(Value),
"\n"]
end.

format_tag(Tag) when is_list(Tag) ->
io_lib:format("~40s", [Tag]);
format_tag(Tag) ->
io_lib:format("~40w", [Tag]).

format_value(Val) when is_float(Val) ->
io_lib:format("~7.1f", [Val]);
format_value(Val) ->
Expand Down Expand Up @@ -161,7 +172,8 @@ init(Props) ->
clear_after = proplists:get_value(clear_after, Props, ?DEFAULT_LIMIT)},
DistCtrl = erlang:system_info(dist_ctrl),
State = lists:foldl(fun({Node,Port}, DatState) ->
add_dist_conn(Node, Port, DatState)
{noreply, add_dist_conn(Node, Port, DatState)}

end, State0, DistCtrl),
{ok, schedule_tick(State)}.

Expand Down Expand Up @@ -198,9 +210,21 @@ handle_info({nodeup, Node, _InfoList}, State) ->
{noreply, add_dist_conn(Port, Node, State)}
end;

handle_info({nodedown, Node, _InfoList}, State) ->
GbList = gb_trees:to_list(State#state.conns),
MaybePortConn = [{P, C} ||
{P, #conn{type = dist, tag = {node, MaybeNode}} = C} <- GbList,
MaybeNode =:= Node],
Conns2 = case MaybePortConn of
[{Port, Conn} | _] ->
erlang:send_after(State#state.clear_after, self(), {clear, Port}),
Conn2 = Conn#conn{type = error},
gb_trees:update(Port, Conn2, State#state.conns);
_ ->
State#state.conns
end,
{noreply, State#state{conns = Conns2}};

handle_info({nodedown, _Node, _InfoList}, _State) ->
{noreply, #state{}};
handle_info(measurement_tick, State = #state{limit = Limit, stats = Stats,
opts = Opts, conns = Conns}) ->
schedule_tick(State),
Expand All @@ -215,7 +239,7 @@ handle_info(measurement_tick, State = #state{limit = Limit, stats = Stats,
hist = Hist2}
catch
_E:_R ->
%io:format("Error ~p: ~p\n", [E, R]),
%io:format("Error ~p: ~p\n", [_E, _R]),
%% Any problems with getstat/getopts mark in error
erlang:send_after(State#state.clear_after,
self(),
Expand All @@ -239,7 +263,9 @@ code_change(_OldVsn, State, _Extra) ->

%% Add a distributed connection to the state
add_dist_conn(Node, Port, State) ->
add_conn(Port, #conn{tag = {node, Node}, type = dist}, State).
add_conn(Port, #conn{tag = {node, Node},
type = dist,
transport = ranch_tcp}, State).

%% Add connection to the state
add_conn(Socket, Conn, State = #state{conns = Conns}) ->
Expand Down Expand Up @@ -317,3 +343,61 @@ format_socket_stats([{K,V}|T], Buf) when
format_socket_stats([{K,V}|T], Buf) ->
format_socket_stats(T, [{K, V} | Buf]).

-ifdef(TEST).
updown() ->
%% Set the stat gathering interval to 100ms
{ok, TCPMonPid} = riak_core_tcp_mon:start_link([{interval, 100}]),
{ok, LS} = gen_tcp:listen(0, [{active, true}, binary]),
{ok, Port} = inet:port(LS),
Pid = self(),
spawn(
fun () ->
%% server
{ok, S} = gen_tcp:accept(LS),
riak_core_tcp_mon:monitor(S, "test", gen_tcp),
timer:sleep(1000),
receive
{tcp, S, _Data} ->
%% only receive one packet, let the others build
%% up
ok;
_ ->
?assert(fail)
after
1000 ->
?assert(fail)
end,
_Stat1 = riak_core_tcp_mon:status(),
MPid = whereis(riak_core_tcp_mon),
MPid ! {nodedown, 'foo', []},
Stat2 = riak_core_tcp_mon:status(),
MPid ! {nodeup, 'foo', []},
Stat3 = riak_core_tcp_mon:status(),
?assert(proplists:is_defined(socket,hd(Stat2))),
?assert(proplists:is_defined(socket,hd(Stat3))),
gen_tcp:close(S),
Pid ! finished
end),
%% client
{ok, Socket} = gen_tcp:connect("localhost",Port,
[binary, {active, true}]),
lists:foreach(
fun (_) ->
gen_tcp:send(Socket, "TEST")
end,
lists:seq(1,10000)),
receive
finished -> ok;
{'EXIT', _, normal} -> ok;
X -> io:format(user, "Unexpected message received ~p~n", [X]),
?assert(fail)
end,
gen_tcp:close(Socket),
unlink(TCPMonPid),
exit(TCPMonPid, kill),
ok.

nodeupdown_test_() ->
{timeout, 60, fun updown/0}.

-endif.

1 comment on commit 135d11a

@buddhisthead
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 looks like a clean application of the patch and I see the fixes for the unit test from our past commit. So, I'm happy.

Please sign in to comment.