Skip to content

Commit

Permalink
Removed RED functionnality + Refactored code (iolists)
Browse files Browse the repository at this point in the history
  • Loading branch information
lpgauth committed Mar 6, 2012
1 parent 7f0827d commit fbc5c3e
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 88 deletions.
2 changes: 1 addition & 1 deletion src/statsderl.app.src
@@ -1,6 +1,6 @@
{application, statsderl, [
{description, "StatsD client"},
{vsn, "0.2"},
{vsn, "0.3"},
{registered, []},
{applications, [
kernel,
Expand Down
117 changes: 47 additions & 70 deletions src/statsderl.erl
Expand Up @@ -2,8 +2,6 @@
-behaviour(gen_server).

-define(SERVER, ?MODULE).
-define(LOW_BACKLOG, 5000).
-define(HIGH_BACKLOG, 10000).

-record(state, {
hostname,
Expand All @@ -16,7 +14,8 @@
%% API Function Exports
%% ------------------------------------------------------------------

-export([start_link/1, increment/3, decrement/3, timing/3, timing_now/3]).
-export([start_link/0, increment/3, decrement/3, timing/3,
timing_now/3]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
Expand All @@ -29,51 +28,47 @@
%% API Function Definitions
%% ------------------------------------------------------------------

start_link(BaseKey) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, BaseKey, []).

increment(Key, Magnitude, SampleRate) ->
Stats = io_lib:format("~s:~B|c|@~f", [Key, Magnitude, SampleRate]),
udp_send(Stats, SampleRate).

decrement(Key, Magnitude, SampleRate) ->
Stats = io_lib:format("~s:-~B|c|@~f", [Key, Magnitude, SampleRate]),
udp_send(Stats, SampleRate).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

increment(Key, Value, SampleRate) ->
send(increment, Key, Value, SampleRate).

decrement(Key, Value, SampleRate) ->
send(decrement, Key, Value, SampleRate).

timing(Key, Value, SampleRate) ->
Stats = io_lib:format("~s:~B|ms|@~f", [Key, Value, SampleRate]),
udp_send(Stats, SampleRate).
send(timing, Key, Value, SampleRate).

timing_now(Key, Timestamp, SampleRate) ->
Value = timer:now_diff(erlang:now(), Timestamp) div 1000,
timing(Key, Value, SampleRate).
timing(Key, now_diff_ms(Timestamp), SampleRate).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

init(BaseKey) ->
init(_Args) ->
{ok, Hostname} = application:get_env(statsderl, hostname),
{ok, Port} = application:get_env(statsderl, port),
BaseKey = case application:get_env(statsderl, base_key) of
{ok, Key} -> [Key, $.];
undefined -> <<"">>
end,
{ok, Socket} = gen_udp:open(0, [{active, false}]),
State = #state {
hostname = Hostname,
port = Port,
socket = Socket,
basekey = case BaseKey of
"" -> "";
_ -> [BaseKey, $.]
end
basekey = BaseKey,
socket = Socket
},
{ok, State}.

handle_call(_Request, _From, State) ->
{noreply, ok, State}.

handle_cast({udp_send, Stats},
#state{hostname=Hostname, port=Port, socket=Socket, basekey=BaseKey}=State) ->
gen_udp:send(Socket, Hostname, Port, [BaseKey, Stats]),
decrease_backlog(),
handle_cast({send, Packet}, State=#state{hostname=Hostname,
port=Port, socket=Socket, basekey=BaseKey}) ->
gen_udp:send(Socket, Hostname, Port, [BaseKey, Packet]),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
Expand All @@ -91,51 +86,33 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal Function Definitions
%% ------------------------------------------------------------------

udp_send(Stats, SampleRate) ->
case sample(SampleRate) of
send(Method, Key, Value, SampleRate) ->
random:seed(erlang:now()),
case random:uniform() =< SampleRate of
true ->
increase_backlog(),
gen_server:cast(?MODULE, {udp_send, Stats});
Packet = generate_packet(Method, Key, Value, SampleRate),
gen_server:cast(?MODULE, {send, Packet});
false ->
ok
end.

sample(SampleRate) ->
case red() of
true ->
false;
false ->
random:seed(erlang:now()),
random:uniform() =< SampleRate
end.

%% ------------------------------------------------------------------
%% Random Early Drop
%% http://en.wikipedia.org/wiki/Random_early_detection
%% ------------------------------------------------------------------

red() ->
Requests = ets:lookup_element(statsderl, backlog, 2),
case Requests of
_ when ?LOW_BACKLOG >= Requests->
false;
_ when Requests >= ?HIGH_BACKLOG ->
true;
_ ->
random_drop_function(Requests)
end.

random_drop_function(Requests) ->
Distribution = ?HIGH_BACKLOG - Requests,
case erlang:phash2({self(), now()}, Distribution) + 1 of
Distribution ->
true;
_ ->
false
end.

increase_backlog() ->
ets:update_counter(statsderl, backlog, 1).

decrease_backlog() ->
ets:update_counter(statsderl, backlog, -1).
now_diff_ms(Timestamp) ->
timer:now_diff(os:timestamp(), Timestamp) div 1000.

generate_packet(Method, Key, Value, SampleRate) ->
BinSampleRate =
case SampleRate >= 1 of
true ->
<<"">>;
false ->
[<<"|@">>, io_lib:format("~.3f", [SampleRate])]
end,
BinValue = list_to_binary(integer_to_list(Value)),
case Method of
increment ->
[Key, <<":">>, BinValue, <<"|c">>, BinSampleRate];
decrement ->
[Key, <<":-">>, BinValue, <<"|c">>, BinSampleRate];
timing ->
[Key, <<":">>, BinValue, <<"|ms">>, BinSampleRate]
end.
21 changes: 4 additions & 17 deletions src/statsderl_sup.erl
Expand Up @@ -10,31 +10,18 @@
-export([init/1]).

%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

%% ===================================================================
%% API functions
%% ===================================================================

start_link() ->
init_red(),
BaseKey = case application:get_env(statsderl, base_key) of
{ok, Key} -> Key;
undefined -> ""
end,
supervisor:start_link({local, ?MODULE}, ?MODULE, BaseKey).
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% ===================================================================
%% Supervisor callbacks
%% ===================================================================

init(BaseKey) ->
{ok, { {one_for_one, 5, 10}, [?CHILD(statsderl, worker, [BaseKey])]} }.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

init_red() ->
statsderl = ets:new(statsderl, [set, public, named_table, {read_concurrency, true}]),
true = ets:insert(statsderl, {backlog, 0}).
init(_Args) ->
{ok, { {one_for_one, 5, 10}, [?CHILD(statsderl, worker)]} }.

0 comments on commit fbc5c3e

Please sign in to comment.