Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial commit

  • Loading branch information...
commit 733f78f37568c13fbc62de5e8ae358362793fbd3 0 parents
@RJ RJ authored
5 .gitignore
@@ -0,0 +1,5 @@
+ebin
+.*
+*.beam
+*~
+erl_crash.dump
23 LICENSE.txt
@@ -0,0 +1,23 @@
+ Copyright (c) 2011 Richard Jones
+
+ Permission is hereby granted, free of charge, to any person
+ obtaining a copy of this software and associated documentation
+ files (the "Software"), to deal in the Software without
+ restriction, including without limitation the rights to use,
+ copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the
+ Software is furnished to do so, subject to the following
+ conditions:
+
+ The above copyright notice and this permission notice shall be
+ included in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ OTHER DEALINGS IN THE SOFTWARE.
+
69 README.txt
@@ -0,0 +1,69 @@
+estatsd is a simple stats aggregation service that periodically dumps data to
+Graphite: http://graphite.wikidot.com/
+
+NB: Graphite is good, despite the website being a bit ghetto.
+
+Inspired heavily by etsy statsd:
+http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/
+
+QUICK DEMO
+==========
+
+1) Install and configure graphite (quick-ish)
+2) Install rebar, have it in your path
+3) rebar compile
+4) erl -pa ebin
+5) > application:start(estatsd).
+ > estatsd:increment(foo, 123).
+6) Observe graphite now has 1 data point.
+
+USAGE
+=====
+
+Add this app to your rebar deps, and make sure it's started somehow
+eg: application:start(estatsd).
+
+You can configure custom graphite host/port and flush interval using
+application environment vars. See estatsd_sup for details.
+
+The following calls to estatsd are all gen_server:cast, ie non-blocking.
+
+Counters
+--------
+
+ estatsd:increment(num_foos). %% increment num_foos by one
+
+ estatsd:decrement(<<"num_bars">>, 3). %% decrement num_bars by 3
+
+ estatsd:increment("tcp.bytes_in", 512). %% increment tcp.bytes_in by 512
+
+Timers
+------
+
+ estatsd:timing(sometask, 1534). %% report that sometask took 1534ms
+
+Or for your convenience:
+
+ Start = erlang:now(),
+ do_sometask(),
+ estatsd:timing(sometask, Start). %% uses now() and now_diff for you
+
+Sampling
+--------
+
+Only report 10% of some_frequent_task measurements:
+
+ estatsd:timing(some_frequent_task, 12, 0.1)
+
+
+
+NOTES
+=====
+
+This could be extended to take a callback for reporting mechanisms.
+Right now it's hardcoded to stick data into graphite.
+
+
+
+Richard Jones <rj@metabrew.com>
+@metabrew
12 src/estatsd.app.src
@@ -0,0 +1,12 @@
+%% Application resource file for estatsd.
+%% The supervisor reads three optional environment values, each with a
+%% built-in default: flush_interval, graphite_host and graphite_port.
+{application, estatsd,
+ [
+  {description, "Stats aggregation service that writes to graphite"},
+  {vsn, "1.0"},
+  %% Names registered locally by the supervisor and the stats server
+  {registered, [estatsd_sup, estatsd_server]},
+  {applications, [
+                  kernel,
+                  stdlib
+                 ]},
+  {mod, { estatsd_app, []}},
+  {env, []}
+ ]}.
31 src/estatsd.erl
@@ -0,0 +1,31 @@
+-module(estatsd).
+
+-export([
+ increment/1, increment/2, increment/3,
+ decrement/1, decrement/2, decrement/3,
+ timing/2
+ ]).
+
+-define(SERVER, estatsd_server).
+
+%% Report how long an operation identified by Key took.
+%%
+%% The second argument is either an integer duration in milliseconds, or the
+%% now() tuple captured when the work started. In the latter case the elapsed
+%% time up to this call is computed and rounded to whole milliseconds.
+%% The measurement is delivered to the stats server with gen_server:cast.
+timing(Key, {_, _, _} = StartTime) ->
+    %% now_diff yields microseconds; the server expects milliseconds
+    ElapsedMicros = timer:now_diff(erlang:now(), StartTime),
+    timing(Key, erlang:round(ElapsedMicros / 1000));
+timing(Key, Duration) when is_integer(Duration) ->
+    gen_server:cast(?SERVER, {timing, Key, Duration}).
+
+
+%% Add Amount to the counter named Key.
+%%
+%% Amount defaults to 1 and Sample to 1. The request is delivered to the
+%% stats server with gen_server:cast; the server scales Amount by 1/Sample.
+increment(Key) ->
+    increment(Key, 1).
+
+increment(Key, Amount) ->
+    increment(Key, Amount, 1).
+
+increment(Key, Amount, Sample) ->
+    Request = {increment, Key, Amount, Sample},
+    gen_server:cast(?SERVER, Request).
+
+%% Subtract Amount from the counter named Key.
+%%
+%% Amount defaults to 1 and Sample to 1. Implemented as an increment by the
+%% negated amount, so callers always pass a positive Amount here.
+%% (The one-argument form previously passed -1, which was negated again and
+%% therefore incremented the counter.)
+decrement(Key) -> decrement(Key, 1, 1).
+decrement(Key, Amount) -> decrement(Key, Amount, 1).
+decrement(Key, Amount, Sample) ->
+    increment(Key, 0 - Amount, Sample).
+
16 src/estatsd_app.erl
@@ -0,0 +1,16 @@
+-module(estatsd_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+%% Application start callback: boots the top-level supervisor and returns
+%% whatever estatsd_sup:start_link/0 returns. Both arguments are ignored.
+start(_Type, _Args) ->
+    estatsd_sup:start_link().
+
+%% Application stop callback: nothing to clean up, always returns ok.
+stop(_AppState) ->
+    ok.
184 src/estatsd_server.erl
@@ -0,0 +1,184 @@
+%% Stats aggregation process that periodically dumps data to graphite
+%% Will calculate 90th percentile etc.
+%% Inspired by etsy statsd:
+%% http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/
+%%
+%% This could be extended to take a callback for reporting mechanisms.
+%% Right now it's hardcoded to stick data into graphite.
+%%
+%% Richard Jones <rj@metabrew.com>
+%%
+-module(estatsd_server).
+-behaviour(gen_server).
+
+-export([start_link/3]).
+
+%-export([key2str/1,flush/0]). %% export for debugging
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {timers, % gb_tree of timer data
+ flush_interval, % ms interval between stats flushing
+ flush_timer, % TRef of interval timer
+ graphite_host, % graphite server host
+ graphite_port % graphite server port
+ }).
+
+%% Start the stats server, registered locally under the module name.
+%%
+%% FlushIntervalMs - milliseconds between flushes to graphite
+%% GraphiteHost    - host passed to gen_tcp:connect when flushing
+%% GraphitePort    - port passed to gen_tcp:connect when flushing
+start_link(FlushIntervalMs, GraphiteHost, GraphitePort) ->
+    InitArgs = [FlushIntervalMs, GraphiteHost, GraphitePort],
+    gen_server:start_link({local, ?MODULE}, ?MODULE, InitArgs, []).
+
+%%
+
+%% gen_server init callback.
+%%
+%% Logs the configuration, creates the named ETS table `statsd` that holds
+%% the counters, and starts an interval timer that casts `flush` to this
+%% registered server every FlushIntervalMs milliseconds. Timer samples start
+%% out as an empty gb_tree in the state.
+init([FlushIntervalMs, GraphiteHost, GraphitePort]) ->
+    error_logger:info_msg("estatsd will flush stats to ~p:~w every ~wms\n",
+                          [ GraphiteHost, GraphitePort, FlushIntervalMs ]),
+    ets:new(statsd, [named_table, set]),
+    FlushCastArgs = [?MODULE, flush],
+    {ok, FlushTimer} = timer:apply_interval(FlushIntervalMs,
+                                            gen_server, cast, FlushCastArgs),
+    {ok, #state{ graphite_host  = GraphiteHost,
+                 graphite_port  = GraphitePort,
+                 flush_interval = FlushIntervalMs,
+                 flush_timer    = FlushTimer,
+                 timers         = gb_trees:empty()
+               }}.
+
+%% gen_server cast callback. Handles three requests:
+%%
+%%   {increment, Key, Delta, Sample}
+%%       Adds Delta, scaled by 1/Sample, to the running total for Key in the
+%%       `statsd` ETS table and bumps the number of updates seen for Key.
+%%       Sample must be a number in (0, 1]; anything else is logged and
+%%       dropped instead of crashing the server (a Sample of 0 used to pass
+%%       the guard and then fail with badarith on the division).
+%%   {timing, Key, Duration}
+%%       Prepends Duration to the list of samples kept for Key.
+%%   flush
+%%       Snapshots the counters, reports them together with the current
+%%       timers from a separate process, then clears both.
+handle_cast({increment, Key, Delta0, Sample}, State)
+  when is_number(Sample), Sample > 0, Sample =< 1 ->
+    Delta = Delta0 * ( 1 / Sample ), %% account for sample rates < 1.0
+    case ets:lookup(statsd, Key) of
+        [] ->
+            ets:insert(statsd, {Key, {Delta, 1}});
+        [{Key, {Tot, Times}}] ->
+            ets:insert(statsd, {Key, {Tot + Delta, Times + 1}})
+    end,
+    {noreply, State};
+
+handle_cast({increment, Key, _Delta, Sample}, State) ->
+    %% Invalid sample rate: keep serving other stats rather than crash
+    error_logger:warning_msg("estatsd ignoring increment of ~p: invalid sample rate ~p\n",
+                             [Key, Sample]),
+    {noreply, State};
+
+handle_cast({timing, Key, Duration}, State) ->
+    Timers0 = State#state.timers,
+    Timers = case gb_trees:lookup(Key, Timers0) of
+                 none ->
+                     gb_trees:insert(Key, [Duration], Timers0);
+                 {value, Durations} ->
+                     gb_trees:update(Key, [Duration | Durations], Timers0)
+             end,
+    {noreply, State#state{timers = Timers}};
+
+handle_cast(flush, State) ->
+    All = ets:tab2list(statsd),
+    %% The closure captures State before the timers are reset below, so the
+    %% reporting process still sees the samples collected in this interval.
+    spawn( fun() -> do_report(All, State) end ),
+    %% WIPE ALL
+    ets:delete_all_objects(statsd),
+    NewState = State#state{timers = gb_trees:empty()},
+    {noreply, NewState}.
+
+%% gen_server call callback: no synchronous API exists, so every request is
+%% answered with ok and the state is left untouched.
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+%% gen_server info callback: any out-of-band message is discarded.
+handle_info(_Unexpected, State) ->
+    {noreply, State}.
+
+%% gen_server code-upgrade callback: the state needs no conversion.
+%% The arguments are (OldVsn, State, Extra) and the required return value is
+%% {ok, NewState}; the previous version returned {noreply, _} and mistook
+%% Extra for the state.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% gen_server terminate callback: performs no cleanup and returns ok.
+terminate(_Reason, _State) ->
+    ok.
+
+%% INTERNAL STUFF
+
+%% Open a TCP connection to the configured graphite host/port, write Msg
+%% (iodata) and close the connection again.
+%%
+%% Returns ok when the data was handed to the socket, otherwise the error
+%% returned by gen_tcp:connect or gen_tcp:send. Previously a failed send was
+%% silently reported as ok.
+send_to_graphite(Msg, State) ->
+    case gen_tcp:connect(State#state.graphite_host,
+                         State#state.graphite_port,
+                         [list, {packet, 0}]) of
+        {ok, Sock} ->
+            SendResult = gen_tcp:send(Sock, Msg),
+            %% Always release the socket, whatever the send outcome was
+            gen_tcp:close(Sock),
+            SendResult;
+        ConnectError ->
+            ConnectError
+    end.
+
+%% Turn a stat key (atom, binary or string) into a graphite-safe string.
+%%
+%% Runs of whitespace become "_", "/" becomes "-", and every character other
+%% than letters, digits, "_", "-" and "." is removed. Atom keys go through
+%% the same clean-up as binaries and strings; they used to be emitted
+%% verbatim, so an atom such as 'my key' produced a malformed graphite line.
+key2str(K) when is_atom(K) ->
+    key2str(atom_to_list(K));
+key2str(K) when is_binary(K) ->
+    key2str(binary_to_list(K));
+key2str(K) when is_list(K) ->
+    Opts = [global, {return, list}],
+    S1 = re:replace(K, "\\s+", "_", Opts),
+    S2 = re:replace(S1, "/", "-", Opts),
+    re:replace(S2, "[^a-zA-Z_\\-0-9\\.]", "", Opts).
+
+%% Render a term (in practice a number) with the ~w format directive and
+%% return the result as a flat string.
+num2str(NN) ->
+    Formatted = io_lib:format("~w", [NN]),
+    lists:flatten(Formatted).
+
+%% Current Unix time in whole seconds.
+%% Reads the OS clock via os:timestamp/0 instead of the deprecated
+%% erlang:now/0; the tuple layout {MegaSecs, Secs, MicroSecs} is the same.
+unixtime() ->
+    {Meg, S, _Mic} = os:timestamp(),
+    Meg * 1000000 + S.
+
+%% Build one report from the counter snapshot All and the timers held in
+%% State, and send it to graphite.
+%%
+%% Every line of the report carries the same timestamp. Returns
+%% nothing_to_report when there are neither counters nor timers, otherwise
+%% the result of send_to_graphite/2.
+do_report(All, State) ->
+    Timestamp = num2str(unixtime()),
+    {CounterLines, CounterCount} = do_report_counters(All, Timestamp, State),
+    {TimerLines, TimerCount} = do_report_timers(Timestamp, State),
+    case TimerCount + CounterCount of
+        0 ->
+            nothing_to_report;
+        Total ->
+            %% Also graph the number of graphs we're graphing
+            Summary = ["statsd.numStats ", num2str(Total), " ", Timestamp, "\n"],
+            send_to_graphite([CounterLines, TimerLines, Summary], State)
+    end.
+
+%% Render the counter snapshot All as graphite lines.
+%%
+%% Returns {Iodata, Count} where Count is the number of counters. Fragments
+%% are prepended while folding, so they appear in reverse order of All.
+do_report_counters(All, TsStr, State) ->
+    Prepend = fun(Counter, Acc) ->
+                      [ counter_fragment(Counter, TsStr, State) | Acc ]
+              end,
+    {lists:foldl(Prepend, [], All), length(All)}.
+
+%% Two lines for one counter: "stats.<key>" carries the total divided by the
+%% flush interval in seconds, "stats_counts.<key>" the number of updates.
+counter_fragment({Key, {Total, NumVals}}, TsStr, State) ->
+    KeyS = key2str(Key),
+    PerSecond = Total / (State#state.flush_interval/1000),
+    LineEnd = [" ", TsStr, "\n"],
+    [ ["stats.", KeyS, " ", io_lib:format("~w", [PerSecond]), LineEnd],
+      ["stats_counts.", KeyS, " ", io_lib:format("~w", [NumVals]), LineEnd]
+    ].
+
+%% Render the timers held in State as graphite lines.
+%%
+%% Returns {Iodata, Count} where Count is the number of timer keys. Keys are
+%% emitted in descending order, i.e. the reverse of gb_trees:to_list/1.
+do_report_timers(TsStr, State) ->
+    Timings = gb_trees:to_list(State#state.timers),
+    Msg = [ timer_fragment(Key, Durations, TsStr)
+            || {Key, Durations} <- lists:reverse(Timings) ],
+    {Msg, length(Msg)}.
+
+%% Five lines for one timer key: mean, upper, upper_90, lower and count.
+%% The mean and upper_90 only consider the fastest samples that remain after
+%% dropping the slowest round(10% of Count) samples.
+timer_fragment(Key, Durations, TsStr) ->
+    KeyS = key2str(Key),
+    Sorted = lists:sort(Durations),
+    Count = length(Sorted),
+    Fastest = hd(Sorted),
+    Slowest = lists:last(Sorted),
+    PctThreshold = 90,
+    Dropped = erlang:round(((100-PctThreshold)/100)*Count),
+    Kept = Count - Dropped,
+    SlowestKept = lists:nth(Kept, Sorted),
+    Mean = lists:sum(lists:sublist(Sorted, Kept)) / Kept,
+    Stats = [ {"mean", Mean},
+              {"upper", Slowest},
+              {"upper_"++num2str(PctThreshold), SlowestKept},
+              {"lower", Fastest},
+              {"count", Count}
+            ],
+    LineStart = [ "stats.timers.", KeyS, "." ],
+    LineEnd = [" ", TsStr, "\n"],
+    [ [LineStart, Name, " ", num2str(Val), LineEnd] || {Name, Val} <- Stats ].
48 src/estatsd_sup.erl
@@ -0,0 +1,48 @@
+-module(estatsd_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0, start_link/1, start_link/3]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-define(FLUSH_INTERVAL, appvar(flush_interval, 10000)).
+-define(GRAPHITE_HOST, appvar(graphite_host, "127.0.0.1")).
+-define(GRAPHITE_PORT, appvar(graphite_port, 2003)).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+
+%% Start the supervisor, registered locally under the module name.
+%%
+%% Arguments that are not given are taken from the estatsd application
+%% environment (flush_interval, graphite_host, graphite_port), falling back
+%% to 10000 ms, "127.0.0.1" and 2003 respectively.
+start_link() ->
+    start_link(?FLUSH_INTERVAL).
+
+start_link(FlushIntervalMs) ->
+    start_link(FlushIntervalMs, ?GRAPHITE_HOST, ?GRAPHITE_PORT).
+
+start_link(FlushIntervalMs, GraphiteHost, GraphitePort) ->
+    InitArgs = [FlushIntervalMs, GraphiteHost, GraphitePort],
+    supervisor:start_link({local, ?MODULE}, ?MODULE, InitArgs).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+%% Supervisor init callback.
+%%
+%% Supervises a single permanent estatsd_server worker (5000 ms shutdown
+%% timeout) under a one_for_one strategy that tolerates up to 10000 restarts
+%% within 10 seconds.
+init([FlushIntervalMs, GraphiteHost, GraphitePort]) ->
+    ServerArgs = [FlushIntervalMs, GraphiteHost, GraphitePort],
+    ServerSpec = {estatsd_server,
+                  {estatsd_server, start_link, ServerArgs},
+                  permanent, 5000, worker, [estatsd_server]},
+    RestartPolicy = {one_for_one, 10000, 10},
+    {ok, {RestartPolicy, [ServerSpec]}}.
+
+%% Look up K in the estatsd application environment, returning Def when the
+%% key is not set.
+appvar(K, Def) ->
+    Lookup = application:get_env(estatsd, K),
+    case Lookup of
+        undefined -> Def;
+        {ok, Configured} -> Configured
+    end.
Please sign in to comment.
Something went wrong with that request. Please try again.