Skip to content

Commit

Permalink
Merge branch 'az1018-riak_sysmon-busy_dist_port'
Browse files Browse the repository at this point in the history
  • Loading branch information
slfritchie committed Dec 20, 2011
2 parents dda41e3 + 33c5913 commit e278c18
Showing 1 changed file with 75 additions and 13 deletions.
88 changes: 75 additions & 13 deletions src/riak_sysmon_filter.erl
Expand Up @@ -43,6 +43,8 @@
proc_limit = 10 :: integer(),
port_count = 0 :: integer(),
port_limit = 10 :: integer(),
port_list :: gb_tree(),
node_map :: list(),
tref :: timer:tref() | undefined,
bogus_msg_p = false :: boolean()
}).
Expand Down Expand Up @@ -120,14 +122,22 @@ call_custom_handler(Module, Call, Timeout) ->
init(MonitorProps) ->
GcMsLimit = get_gc_ms_limit(),
HeapWordLimit = get_heap_word_limit(),
BusyPortP = get_busy_port(),
BusyDistPortP = get_busy_dist_port(),
Opts = lists:flatten(
[[{long_gc, GcMsLimit} || lists:member(gc, MonitorProps)],
[{large_heap, HeapWordLimit} || lists:member(heap, MonitorProps)],
[busy_port || lists:member(busy_port, MonitorProps)],
[busy_dist_port || lists:member(busy_dist_port, MonitorProps)]]),
[[{long_gc, GcMsLimit} || lists:member(gc, MonitorProps)
andalso GcMsLimit > 0],
[{large_heap, HeapWordLimit} || lists:member(heap, MonitorProps)
andalso HeapWordLimit > 0],
[busy_port || lists:member(port, MonitorProps)
andalso BusyPortP],
[busy_dist_port || lists:member(dist_port, MonitorProps)
andalso BusyDistPortP]]),
_ = erlang:system_monitor(self(), Opts),
{ok, #state{proc_limit = get_proc_limit(),
port_limit = get_port_limit(),
port_list = gb_trees:empty(),
node_map = get_node_map(),
tref = start_interval_timer()
}}.

Expand Down Expand Up @@ -188,16 +198,27 @@ handle_info({monitor, _, ProcType, _} = Info,
ok
end,
{noreply, State#state{proc_count = NewProcs}};
handle_info({monitor, _, PortType, _} = Info,
#state{port_count = Ports, port_limit = PortLimit} = State)
handle_info({monitor, X, PortType, Port},
#state{port_count = Ports, port_limit = PortLimit,
port_list = PortList} = State)
when PortType == busy_port; PortType == busy_dist_port ->
NewPorts = Ports + 1,
if NewPorts =< PortLimit ->
gen_event:notify(riak_sysmon_handler, Info);
true ->
ok
end,
{noreply, State#state{port_count = NewPorts}};
case gb_trees:lookup(Port, PortList) of
{value, _} ->
{noreply, State#state{port_count = NewPorts}};
none ->
PortListLen = gb_trees:size(PortList),
if PortListLen < PortLimit ->
PortAndMore = annotate_dist_port(PortType, Port, State),
gen_event:notify(riak_sysmon_handler,
{monitor, X, PortType, PortAndMore});
true ->
ok
end,
NewPortList = gb_trees:enter(Port, Port, PortList),
{noreply, State#state{port_count = NewPorts,
port_list = NewPortList}}
end;
handle_info({monitor, _, _, _} = Info, #state{bogus_msg_p = false} = State) ->
error_logger:error_msg("Unknown monitor message: ~P\n", [Info, 20]),
{noreply, State#state{bogus_msg_p = true}};
Expand All @@ -223,7 +244,9 @@ handle_info(reset, #state{proc_count = Procs, proc_limit = ProcLimit,
[?MODULE, Res, 20])
end,
{noreply, State#state{proc_count = 0,
port_count = 0}};
port_count = 0,
port_list = gb_trees:empty(),
node_map = get_node_map()}};
handle_info(_Info, State) ->
{noreply, State}.

Expand Down Expand Up @@ -280,16 +303,55 @@ get_heap_word_limit() ->
%% 10 Mwords = 40MB on a 32-bit VM, 80MB on a 64-bit VM
nonzero_app_env(riak_sysmon, heap_word_limit, 10*1024*1024).

get_busy_port() ->
boolean_app_env(riak_sysmon, busy_port, true).

get_busy_dist_port() ->
boolean_app_env(riak_sysmon, busy_dist_port, true).

nonzero_app_env(App, Key, Default) ->
case application:get_env(App, Key) of
{ok, N} when N >= 0 -> N;
_ -> Default
end.

boolean_app_env(App, Key, Default) ->
case application:get_env(App, Key) of
{ok, B} when B == true; B == false -> B;
_ -> Default
end.

start_interval_timer() ->
{ok, TRef} = timer:send_interval(1000, reset),
TRef.

annotate_dist_port(busy_port, Port, _) ->
Port;
annotate_dist_port(busy_dist_port, Port, S) ->
try
%% Need 'try': may race with disconnecting TCP peer
{ok, Peer} = inet:peername(Port),
{Port, proplists:get_value(Peer, S#state.node_map, unknown)}
catch
_X:_Y ->
Port
end.

get_node_map() ->
{ok, NI} = net_kernel:nodes_info(),
lists:map(fun({Node, Props}) ->
%% Drat, #net_address is a private record in
%% kernel/src/net_address.hrl, but it's exposed via
%% net_kernel:nodes_info/0.
AddrRec = proplists:get_value(address, Props),
case element(2, AddrRec) of
X = {IP, Port} when is_tuple(IP), is_integer(Port) ->
{X, Node};
_X ->
{unknown, Node}
end
end, NI).

-ifdef(TEST).

stop_timer() ->
Expand Down

0 comments on commit e278c18

Please sign in to comment.