Permalink
Browse files

Performance improvements to spiral, counter, histogram & spiral_uniform

Changes include:

Partition counter and spiral writes by erlang:system_info(scheduler_id)
and a bitwise mask. There is also potential for better cache behavior
given the fixed mapping between Erlang scheduler thread and partitioned
key.

Switch spiral and slide_uniform from ordered_set to set. Set supports
fine grained locking whereas ordered_set requires a full-table lock.
Combining set and separating values greatly reduces ETS contention.

Change histogram to avoid an ETS insert if the sample passed into the
histogram update function matches the result.

There are 2 places in folsom where an ets:insert_new is done immediately
followed by an ets:update_counter on the same key. Since, in the normal
case, the key is likely to already exist, this can be optimized by
trying the update_counter first in a try/catch and only do the
insert_new if needed. This is provided as a utility function called
folsom_utils:update_counter().

There is a bug in slide_uniform where it would not decrease the
probability of doing a write the more updates it received in a
particular moment. Effectively slide_uniform updates would always result
in a write. This bug has been corrected, along with the Quickcheck test.
  • Loading branch information...
1 parent d68bb19 commit 2be6249280fd686c6ecf79b1383433f2efb54efa @Vagabond Vagabond committed Dec 15, 2012
View
@@ -17,7 +17,7 @@
-record(spiral, {
tid = folsom_metrics_histogram_ets:new(folsom_spiral,
- [ordered_set,
+ [set,
{write_concurrency, true},
public]),
server
@@ -33,7 +33,7 @@
-record(slide_uniform, {
window = ?DEFAULT_SLIDING_WINDOW,
size = ?DEFAULT_SIZE,
- reservoir = folsom_metrics_histogram_ets:new(folsom_slide_uniform,[ordered_set, {write_concurrency, true}, public]),
+ reservoir = folsom_metrics_histogram_ets:new(folsom_slide_uniform,[set, {write_concurrency, true}, public]),
seed = now(),
server
}).
@@ -32,27 +32,34 @@
get_value/1,
clear/1]).
+-define(WIDTH, 16). %% Keep this a power of two
+
-include("folsom.hrl").
new(Name) ->
- Counter = {Name, 0},
- ets:insert(?COUNTER_TABLE, Counter).
+ Counters = [{{Name,N}, 0} || N <- lists:seq(0,?WIDTH-1)],
+ ets:insert(?COUNTER_TABLE, Counters).
inc(Name) ->
- ets:update_counter(?COUNTER_TABLE, Name, 1).
+ ets:update_counter(?COUNTER_TABLE, key(Name), 1).
inc(Name, Value) ->
- ets:update_counter(?COUNTER_TABLE, Name, Value).
+ ets:update_counter(?COUNTER_TABLE, key(Name), Value).
dec(Name) ->
- ets:update_counter(?COUNTER_TABLE, Name, -1).
+ ets:update_counter(?COUNTER_TABLE, key(Name), -1).
dec(Name, Value) ->
- ets:update_counter(?COUNTER_TABLE, Name, -Value).
+ ets:update_counter(?COUNTER_TABLE, key(Name), -Value).
get_value(Name) ->
- [{_, Values}] = ets:lookup(?COUNTER_TABLE, Name),
- Values.
+ Count = lists:sum(ets:select(?COUNTER_TABLE, [{{{Name,'_'},'$1'},[],['$1']}])),
+ Count.
clear(Name) ->
new(Name).
+
+key(Name) ->
+ X = erlang:system_info(scheduler_id),
+ Rnd = X band (?WIDTH-1),
+ {Name, Rnd}.
@@ -57,8 +57,14 @@ new(Name, SampleType, SampleSize, Alpha) ->
update(Name, Value) ->
Hist = get_value(Name),
- NewSample = folsom_sample:update(Hist#histogram.type, Hist#histogram.sample, Value),
- ets:insert(?HISTOGRAM_TABLE, {Name, Hist#histogram{sample = NewSample}}).
+ Sample = Hist#histogram.sample,
+ case folsom_sample:update(Hist#histogram.type, Hist#histogram.sample, Value) of
+ Sample ->
+ %% sample didn't change, don't need to write it back
+ true;
+ NewSample ->
+ ets:insert(?HISTOGRAM_TABLE, {Name, Hist#histogram{sample = NewSample}})
+ end.
% gets the histogram record from ets
get_value(Name) ->
@@ -34,6 +34,7 @@
%% size of the window in seconds
-define(WINDOW, 60).
+-define(WIDTH, 16). %% Keep this a power of two
-include("folsom.hrl").
@@ -42,29 +43,31 @@ new(Name) ->
Pid = folsom_sample_slide_sup:start_slide_server(?MODULE,
Spiral#spiral.tid,
?WINDOW),
- ets:insert_new(Spiral#spiral.tid, {count, 0}),
+ ets:insert_new(Spiral#spiral.tid,
+ [{{count, N}, 0} || N <- lists:seq(0,?WIDTH-1)]),
ets:insert(?SPIRAL_TABLE, {Name, Spiral#spiral{server=Pid}}).
update(Name, Value) ->
#spiral{tid=Tid} = get_value(Name),
Moment = folsom_utils:now_epoch(),
- ets:insert_new(Tid, {Moment, 0}),
- ets:update_counter(Tid, Moment, Value),
- ets:update_counter(Tid, count, Value).
+ X = erlang:system_info(scheduler_id),
+ Rnd = X band (?WIDTH-1),
+ folsom_utils:update_counter(Tid, {Moment, Rnd}, Value),
+ ets:update_counter(Tid, {count, Rnd}, Value).
get_value(Name) ->
[{Name, Spiral}] = ets:lookup(?SPIRAL_TABLE, Name),
Spiral.
trim(Tid, _Window) ->
Oldest = oldest(),
- ets:select_delete(Tid, [{{'$1','_'}, [{is_integer, '$1'}, {'<', '$1', Oldest}], ['true']}]).
+ ets:select_delete(Tid, [{{{'$1','_'},'_'}, [{is_integer, '$1'}, {'<', '$1', Oldest}], ['true']}]).
get_values(Name) ->
Oldest = oldest(),
#spiral{tid=Tid} = get_value(Name),
- [{count, Count}] = ets:lookup(Tid, count),
- One =lists:sum(ets:select(Tid, [{{'$1','$2'},[{is_integer, '$1'}, {'>=', '$1', Oldest}],['$2']}])),
+ Count = lists:sum(ets:select(Tid, [{{{count,'_'},'$1'},[],['$1']}])),
+ One = lists:sum(ets:select(Tid, [{{{'$1','_'},'$2'},[{is_integer, '$1'}, {'>=', '$1', Oldest}],['$2']}])),
[{count, Count}, {one, One}].
@@ -38,19 +38,19 @@ new({Window, SampleSize}) ->
Pid = folsom_sample_slide_sup:start_slide_server(?MODULE, Sample#slide_uniform.reservoir, Sample#slide_uniform.window),
Sample#slide_uniform{server=Pid}.
-update(#slide_uniform{reservoir = Reservoir, size = Size, seed = Seed} = Sample0, Value) ->
- Moment = moment(),
- ets:insert_new(Reservoir, {Moment, 0}),
- MCnt = ets:update_counter(Reservoir, Moment, 1),
+update(#slide_uniform{reservoir = Reservoir, size = Size} = Sample0, Value) ->
+ Now = folsom_utils:timestamp(),
+ Moment = folsom_utils:now_epoch(Now),
+ MCnt = folsom_utils:update_counter(Reservoir, Moment, 1),
Sample = case MCnt > Size of
true ->
- {Rnd, NewSeed} = random:uniform_s(Size, Seed),
+ {Rnd, _NewSeed} = random:uniform_s(MCnt, Now),
maybe_update(Reservoir, {{Moment, Rnd}, Value}, Size),
- Sample0#slide_uniform{seed = NewSeed};
- false ->
- ets:insert(Reservoir, {{Moment, MCnt}, Value}),
- Sample0
- end,
+ Sample0;
+ false ->
+ ets:insert(Reservoir, {{Moment, MCnt}, Value}),
+ Sample0
+ end,
Sample.
maybe_update(Reservoir, {{_Moment, Rnd}, _Value}=Obj, Size) when Rnd =< Size ->
View
@@ -28,8 +28,11 @@
to_atom/1,
convert_tags/1,
now_epoch/0,
+ now_epoch/1,
now_epoch_micro/0,
- get_ets_size/1
+ timestamp/0,
+ get_ets_size/1,
+ update_counter/3
]).
to_atom(Binary) when is_binary(Binary) ->
@@ -41,12 +44,39 @@ convert_tags(Tags) ->
[to_atom(Tag) || Tag <- Tags].
now_epoch() ->
- {Mega, Sec, _} = os:timestamp(),
+ now_epoch(os:timestamp()).
+
+now_epoch({Mega, Sec, _}) ->
(Mega * 1000000 + Sec).
now_epoch_micro() ->
{Mega, Sec, Micro} = os:timestamp(),
(Mega * 1000000 + Sec) * 1000000 + Micro.
+%% useful because you can't meck os:timestamp for some reason
+timestamp() ->
+ os:timestamp().
+
get_ets_size(Tab) ->
ets:info(Tab, size).
+
+%% @doc
+%% Same as {@link ets:update_counter/3} but inserts `{Key, Value}' if object
+%% is missing in the table.
+update_counter(Tid, Key, Value) when is_integer(Value) ->
+ %% try to update the counter, will badarg if it doesn't exist
+ try ets:update_counter(Tid, Key, Value) of
+ Res ->
+ Res
+ catch
+ error:badarg ->
+ %% row didn't exist, create it
+ %% use insert_new to avoid races
+ case ets:insert_new(Tid, {Key, Value}) of
+ true ->
+ Value;
+ false ->
+ %% someone beat us to it
+ ets:update_counter(Tid, Key, Value)
+ end
+ end.
View
@@ -73,3 +73,27 @@ cleanup_app(ok) ->
application:stop(folsom),
application:unload(folsom),
ok.
+
+update_counter_test() ->
+ Tid = ets:new(sometable, [public, set]),
+ Workers = [spawn_monitor(fun() -> timer:sleep(100-N), folsom_utils:update_counter(Tid, hello, N) end) || N <- lists:seq(1, 100)],
+ wait_for_results(Workers),
+ ?assertEqual([{hello, 5050}], ets:lookup(Tid, hello)).
+
+wait_for_results([]) ->
+ ok;
+wait_for_results(Workers) ->
+ receive
+ {'DOWN', _, _, Pid, Reason} ->
+ case lists:keyfind(Pid, 1, Workers) of
+ false ->
+ wait_for_results(Workers);
+ _ ->
+ case Reason of
+ normal ->
+ wait_for_results(lists:keydelete(Pid, 1, Workers));
+ _ ->
+ erlang:error(Reason)
+ end
+ end
+ end.
@@ -44,9 +44,11 @@
-record(state, {moment=1000,
sample,
name,
+ count=orddict:new(),
values=[]}).
initial_state() ->
+ meck:expect(folsom_utils, now_epoch, fun(_Now) -> 1000 end),
meck:expect(folsom_utils, now_epoch, fun() -> 1000 end),
#state{}.
@@ -64,8 +66,9 @@ next_state(S, V, {call, ?MODULE, new_histo, []}) ->
S#state{name={call, erlang, element, [1, V]}, sample={call, erlang, element, [2, V]}};
next_state(S, V, {call, ?MODULE, tick, [_Moment]}) ->
S#state{moment=V};
-next_state(#state{moment=Moment, values=Values0, sample=Sample}=S, NewSample, {call, ?MODULE, update, [_, Val]}) ->
- S#state{values={call, slide_uniform_eqc, new_state_values, [Sample, Moment, Values0, Val]},
+next_state(#state{moment=Moment, values=Values0, sample=Sample, count=Count}=S, NewSample, {call, ?MODULE, update, [_, Val]}) ->
+ S#state{values={call, slide_uniform_eqc, new_state_values, [Sample, Moment, Values0, Val, Count]},
+ count={call, orddict, update_counter, [Moment, 1, Count]},
sample=NewSample};
next_state(#state{values=Values, moment=Moment}=S, _V, {call, ?MODULE, trim, _}) ->
%% trim the model
@@ -107,11 +110,14 @@ postcondition(_S, {call, ?MODULE, _, _}, _Res) ->
prop_window_test_() ->
{setup, fun() -> ok end, fun(_X) -> (catch meck:unload(folsom_utils)), folsom:stop() end,
fun(_X) ->
- ?_assert(eqc:quickcheck(eqc:numtests(?NUMTESTS, ?QC_OUT(prop_window())))) end}.
+ {timeout, 30,
+ ?_assert(eqc:quickcheck(eqc:numtests(?NUMTESTS, ?QC_OUT(prop_window()))))} end}.
prop_window() ->
folsom:start(),
(catch meck:new(folsom_utils)),
+ (catch meck:expect(folsom_utils, update_counter, fun(Tid, Key, Value) -> meck:passthrough([Tid, Key, Value]) end)),
+ (catch meck:expect(folsom_utils, timestamp, fun() -> Res = os:timestamp(), put(timestamp, Res), Res end)),
?FORALL(Cmds, commands(?MODULE),
aggregate(command_names(Cmds),
begin
@@ -144,6 +150,7 @@ new_histo() ->
tick(Moment) ->
IncrBy = trunc(random:uniform(10)),
meck:expect(folsom_utils, now_epoch, fun() -> Moment + IncrBy end),
+ meck:expect(folsom_utils, now_epoch, fun(_Now) -> Moment + IncrBy end),
Moment+IncrBy.
update(Sample, Val) ->
@@ -159,16 +166,28 @@ get_values(Sample) ->
trim(L, Moment, Window) ->
[{K, V} || {{M, _C}=K, V} <- L, M >= Moment - Window].
-new_state_values(Sample, Moment, Values, Val) ->
- Cnt = length([true || {{M, _C}, _V} <- Values, M == Moment]),
- case Cnt >= ?SIZE of
+new_state_values(_Sample, Moment, Values, Val, Count) ->
+ %Cnt = length([true || {{M, _C}, _V} <- Values, M == Moment]),
+ Cnt =
+ case orddict:find(Moment, Count) of
+ error ->
+ 1;
+ {ok, V} ->
+ V+1
+ end,
+ case Cnt > ?SIZE of
true ->
%% replace
- {Rnd, _} = random:uniform_s(?SIZE, Sample#slide_uniform.seed),
- lists:keyreplace({Moment, Rnd}, 1, Values, {{Moment, Rnd}, Val});
+ {Rnd, _} = random:uniform_s(Cnt, get(timestamp)),
+ case Rnd =< ?SIZE of
+ true ->
+ lists:keyreplace({Moment, Rnd}, 1, Values, {{Moment, Rnd}, Val});
+ false ->
+ Values
+ end;
false ->
%% insert
- Values ++ [{{Moment, Cnt+1}, Val}]
+ Values ++ [{{Moment, Cnt}, Val}]
end.
-endif.

0 comments on commit 2be6249

Please sign in to comment.