Permalink
Browse files

Merge pull request #821 from basho/basho/bugfix/riak-2383-alt

Fix 2 for issue RIAK-2383

Reviewed-by: JeetKunDoug
  • Loading branch information...
borshop committed Mar 4, 2016
2 parents 7787ee3 + 3646166 commit 24012f4bbceb26ed10bd31cf8864dab82c89e47a
Showing with 87 additions and 59 deletions.
  1. +87 −59 src/riak_core_tcp_mon.erl
@@ -2,7 +2,7 @@
%%
%% TCP Connection Monitor
%%
%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2013-2016 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
@@ -55,10 +55,8 @@
%% Server state for the TCP connection monitor.
%% Each field is declared exactly once; a record with a repeated field name
%% does not compile.
-record(state, {conns = gb_trees:empty(),      % conn records keyed by Socket
                tags = gb_trees:empty(),       % tags to ports
                interval = ?DEFAULT_INTERVAL,  % how often to get stats
                limit = ?DEFAULT_LIMIT,        % how many entries to keep per stat
                clear_after = ?DEFAULT_CLEAR,  % how long to leave errored sockets in status
                stats = ?INET_STATS,           % Stats to read
                opts = ?INET_OPTS,             % Opts to read
                status_funs = dict:from_list(default_status_funs()) % Status reporting functions
               }).
@@ -139,7 +137,7 @@ raw(_TS, Hist) ->
%% Entry point for the diff computation: reverses both the timestamp list
%% and the history list, then hands them to diff/3 with an empty accumulator.
diff(TS, Hist) ->
    RevTS = lists:reverse(TS),
    RevHist = lists:reverse(Hist),
    diff(RevTS, RevHist, []).
diff([_TS], [_C], Acc) ->
Acc;
@@ -167,15 +165,24 @@ rate([TS1 | TSRest], [C1 | CRest], Acc) ->
%% Initialise the monitor.
%%
%% Props is a proplist that may carry `interval', `limit' and `clear_after'.
%% Missing keys fall back to ?DEFAULT_INTERVAL, ?DEFAULT_LIMIT and
%% ?DEFAULT_CLEAR respectively. Subscribes to node up/down events, registers
%% every distribution connection reported by
%% erlang:system_info(dist_ctrl), and schedules the first tick.
init(Props) ->
    lager:info("Starting TCP Monitor"),
    ok = net_kernel:monitor_nodes(true, [{node_type, visible}, nodedown_reason]),
    %% Blow up here if any state field would not be sane, so subsequent code
    %% can assume they are positive integers. State0 is bound exactly once:
    %% a second, unvalidated binding with a different clear_after default
    %% would turn into a badmatch.
    State0 = #state{
        interval = pos_int_value(
            proplists:get_value(interval, Props, ?DEFAULT_INTERVAL)),
        limit = pos_int_value(
            proplists:get_value(limit, Props, ?DEFAULT_LIMIT)),
        clear_after = pos_int_value(
            proplists:get_value(clear_after, Props, ?DEFAULT_CLEAR))},
    DistCtrl = erlang:system_info(dist_ctrl),
    State = lists:foldl(fun({Node, Port}, DatState) ->
                                add_dist_conn(Node, Port, DatState)
                        end, State0, DistCtrl),
    {ok, schedule_tick(State)}.
%% Assert that a configuration value is a strictly positive integer and
%% return it unchanged. Any other term has no matching clause, so the
%% caller crashes with function_clause.
pos_int_value(Val) when is_integer(Val), Val > 0 ->
    Val.
handle_call(status, _From, State = #state{conns = Conns,
status_funs = StatusFuns}) ->
Out = [ [{socket,P} | conn_status(P, Conn, StatusFuns)]
@@ -224,52 +231,65 @@ handle_info({nodedown, Node, _InfoList}, State) ->
end,
{noreply, State#state{conns = Conns2}};
handle_info(measurement_tick, State0 = #state{limit = Limit, stats = Stats,
opts = Opts, conns = Conns}) ->
handle_info(measurement_tick, State0) ->
State = schedule_tick(State0),
Fun = fun(Socket, Conn = #conn{type = Type, ts_hist = TSHist, hist = Hist}) when Type /= error ->
try
{ok, StatVals} = inet:getstat(unwrap_socket(Socket), Stats),
TS = os:timestamp(), % read between the two split the difference
{ok, OptVals} = safe_getopts(Socket, Opts),
Hist2 = update_hist(OptVals, Limit,
update_hist(StatVals, Limit, Hist)),
Conn#conn{ts_hist = prepend_trunc(TS, TSHist, Limit),
hist = Hist2}
catch
_E:_R ->
%io:format("Error ~p: ~p\n", [_E, _R]),
%% Any problems with getstat/getopts mark in error
erlang:send_after(State#state.clear_after,
self(),
{clear, Socket}),
Conn#conn{type = error}
end;
(_Socket, Conn) ->
Conn
end,
{noreply, State#state{conns = gb_trees:map(Fun, Conns)}};
Limit = State#state.limit,
ClearAfter = State#state.clear_after,
UpdateConn = fun(Socket, Conn) ->
update_conn(Socket, Conn, Limit, ClearAfter)
end,
{noreply, State#state{conns = gb_trees:map(UpdateConn, State#state.conns)}};
handle_info({clear, Socket}, State = #state{conns = Conns}) ->
{noreply, State#state{conns = gb_trees:delete_any(Socket, Conns)}}.
unwrap_socket({sslsocket, Socket, _}) ->
unwrap_socket(Socket);
unwrap_socket({gen_tcp, Socket, _}) ->
Socket;
unwrap_socket({gen_tcp, Socket}) ->
Socket;
%% Take one measurement for a monitored connection.
%%
%% A connection already flagged `error' is returned untouched. Otherwise the
%% socket is unwrapped, its inet stats and options are read, and both the
%% per-stat histories and the timestamp history are extended, each capped at
%% Limit entries. If anything in that sequence raises, the connection is
%% flagged `error' and a {clear, Socket} message is scheduled to this process
%% after ClearAfter milliseconds.
update_conn(_Socket, #conn{type = error} = Conn, _Limit, _ClearAfter) ->
    Conn;
update_conn(Socket, #conn{ts_hist = Stamps, hist = Samples} = Conn,
            Limit, ClearAfter) ->
    try
        Port = unwrap_socket(Socket),
        {ok, Stats} = inet:getstat(Port, ?INET_STATS),
        %% The timestamp is taken between the two reads to split the
        %% difference between them.
        Now = os:timestamp(),
        {ok, Options} = inet:getopts(Port, ?INET_OPTS),
        WithStats = update_hist(Stats, Limit, Samples),
        WithOpts = update_hist(Options, Limit, WithStats),
        Conn#conn{ts_hist = prepend_trunc(Now, Stamps, Limit), hist = WithOpts}
    catch
        _Class:_Reason ->
            %% Any problem with getstat/getopts marks the connection in error.
            erlang:send_after(ClearAfter, self(), {clear, Socket}),
            Conn#conn{type = error}
    end.
%%
%% Reduce a socket term to the value handed to the inet functions.
%%
%% The #sslsocket record definition is not visible outside the ssl
%% application, so it is recognised structurally, much as
%% erlang:is_record/2 would: a tuple of at least two elements whose first
%% element is the atom `sslsocket'.
%% The second element of such a tuple must itself be a tuple of at least two
%% elements tagged `gen_tcp'; its second element is returned. gen_tcp is the
%% only transport supported, so any other shape raises badarg, which the
%% caller turns into a connection flagged in error.
%%
%% Any term that is not an sslsocket tuple is passed through as-is for the
%% consumer(s) to handle.
%%
unwrap_socket(Socket) when is_tuple(Socket),
                           tuple_size(Socket) > 1,
                           element(1, Socket) =:= sslsocket ->
    case element(2, Socket) of
        Transport when is_tuple(Transport),
                       tuple_size(Transport) > 1,
                       element(1, Transport) =:= gen_tcp ->
            element(2, Transport);
        _Unsupported ->
            erlang:error(badarg, [Socket])
    end;
unwrap_socket(Socket) ->
    Socket.
%% Read socket options through the API that matches the socket's shape:
%% a three-element tuple tagged `sslsocket' goes to ssl:getopts/2, anything
%% else goes to inet:getopts/2. The result of the chosen call is returned
%% unchanged.
safe_getopts(SslSocket = {sslsocket, _, _}, OptNames) ->
    ssl:getopts(SslSocket, OptNames);
safe_getopts(InetSocket, OptNames) ->
    inet:getopts(InetSocket, OptNames).
terminate(_Reason, _State) ->
lager:info("Shutting down TCP Monitor"),
%% TODO: Consider trying to do something graceful with poolboy?
@@ -288,22 +308,30 @@ add_dist_conn(Node, Port, State) ->
%% Store Conn under Socket in the state's connection tree, replacing any
%% entry already present for that socket, and return the updated state.
add_conn(Socket, Conn, #state{conns = Tree0} = State) ->
    Tree1 = gb_trees:enter(Socket, Conn, Tree0),
    State#state{conns = Tree1}.
-spec update_hist([{atom(), term()}], pos_integer(), list()) -> list().
%% Update the histogram with a list of name/value readings.
%%
%% Readings  - [{Stat, Val}] pairs from the latest sample.
%% Limit     - maximum number of entries kept per stat.
%% Histories - orddict mapping each stat to its list of values, newest first.
%%
%% Each reading is folded in through update_hist/2. Limit is threaded through
%% the accumulator, and the match on the fold result asserts that it comes
%% back unchanged.
update_hist(Readings, Limit, Histories) ->
    {Limit, NewHists} = lists:foldl(
                          fun update_hist/2, {Limit, Histories}, Readings),
    NewHists.
-spec update_hist({atom(), term()}, {pos_integer(), list()}) -> {pos_integer(), list()}.
%% Fold step for lists:foldl/3: record one name/value reading in a histogram.
%% If Stat already has a history, Val is prepended and the list is truncated
%% to Limit entries; otherwise a new history [Val] is started. Limit is
%% returned alongside the histories so it rides along in the accumulator.
update_hist({Stat, Val}, {Limit, Histories}) ->
    PrependNewest = fun(Existing) -> prepend_trunc(Val, Existing, Limit) end,
    {Limit, orddict:update(Stat, PrependNewest, [Val], Histories)}.
-spec prepend_trunc(term(), list(), pos_integer()) -> list().
%% Put Val at the head of List and keep at most Limit elements in total.
%% Only Limit - 1 elements of the existing list are retained, so the new
%% value always survives the truncation.
%% Limit will normally be > 2, so don't bother specializing for lower values.
prepend_trunc(Val, List, Limit) ->
    [Val | lists:sublist(List, Limit - 1)].
conn_status(Socket, #conn{tag = Tag, type = Type,
ts_hist = TsHist, hist = Histories,
@@ -338,9 +366,9 @@ format_socket_stats([], Buf) -> lists:reverse(Buf);
%format_socket_stats(T, [{tag, V} | Buf]);
format_socket_stats([{K,_V}|T], Buf) when
K == tag;
K == sndbuf;
K == sndbuf;
K == recbuf;
K == buffer;
K == buffer;
K == active;
K == type;
K == send_max;

0 comments on commit 24012f4

Please sign in to comment.