Permalink
Browse files

Code cleanup, EDoc docs, etc.

  • Loading branch information...
1 parent e09a2c5 commit 49dfb72925ce14d66eff72b4b911df8035c5c8bf @slfritchie slfritchie committed Feb 2, 2011
Showing with 129 additions and 19 deletions.
  1. +3 −0 Makefile
  2. +41 −0 doc/overview.edoc
  3. +5 −2 src/riak_sysmon_example_handler.erl
  4. +74 −15 src/riak_sysmon_filter.erl
  5. +6 −2 src/riak_sysmon_testhandler.erl
View
@@ -6,3 +6,6 @@ clean:
test: all
./rebar eunit
+
+doc: all
+ ./rebar doc
View
@@ -0,0 +1,41 @@
+@author Scott Lystig Fritchie <scott@basho.com>
+@copyright 2011 Basho Technologies, Inc.
+@title The `riak_sysmon' application
+
+@doc
+
+`riak_sysmon' is an Erlang/OTP application that manages the event
+messages that can be generated by the Erlang virtual machine's
+`system_monitor' BIF (Built-In Function). These messages can notify a
+central data-gathering process about the following events:
+
+<ul>
+<li> Processes that have their private heaps grow beyond a certain
+size. </li>
+<li> Processes whose private heap garbage collection ops take too long </li>
+<li> Ports that are busy, e.g., blocking file &amp; socket I/O </li>
+<li> Network distribution ports are busy, e.g., lots of communication
+ with a slow peer Erlang node. </li>
+</ul>
+
+The problem with `system_monitor' events is that there isn't a
+mechanism within the Erlang virtual machine that limits the rate at
+which the events are generated. A busy VM can easily create many
+hundreds of these messages per second. Some kind of rate-limiting
+filter is required to avoid further overloading a system that may
+already be overloaded.
+
+This app will use two processes for `system_monitor' message handling.
+
+<ol>
+<li> A `gen_server' process to provide a rate-limiting filter. </li>
+<li> A `gen_event' server to allow flexible, user-defined functions to
+respond to `system_monitor' events that pass through the first stage
+filter. </li>
+</ol>
+
+The Erlang/OTP documentation clearly states that only one
+process can receive `system_monitor' messages. With the
+`riak_sysmon' OTP app, however, multiple interested parties can each
+receive `system_monitor' events by adding their own event handler to
+the `riak_sysmon_handler' event manager.
@@ -14,6 +14,9 @@
%% specific language governing permissions and limitations
%% under the License.
+%% @doc A simple example for adding a custom event handler (this module)
+%% to the `riak_sysmon' application's `system_monitor' event manager.
+
-module(riak_sysmon_example_handler).
-behaviour(gen_event).
@@ -34,10 +37,10 @@
%%%===================================================================
add_handler() ->
- gen_event:add_handler(riak_sysmon_handler, ?MODULE, []).
+ riak_sysmon_filter:add_custom_handler(?MODULE, []).
get_call_count() ->
- gen_event:call(riak_sysmon_handler, ?MODULE, get_call_count, infinity).
+ riak_sysmon_filter:call_custom_handler(?MODULE, get_call_count, infinity).
%%%===================================================================
%%% gen_event callbacks
View
@@ -14,6 +14,11 @@
%% specific language governing permissions and limitations
%% under the License.
+%% @doc Filtering/rate-limiting mechanism for the Erlang virtual machine's
+%% `system_monitor' events.
+%%
+%% See the `README.md' file at the top of the source repository for details.
+
-module(riak_sysmon_filter).
-behaviour(gen_server).
@@ -23,7 +28,8 @@
-endif. % TEST
%% API
--export([start_link/0]).
+-export([start_link/0, start_link/1]).
+-export([add_custom_handler/2, call_custom_handler/2, call_custom_handler/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -53,7 +59,48 @@
%% @end
%%--------------------------------------------------------------------
start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+ %% Enable every monitor type by default. The atoms must be exactly the
+ %% ones that init/1 tests with lists:member/2: `gc', `heap', `busy_port'
+ %% and `busy_dist_port'. Any other atom is ignored, which would leave
+ %% that event type disabled.
+ start_link([gc, heap, busy_port, busy_dist_port]).
+
+%% @doc Start riak_sysmon filter process
+%%
+%% The `MonitorProps' arg is a property list that may contain zero
+%% or more of the following atoms:
+%% <ul>
+%% <li> <b>gc</b> Enable long garbage collection events.
+%% The minimum time (in milliseconds) is defined by the application
+%% `riak_sysmon' environment variable `gc_ms_limit'.
+%% </li>
+%% <li> <b>heap</b> Enable process large heap events.
+%% The minimum size (in machine words) is defined by the application
+%% `riak_sysmon' environment variable `process_heap_limit'.
+%% </li>
+%% <li> <b>busy_port</b> Enable `busy_port' events. </li>
+%% <li> <b>busy_dist_port</b> Enable `busy_dist_port' events. </li>
+%% </ul>
+
+start_link(MonitorProps) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, MonitorProps, []).
+
+%% @doc Add a custom handler module to the `riak_sysmon_handler' event manager.
+%%
+%% See the source code of the
+%% {@link riak_sysmon_example_handler:add_handler/0} function for
+%% a usage example.
+
+add_custom_handler(Module, Args) ->
+ gen_event:add_handler(riak_sysmon_handler, Module, Args).
+
+%% @doc Make a synchronous call to a `riak_sysmon' specific custom
+%% event handler, using a timeout of `infinity'.
+%%
+%% Same as calling `call_custom_handler(Module, Call, infinity)'.
+
+call_custom_handler(Module, Call) ->
+ call_custom_handler(Module, Call, infinity).
+
+%% @doc Make a synchronous call to a `riak_sysmon' specific custom
+%% event handler.
+%%
+%% See the source code of the
+%% {@link riak_sysmon_example_handler:get_call_count/0} function for
+%% a usage example.
+
+call_custom_handler(Module, Call, Timeout) ->
+ gen_event:call(riak_sysmon_handler, Module, Call, Timeout).
%%%===================================================================
%%% gen_server callbacks
@@ -70,13 +117,15 @@ start_link() ->
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
-init([]) ->
+init(MonitorProps) ->
GcMsLimit = get_gc_ms_limit(),
HeapWordLimit = get_heap_word_limit(),
- erlang:system_monitor({self(), [{long_gc, GcMsLimit},
- {large_heap, HeapWordLimit},
- busy_port,
- busy_dist_port]}),
+ Opts = lists:flatten(
+ [[{long_gc, GcMsLimit} || lists:member(gc, MonitorProps)],
+ [{large_heap, HeapWordLimit} || lists:member(heap, MonitorProps)],
+ [busy_port || lists:member(busy_port, MonitorProps)],
+ [busy_dist_port || lists:member(busy_dist_port, MonitorProps)]]),
+ erlang:system_monitor(self(), Opts),
{ok, #state{proc_limit = get_proc_limit(),
port_limit = get_port_limit(),
tref = start_interval_timer()
@@ -142,12 +191,13 @@ handle_info({monitor, _, ProcType, _} = Info,
handle_info({monitor, _, PortType, _} = Info,
#state{port_count = Ports, port_limit = PortLimit} = State)
when PortType == busy_port; PortType == busy_dist_port ->
- if Ports + 1 =< PortLimit ->
+ NewPorts = Ports + 1,
+ if NewPorts =< PortLimit ->
gen_event:notify(riak_sysmon_handler, Info);
true ->
ok
end,
- {noreply, State#state{port_count = Ports + 1}};
+ {noreply, State#state{port_count = NewPorts}};
handle_info({monitor, _, _, _} = Info, #state{bogus_msg_p = false} = State) ->
error_logger:error_msg("Unknown monitor message: ~P\n", [Info, 20]),
{noreply, State#state{bogus_msg_p = true}};
@@ -169,7 +219,7 @@ handle_info(reset, #state{proc_count = Procs, proc_limit = ProcLimit,
{Pid, _} when Pid == self() ->
ok;
Res ->
- error_logger:error_msg("~s: current system monitor info is: ~P\n",
+ error_logger:error_msg("~s: current system monitor is: ~P\n",
[?MODULE, Res, 20])
end,
{noreply, State#state{proc_count = 0,
@@ -219,7 +269,9 @@ get_port_limit() ->
nonzero_app_env(riak_sysmon, port_limit, 30).
%% The default limits below here are more of educated guesses than
-%% based on hard experience.
+%% based on hard experience. Practical upper limits can vary quite a
+%% bit by application & workload and are usually found by
+%% experimentation.
get_gc_ms_limit() ->
nonzero_app_env(riak_sysmon, gc_ms_limit, 50).
@@ -247,9 +299,14 @@ start_timer() ->
gen_server:call(?MODULE, start_timer).
limit_test() ->
+ %% Constants ... limits should be at least one or test case will break
+
ProcLimit = 10,
PortLimit = 9,
EventHandler = riak_sysmon_handler,
+ TestHandler = riak_sysmon_testhandler,
+
+ %% Setup part 1: filter server
catch exit(whereis(?MODULE), kill),
timer:sleep(10),
@@ -261,18 +318,20 @@ limit_test() ->
{ok, _FilterPid} = ?MODULE:start_link(),
?MODULE:stop_timer(),
+ %% Setup part 2: gen_event server
+
catch exit(whereis(EventHandler), kill),
timer:sleep(10),
{ok, _HandlerPid} = gen_event:start_link({local, EventHandler}),
- ok = riak_sysmon_testhandler:add_handler(EventHandler),
+ ok = TestHandler:add_handler(EventHandler),
%% Check that all legit message types are passed through.
ProcTypes = [long_gc, large_heap, busy_port, busy_dist_port],
[?MODULE ! {monitor, yay_pid, ProcType, whatever} || ProcType <- ProcTypes],
?MODULE ! reset,
timer:sleep(100),
- Events1 = riak_sysmon_testhandler:get_events(EventHandler),
+ Events1 = TestHandler:get_events(EventHandler),
[true = lists:keymember(ProcType, 3, Events1) || ProcType <- ProcTypes],
%% Check that limits are enforced.
@@ -283,9 +342,9 @@ limit_test() ->
X <- lists:seq(1, (ProcLimit + PortLimit) * 5)],
?MODULE ! reset,
timer:sleep(100),
- Events2 = riak_sysmon_testhandler:get_events(EventHandler),
+ Events2 = TestHandler:get_events(EventHandler),
timer:sleep(50),
- [] = riak_sysmon_testhandler:get_events(EventHandler), % Got 'em all
+ [] = TestHandler:get_events(EventHandler), % Got 'em all
%% Sanity checks
ProcLimit = length([X || {monitor, _, long_gc, _} = X <- Events2]),
PortLimit = length([X || {monitor, _, busy_port, _} = X <- Events2]),
@@ -14,6 +14,8 @@
%% specific language governing permissions and limitations
%% under the License.
+%% @doc Support for EUnit testing: simple event-collecting event handler.
+
-module(riak_sysmon_testhandler).
-ifdef(TEST).
@@ -27,7 +29,9 @@
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).
--record(state, {list = []}).
+-record(state, {
+ list = [] :: list()
+ }).
%%%===================================================================
%%% gen_event callbacks
@@ -47,7 +51,7 @@ start_link() ->
%% @doc
%% Adds an event handler
%%
-%% @spec add_handler() -> ok | {'EXIT', Reason} | term()
+%% @spec add_handler(gen_event:emgr_name()) -> ok | {'EXIT', Reason} | term()
%% @end
%%--------------------------------------------------------------------
add_handler(EventServer) ->

0 comments on commit 49dfb72

Please sign in to comment.