Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP

Loading…

Handle node up/down in tcp_mon #322

Merged
metadave merged 1 commit into master from dip-tcpmon-nodedown

2 participants

@metadave

manual application of https://github.com/basho/riak_repl/pull/248.patch + additional testing fixes

@buddhisthead buddhisthead was assigned
@buddhisthead

+1 looks like a clean application of the patch and I see the fixes for the unit test from our past commit. So, I'm happy.

@metadave metadave merged commit f84c707 into master

1 check passed

Details default The Travis CI build passed
@seancribbs seancribbs deleted the dip-tcpmon-nodedown branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 20, 2013
  1. Handle node down/node up correctly.

    Dave Parfitt authored
This page is out of date. Refresh to see the latest.
Showing with 94 additions and 10 deletions.
  1. +94 −10 src/riak_core_tcp_mon.erl
View
104 src/riak_core_tcp_mon.erl
@@ -23,6 +23,10 @@
-module(riak_core_tcp_mon).
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
-export([start_link/0, start_link/1, monitor/3, status/0, status/1, format/0, format/2]).
-export([default_status_funs/0, raw/2, diff/2, rate/2, kbps/2,
socket_status/1, format_socket_stats/2 ]).
@@ -95,20 +99,27 @@ format(Status, Stat) ->
format_header(Stat) -> %% Header line (iolist): Stat printed with ~w in a 40-column field, then the literal column title Value
io_lib:format("~40w Value\n", [Stat]).
-format_entry({_Socket, Status}, Stat) ->
+format_entry(Status, Stat) -> %% Render one status proplist as an iolist line: formatted tag, then the value stored under Stat; now takes the proplist directly instead of a {Socket, Status} pair
Tag = proplists:get_value(tag, Status),
Value = proplists:get_value(Stat, Status), %% undefined when Stat is not a key of Status
case Value of
Value when is_list(Value) -> %% list-valued stat: delegated to format_list/1
- [io_lib:format("~40s [", [Tag]),
- format_list(Value),
- "]\n"];
+ [format_tag(Tag),
+ " ", %% NOTE(review): the surrounding brackets of the old version are gone here; presumably format_list/1 emits its own -- confirm
+ format_list(Value),
+ "\n"];
_ -> %% any non-list value: delegated to format_value/1
- [io_lib:format("~40s", [Tag]),
+ [format_tag(Tag),
+ " [", %% NOTE(review): this opening bracket is never closed before the newline in this branch -- confirm it is intended
format_value(Value),
"\n"]
end.
+format_tag(Tag) when is_list(Tag) -> %% List (string) tags: printed with ~s, right-justified in a 40-column field
+ io_lib:format("~40s", [Tag]);
+format_tag(Tag) -> %% Any other term (e.g. the {node, Node} tag set by add_dist_conn/3): printed with ~w in the same 40-column field
+ io_lib:format("~40w", [Tag]).
+
format_value(Val) when is_float(Val) ->
io_lib:format("~7.1f", [Val]);
format_value(Val) ->
@@ -161,7 +172,8 @@ init(Props) ->
clear_after = proplists:get_value(clear_after, Props, ?DEFAULT_LIMIT)},
DistCtrl = erlang:system_info(dist_ctrl),
State = lists:foldl(fun({Node,Port}, DatState) ->
- add_dist_conn(Node, Port, DatState)
+ {noreply, add_dist_conn(Node, Port, DatState)}
+
end, State0, DistCtrl),
{ok, schedule_tick(State)}.
@@ -198,9 +210,21 @@ handle_info({nodeup, Node, _InfoList}, State) ->
{noreply, add_dist_conn(Port, Node, State)}
end;
+handle_info({nodedown, Node, _InfoList}, State) -> %% Mark the tracked dist connection for Node as errored; unlike the removed clause below, the rest of the state is kept
+ GbList = gb_trees:to_list(State#state.conns), %% conns as a {Key, Conn} list
+ MaybePortConn = [{P, C} || %% keep only dist connections whose tag is {node, Node}
+ {P, #conn{type = dist, tag = {node, MaybeNode}} = C} <- GbList,
+ MaybeNode =:= Node],
+ Conns2 = case MaybePortConn of
+ [{Port, Conn} | _] -> %% only the first matching entry is handled
+ erlang:send_after(State#state.clear_after, self(), {clear, Port}), %% deliver {clear, Port} to this process after clear_after milliseconds
+ Conn2 = Conn#conn{type = error},
+ gb_trees:update(Port, Conn2, State#state.conns);
+ _ -> %% no tracked dist connection for Node: conns left unchanged
+ State#state.conns
+ end,
+ {noreply, State#state{conns = Conns2}};
-handle_info({nodedown, _Node, _InfoList}, _State) ->
- {noreply, #state{}};
handle_info(measurement_tick, State = #state{limit = Limit, stats = Stats,
opts = Opts, conns = Conns}) ->
schedule_tick(State),
@@ -215,7 +239,7 @@ handle_info(measurement_tick, State = #state{limit = Limit, stats = Stats,
hist = Hist2}
catch
_E:_R ->
- %io:format("Error ~p: ~p\n", [E, R]),
+ %io:format("Error ~p: ~p\n", [_E, _R]),
%% Any problems with getstat/getopts mark in error
erlang:send_after(State#state.clear_after,
self(),
@@ -239,7 +263,9 @@ code_change(_OldVsn, State, _Extra) ->
%% Add a distributed connection to the state
%% Arguments are (Node, Port, State): Port is passed to add_conn/3 as its first
%% (Socket) argument and Node is recorded in the conn tag as {node, Node}.
%% NOTE(review): the nodeup clause of handle_info shown earlier in this diff calls
%% add_dist_conn(Port, Node, State) -- the first two arguments look swapped
%% relative to this head; confirm against the full file.
add_dist_conn(Node, Port, State) ->
- add_conn(Port, #conn{tag = {node, Node}, type = dist}, State).
+ add_conn(Port, #conn{tag = {node, Node},
+ type = dist,
+ transport = ranch_tcp}, State). %% NOTE(review): presumably the module used to query this connection; the test below passes gen_tcp to monitor/3 instead -- confirm
%% Add connection to the state
add_conn(Socket, Conn, State = #state{conns = Conns}) ->
@@ -317,3 +343,61 @@ format_socket_stats([{K,V}|T], Buf) when
format_socket_stats([{K,V}|T], Buf) ->
format_socket_stats(T, [{K, V} | Buf]).
+-ifdef(TEST).
+updown() -> %% Delivers nodedown then nodeup for 'foo' to a running monitor and checks that status/0 still reports a socket entry after each
+ %% Set the stat gathering interval to 100ms
+ {ok, TCPMonPid} = riak_core_tcp_mon:start_link([{interval, 100}]),
+ {ok, LS} = gen_tcp:listen(0, [{active, true}, binary]), %% port 0: the OS picks a free port, read back just below
+ {ok, Port} = inet:port(LS),
+ Pid = self(),
+ spawn( %% NOTE(review): unlinked and unmonitored; if the server fun crashes, finished is never sent and the receive at the end has no after clause -- confirm the test then only ends via the eunit timeout
+ fun () ->
+ %% server: accept the one client connection and register it with the monitor under the tag "test"
+ {ok, S} = gen_tcp:accept(LS),
+ riak_core_tcp_mon:monitor(S, "test", gen_tcp),
+ timer:sleep(1000), %% presumably long enough for several 100ms measurement ticks -- confirm
+ receive
+ {tcp, S, _Data} ->
+ %% only receive one packet, let the others build
+ %% up
+ ok;
+ _ ->
+ ?assert(fail) %% the atom fail is not true, so this assertion always fails
+ after
+ 1000 ->
+ ?assert(fail)
+ end,
+ _Stat1 = riak_core_tcp_mon:status(),
+ MPid = whereis(riak_core_tcp_mon),
+ MPid ! {nodedown, 'foo', []}, %% hand-delivered message matching the handle_info({nodedown, Node, _InfoList}, ...) clause
+ Stat2 = riak_core_tcp_mon:status(), %% status taken after the nodedown message was sent
+ MPid ! {nodeup, 'foo', []},
+ Stat3 = riak_core_tcp_mon:status(), %% status taken after the nodeup message was sent
+ ?assert(proplists:is_defined(socket,hd(Stat2))), %% the first status entry must still carry a socket key
+ ?assert(proplists:is_defined(socket,hd(Stat3))),
+ gen_tcp:close(S),
+ Pid ! finished
+ end),
+ %% client: connect to the listener and send 10000 four-byte packets
+ {ok, Socket} = gen_tcp:connect("localhost",Port,
+ [binary, {active, true}]),
+ lists:foreach(
+ fun (_) ->
+ gen_tcp:send(Socket, "TEST")
+ end,
+ lists:seq(1,10000)),
+ receive %% wait for the server fun to report completion; any other message fails the test
+ finished -> ok;
+ {'EXIT', _, normal} -> ok;
+ X -> io:format(user, "Unexpected message received ~p~n", [X]),
+ ?assert(fail)
+ end,
+ gen_tcp:close(Socket),
+ unlink(TCPMonPid), %% start_link linked the monitor to this process; unlink so the kill below does not propagate back
+ exit(TCPMonPid, kill),
+ ok.
+
+nodeupdown_test_() -> %% EUnit test generator: runs updown/0 under a 60-second timeout
+ {timeout, 60, fun updown/0}.
+
+-endif.
Something went wrong with that request. Please try again.