Skip to content
Permalink
Browse files
Performance improvements to spiral, counter, histogram & slide_uniform
Changes include:

Partition counter and spiral writes by erlang:system_info(scheduler_id)
and a bitwise mask. There is also potential for better cache behavior
given the fixed mapping between Erlang scheduler thread and partitioned
key.

Switch spiral and slide_uniform from ordered_set to set. Set supports
fine grained locking whereas ordered_set requires a full-table lock.
Combining set and separating values greatly reduces ETS contention.

Change histogram to skip the ETS insert when the sample returned by the
sample update function is identical to the sample already stored.

There are 2 places in folsom where an ets:insert_new is immediately
followed by an ets:update_counter on the same key. Since, in the normal
case, the key is likely to already exist, this can be optimized by
trying the update_counter first inside a try/catch and only doing the
insert_new if the update fails. This is provided as a utility function
called folsom_utils:update_counter/3.

There was a bug in slide_uniform: the probability of doing a write did
not decrease as more updates were received within a particular moment,
so effectively every slide_uniform update resulted in a write. This bug
has been corrected, along with the QuickCheck test.
  • Loading branch information
Vagabond committed Dec 18, 2012
1 parent d68bb19 commit 2be6249280fd686c6ecf79b1383433f2efb54efa
Showing 8 changed files with 129 additions and 40 deletions.
@@ -17,7 +17,7 @@

-record(spiral, {
tid = folsom_metrics_histogram_ets:new(folsom_spiral,
[ordered_set,
[set,
{write_concurrency, true},
public]),
server
@@ -33,7 +33,7 @@
-record(slide_uniform, {
window = ?DEFAULT_SLIDING_WINDOW,
size = ?DEFAULT_SIZE,
reservoir = folsom_metrics_histogram_ets:new(folsom_slide_uniform,[ordered_set, {write_concurrency, true}, public]),
reservoir = folsom_metrics_histogram_ets:new(folsom_slide_uniform,[set, {write_concurrency, true}, public]),
seed = now(),
server
}).
@@ -32,27 +32,34 @@
get_value/1,
clear/1]).

-define(WIDTH, 16). %% Keep this a power of two

-include("folsom.hrl").

new(Name) ->
Counter = {Name, 0},
ets:insert(?COUNTER_TABLE, Counter).
Counters = [{{Name,N}, 0} || N <- lists:seq(0,?WIDTH-1)],
ets:insert(?COUNTER_TABLE, Counters).

inc(Name) ->
ets:update_counter(?COUNTER_TABLE, Name, 1).
ets:update_counter(?COUNTER_TABLE, key(Name), 1).

inc(Name, Value) ->
ets:update_counter(?COUNTER_TABLE, Name, Value).
ets:update_counter(?COUNTER_TABLE, key(Name), Value).

dec(Name) ->
ets:update_counter(?COUNTER_TABLE, Name, -1).
ets:update_counter(?COUNTER_TABLE, key(Name), -1).

dec(Name, Value) ->
ets:update_counter(?COUNTER_TABLE, Name, -Value).
ets:update_counter(?COUNTER_TABLE, key(Name), -Value).

get_value(Name) ->
[{_, Values}] = ets:lookup(?COUNTER_TABLE, Name),
Values.
Count = lists:sum(ets:select(?COUNTER_TABLE, [{{{Name,'_'},'$1'},[],['$1']}])),
Count.

%% Reset the counter called Name by delegating to new/1.
clear(Name) -> new(Name).

%% Build the partitioned ETS key for Name. The id of the scheduler running
%% the caller is masked with ?WIDTH-1 to pick the partition.
key(Name) ->
    Partition = erlang:system_info(scheduler_id) band (?WIDTH - 1),
    {Name, Partition}.
@@ -57,8 +57,14 @@ new(Name, SampleType, SampleSize, Alpha) ->

update(Name, Value) ->
Hist = get_value(Name),
NewSample = folsom_sample:update(Hist#histogram.type, Hist#histogram.sample, Value),
ets:insert(?HISTOGRAM_TABLE, {Name, Hist#histogram{sample = NewSample}}).
Sample = Hist#histogram.sample,
case folsom_sample:update(Hist#histogram.type, Hist#histogram.sample, Value) of
Sample ->
%% sample didn't change, don't need to write it back
true;
NewSample ->
ets:insert(?HISTOGRAM_TABLE, {Name, Hist#histogram{sample = NewSample}})
end.

% gets the histogram record from ets
get_value(Name) ->
@@ -34,6 +34,7 @@

%% size of the window in seconds
-define(WINDOW, 60).
-define(WIDTH, 16). %% Keep this a power of two

-include("folsom.hrl").

@@ -42,29 +43,31 @@ new(Name) ->
Pid = folsom_sample_slide_sup:start_slide_server(?MODULE,
Spiral#spiral.tid,
?WINDOW),
ets:insert_new(Spiral#spiral.tid, {count, 0}),
ets:insert_new(Spiral#spiral.tid,
[{{count, N}, 0} || N <- lists:seq(0,?WIDTH-1)]),
ets:insert(?SPIRAL_TABLE, {Name, Spiral#spiral{server=Pid}}).

update(Name, Value) ->
#spiral{tid=Tid} = get_value(Name),
Moment = folsom_utils:now_epoch(),
ets:insert_new(Tid, {Moment, 0}),
ets:update_counter(Tid, Moment, Value),
ets:update_counter(Tid, count, Value).
X = erlang:system_info(scheduler_id),
Rnd = X band (?WIDTH-1),
folsom_utils:update_counter(Tid, {Moment, Rnd}, Value),
ets:update_counter(Tid, {count, Rnd}, Value).

%% Fetch the #spiral{} stored under Name in ?SPIRAL_TABLE.
%% Crashes with badmatch unless exactly one entry keyed by Name is found.
get_value(Name) ->
    [Entry] = ets:lookup(?SPIRAL_TABLE, Name),
    {Name, Spiral} = Entry,
    Spiral.

trim(Tid, _Window) ->
Oldest = oldest(),
ets:select_delete(Tid, [{{'$1','_'}, [{is_integer, '$1'}, {'<', '$1', Oldest}], ['true']}]).
ets:select_delete(Tid, [{{{'$1','_'},'_'}, [{is_integer, '$1'}, {'<', '$1', Oldest}], ['true']}]).

get_values(Name) ->
Oldest = oldest(),
#spiral{tid=Tid} = get_value(Name),
[{count, Count}] = ets:lookup(Tid, count),
One =lists:sum(ets:select(Tid, [{{'$1','$2'},[{is_integer, '$1'}, {'>=', '$1', Oldest}],['$2']}])),
Count = lists:sum(ets:select(Tid, [{{{count,'_'},'$1'},[],['$1']}])),
One = lists:sum(ets:select(Tid, [{{{'$1','_'},'$2'},[{is_integer, '$1'}, {'>=', '$1', Oldest}],['$2']}])),

[{count, Count}, {one, One}].

@@ -38,19 +38,19 @@ new({Window, SampleSize}) ->
Pid = folsom_sample_slide_sup:start_slide_server(?MODULE, Sample#slide_uniform.reservoir, Sample#slide_uniform.window),
Sample#slide_uniform{server=Pid}.

update(#slide_uniform{reservoir = Reservoir, size = Size, seed = Seed} = Sample0, Value) ->
Moment = moment(),
ets:insert_new(Reservoir, {Moment, 0}),
MCnt = ets:update_counter(Reservoir, Moment, 1),
update(#slide_uniform{reservoir = Reservoir, size = Size} = Sample0, Value) ->
Now = folsom_utils:timestamp(),
Moment = folsom_utils:now_epoch(Now),
MCnt = folsom_utils:update_counter(Reservoir, Moment, 1),
Sample = case MCnt > Size of
true ->
{Rnd, NewSeed} = random:uniform_s(Size, Seed),
{Rnd, _NewSeed} = random:uniform_s(MCnt, Now),
maybe_update(Reservoir, {{Moment, Rnd}, Value}, Size),
Sample0#slide_uniform{seed = NewSeed};
false ->
ets:insert(Reservoir, {{Moment, MCnt}, Value}),
Sample0
end,
Sample0;
false ->
ets:insert(Reservoir, {{Moment, MCnt}, Value}),
Sample0
end,
Sample.

maybe_update(Reservoir, {{_Moment, Rnd}, _Value}=Obj, Size) when Rnd =< Size ->
@@ -28,8 +28,11 @@
to_atom/1,
convert_tags/1,
now_epoch/0,
now_epoch/1,
now_epoch_micro/0,
get_ets_size/1
timestamp/0,
get_ets_size/1,
update_counter/3
]).

to_atom(Binary) when is_binary(Binary) ->
@@ -41,12 +44,39 @@ convert_tags(Tags) ->
[to_atom(Tag) || Tag <- Tags].

now_epoch() ->
{Mega, Sec, _} = os:timestamp(),
now_epoch(os:timestamp()).

now_epoch({Mega, Sec, _}) ->
(Mega * 1000000 + Sec).

%% Current OS time as a single integer number of microseconds.
now_epoch_micro() ->
    {MegaSecs, Secs, MicroSecs} = os:timestamp(),
    WholeSeconds = MegaSecs * 1000000 + Secs,
    WholeSeconds * 1000000 + MicroSecs.

%% Thin wrapper around os:timestamp/0. It exists so tests can mock the
%% clock through this module, since os:timestamp itself can't be mecked.
timestamp() -> os:timestamp().

%% Value of the `size' item reported by ets:info/2 for table Tab.
get_ets_size(Tab) -> ets:info(Tab, size).

%% @doc
%% Same as {@link ets:update_counter/3}, except that a missing row is
%% created as `{Key, Value}' instead of failing. Returns the counter value
%% after the update.
update_counter(Tid, Key, Value) when is_integer(Value) ->
    try
        %% optimistic path: the row normally exists, so just bump it;
        %% this raises badarg when the row is absent
        ets:update_counter(Tid, Key, Value)
    catch
        error:badarg ->
            %% no row yet: create it with insert_new so a concurrent
            %% creator is never overwritten
            Created = ets:insert_new(Tid, {Key, Value}),
            if
                Created ->
                    Value;
                true ->
                    %% another process inserted first; apply our increment
                    %% on top of its row
                    ets:update_counter(Tid, Key, Value)
            end
    end.
@@ -73,3 +73,27 @@ cleanup_app(ok) ->
application:stop(folsom),
application:unload(folsom),
ok.

%% Spawn 100 monitored workers that each add their own N (1..100) to the
%% same key through folsom_utils:update_counter/3 after a staggered sleep.
%% The final row must hold the sum 5050.
update_counter_test() ->
    Tid = ets:new(sometable, [public, set]),
    StartWorker =
        fun(N) ->
            spawn_monitor(fun() ->
                                  timer:sleep(100 - N),
                                  folsom_utils:update_counter(Tid, hello, N)
                          end)
        end,
    Workers = lists:map(StartWorker, lists:seq(1, 100)),
    wait_for_results(Workers),
    ?assertEqual([{hello, 5050}], ets:lookup(Tid, hello)).

%% Block until every {Pid, MonitorRef} in the list has produced a 'DOWN'
%% message. 'DOWN' messages from unknown pids are ignored; a tracked worker
%% that exits with anything other than `normal' raises that reason as an
%% error. There is no timeout on the receive.
wait_for_results([]) ->
    ok;
wait_for_results(Pending) ->
    receive
        {'DOWN', _Ref, _Type, Pid, Reason} ->
            case {lists:keymember(Pid, 1, Pending), Reason} of
                {false, _} ->
                    %% not one of ours, keep waiting
                    wait_for_results(Pending);
                {true, normal} ->
                    wait_for_results(lists:keydelete(Pid, 1, Pending));
                {true, _} ->
                    erlang:error(Reason)
            end
    end.
@@ -44,9 +44,11 @@
%% Model state for the slide_uniform property.
%%   moment - current mocked epoch; starts at 1000 and is replaced by the
%%            result of each tick
%%   sample - second element of the new_histo result, later replaced by the
%%            result of each update
%%   name   - first element of the new_histo result
%%   count  - orddict of Moment => number of updates seen in that moment
%%   values - model of the entries expected in the reservoir, stored as
%%            {{Moment, Cnt}, Val}
-record(state, {moment=1000,
sample,
name,
count=orddict:new(),
values=[]}).

%% Pin both arities of the mocked folsom_utils:now_epoch to the starting
%% epoch, then hand back a default model state.
initial_state() ->
    StartEpoch = 1000,
    meck:expect(folsom_utils, now_epoch, fun(_Now) -> StartEpoch end),
    meck:expect(folsom_utils, now_epoch, fun() -> StartEpoch end),
    #state{}.

@@ -64,8 +66,9 @@ next_state(S, V, {call, ?MODULE, new_histo, []}) ->
S#state{name={call, erlang, element, [1, V]}, sample={call, erlang, element, [2, V]}};
next_state(S, V, {call, ?MODULE, tick, [_Moment]}) ->
S#state{moment=V};
next_state(#state{moment=Moment, values=Values0, sample=Sample}=S, NewSample, {call, ?MODULE, update, [_, Val]}) ->
S#state{values={call, slide_uniform_eqc, new_state_values, [Sample, Moment, Values0, Val]},
next_state(#state{moment=Moment, values=Values0, sample=Sample, count=Count}=S, NewSample, {call, ?MODULE, update, [_, Val]}) ->
S#state{values={call, slide_uniform_eqc, new_state_values, [Sample, Moment, Values0, Val, Count]},
count={call, orddict, update_counter, [Moment, 1, Count]},
sample=NewSample};
next_state(#state{values=Values, moment=Moment}=S, _V, {call, ?MODULE, trim, _}) ->
%% trim the model
@@ -107,11 +110,14 @@ postcondition(_S, {call, ?MODULE, _, _}, _Res) ->
prop_window_test_() ->
{setup, fun() -> ok end, fun(_X) -> (catch meck:unload(folsom_utils)), folsom:stop() end,
fun(_X) ->
?_assert(eqc:quickcheck(eqc:numtests(?NUMTESTS, ?QC_OUT(prop_window())))) end}.
{timeout, 30,
?_assert(eqc:quickcheck(eqc:numtests(?NUMTESTS, ?QC_OUT(prop_window()))))} end}.

prop_window() ->
folsom:start(),
(catch meck:new(folsom_utils)),
(catch meck:expect(folsom_utils, update_counter, fun(Tid, Key, Value) -> meck:passthrough([Tid, Key, Value]) end)),
(catch meck:expect(folsom_utils, timestamp, fun() -> Res = os:timestamp(), put(timestamp, Res), Res end)),
?FORALL(Cmds, commands(?MODULE),
aggregate(command_names(Cmds),
begin
@@ -144,6 +150,7 @@ new_histo() ->
%% Advance the mocked clock by a random step of 1..10 and return the new
%% moment. Both arities of folsom_utils:now_epoch are re-mocked to report it.
tick(Moment) ->
    %% random:uniform/1 already yields an integer, so no truncation is needed
    NewMoment = Moment + random:uniform(10),
    meck:expect(folsom_utils, now_epoch, fun() -> NewMoment end),
    meck:expect(folsom_utils, now_epoch, fun(_Now) -> NewMoment end),
    NewMoment.

update(Sample, Val) ->
@@ -159,16 +166,28 @@ get_values(Sample) ->
%% Keep only the model entries {{M, C}, V} whose moment M is no older than
%% Moment - Window. Elements of any other shape are dropped.
trim(L, Moment, Window) ->
    InWindow =
        fun({{M, _C}, _V}) when M >= Moment - Window -> true;
           (_Other) -> false
        end,
    lists:filter(InWindow, L).

new_state_values(Sample, Moment, Values, Val) ->
Cnt = length([true || {{M, _C}, _V} <- Values, M == Moment]),
case Cnt >= ?SIZE of
new_state_values(_Sample, Moment, Values, Val, Count) ->
%Cnt = length([true || {{M, _C}, _V} <- Values, M == Moment]),
Cnt =
case orddict:find(Moment, Count) of
error ->
1;
{ok, V} ->
V+1
end,
case Cnt > ?SIZE of
true ->
%% replace
{Rnd, _} = random:uniform_s(?SIZE, Sample#slide_uniform.seed),
lists:keyreplace({Moment, Rnd}, 1, Values, {{Moment, Rnd}, Val});
{Rnd, _} = random:uniform_s(Cnt, get(timestamp)),
case Rnd =< ?SIZE of
true ->
lists:keyreplace({Moment, Rnd}, 1, Values, {{Moment, Rnd}, Val});
false ->
Values
end;
false ->
%% insert
Values ++ [{{Moment, Cnt+1}, Val}]
Values ++ [{{Moment, Cnt}, Val}]
end.

-endif.

0 comments on commit 2be6249

Please sign in to comment.