Permalink
Browse files

Moving to an ETS-based implementation of deduper

This allows much better performance than the dict-based version.
Also note that most of the improvement comes from
moving to a 'first match' search for duplicates (at or below
the given threshold, i.e. the `duplicate_treshold` setting) rather
than a 'best match' search, in order to possibly reduce the number
of match attempts to be made when logs are wildly different.

It's still not known whether this is the best approach, but it did
allow the node to survive overload situations much better.

Using ETS also opens the way to some interesting optimizations
later on, by making the table protected (e.g. not sending
an error message at all if 1000 copies of it are already in the table).
  • Loading branch information...
1 parent 7405033 commit 1240b68ba25cb5263458f209c2c373575dd51d7b @ferd committed Oct 15, 2012
Showing with 128 additions and 45 deletions.
  1. +61 −45 src/lager_deduper.erl
  2. +67 −0 test/lager_overload.erl
View
@@ -37,8 +37,8 @@ handle_call({seen, Key}, _From, S = #state{db=DB}) ->
{ok, _} ->
{reply, yes, S#state{db=increment(Key, DB)}};
undefined ->
- case {closest(Key, DB), treshold()} of
- {{Dist, MatchKey}, Treshold} when Dist =< Treshold ->
+ case close_enough(Key, DB, treshold()) of
+ {_Dist, MatchKey} ->
{reply, yes, S#state{db=increment(MatchKey, DB)}};
_ ->
{reply, no, S#state{db=store(Key, undefined, DB)}}
@@ -61,56 +61,72 @@ terminate(_, _) -> ok.
delay() -> lager_mochiglobal:get(duplicate_dump, ?DEFAULT_TIMEOUT). %% reads the duplicate_dump setting; ?DEFAULT_TIMEOUT is the second argument (presumably the fallback -- confirm against lager_mochiglobal)
treshold() -> lager_mochiglobal:get(duplicate_treshold, 1). %% reads the duplicate_treshold setting; 1 is the second argument (presumably the fallback -- confirm against lager_mochiglobal)
-%empty() -> [].
-empty() -> dict:new().
+%% Create the deduper's backing store: a fresh ETS table that only the
+%% creating process may read or write. The table type and key position
+%% are the ETS defaults, spelled out because the other helpers depend on
+%% the {Key, Count, Value} row layout.
+empty() ->
+    ets:new(?MODULE, [set, private, {keypos, 1}]).
-lookup(Key, Dict) ->
- case dict:find(Key, Dict) of
- error -> undefined;
- X -> X
+lookup(Key, Tab) -> %% fetch the counter and stored value for Key from the ETS table Tab
+ case ets:lookup(Tab, Key) of
+ [] -> undefined; %% no row for Key
+ [{_,Ct,Val}] -> {ok,{Ct,Val}} %% rows are {Key, Count, Value}; returned as {ok, {Count, Value}}
end.
%% assumes the key is present
-increment(Key, Dict) ->
- dict:update(Key, fun({Ct, Val}) -> {Ct+1, Val} end, Dict).
-
-store(Key, Val, Dict) ->
- dict:update(Key, fun({Ct, _}) -> {Ct, Val} end, {1, Val}, Dict).
-
-closest({Lvl, Hash}, Dict) ->
- dict:fold(
- fun({Level, H}, _Val, undefined) when Level =:= Lvl ->
- {simhash:distance(Hash, H), {Level, H}};
- ({Level, H}, _Val, Best) when Level =:= Lvl ->
- min({simhash:distance(Hash, H), {Level,H}}, Best);
- (_Key, _Val, Best) ->
- Best
- end,
- undefined,
- Dict).
-
-
-dump(Dict) ->
- dict:fold(
- fun(Key, Val = {_, undefined}, D) ->
- %% race condition between hash seen and log, hash
- %% likely incoming
- dict:store(Key, Val, D);
- (_K, {1, Log = {log, _Lvl, _Ts, _Msg}}, D) ->
+increment(Key, Tab) ->
+    %% Rows are {Key, Count, Value}: bump Count (element 2) by one, in place.
+    %% ets:update_counter/3 raises badarg when Key has no row.
+    _NewCount = ets:update_counter(Tab, Key, {2, 1}),
+    Tab.
+
+%% Attach Val to Key's row and return the table. An existing row keeps its
+%% counter and only has its value (element 3) replaced; when there is no
+%% row yet, one is inserted with a counter of 1.
+store(Key, Val, Tab) ->
+    case ets:update_element(Tab, Key, {3, Val}) of
+        true ->
+            Tab;
+        false ->
+            true = ets:insert(Tab, {Key, 1, Val}),
+            Tab
+    end.
+
+%% Find the first stored key that has the same level as Key and whose hash
+%% lies within Limit of Key's hash according to simhash:distance/2.
+%% Returns {Distance, MatchingKey} for that first hit, or undefined once
+%% the whole table has been walked without one.
+close_enough(Key, Tab, Limit) ->
+    close_enough(Key, Tab, Limit, ets:first(Tab)).
+
+%% Walk the table key by key; the last argument is the key being examined.
+close_enough(_Key, _Tab, _Limit, '$end_of_table') ->
+    undefined;
+close_enough(Key = {Level, Hash}, Tab, Limit, Candidate = {Level, OtherHash}) ->
+    %% Same level on both sides (Level is bound twice), so compare hashes.
+    Distance = simhash:distance(Hash, OtherHash),
+    if
+        Distance =< Limit ->
+            {Distance, Candidate};
+        true ->
+            close_enough(Key, Tab, Limit, ets:next(Tab, Candidate))
+    end;
+close_enough(Key, Tab, Limit, Candidate) ->
+    %% Different level (or differently shaped key): skip it.
+    close_enough(Key, Tab, Limit, ets:next(Tab, Candidate)).
+
+dump(Tab) -> %% flush the table: visit every key starting from ets:first/1, notifying and deleting rows; returns Tab
+ dump(Tab, ets:first(Tab)).
+
+dump(Tab, '$end_of_table') -> %% walk finished
+ Tab;
+dump(Tab, Current) -> %% Current is the key being visited
+ case ets:lookup(Tab, Current) of %% only the five row shapes below are handled; any other result raises case_clause
+ [{_,_,undefined}] -> % may occur between hash set and log
+ dump(Tab, ets:next(Tab, Current)); %% row is left in the table and the walk moves on
+ [{Key, 1, Log = {log, _Lvl, _Ts, _Msg}}] -> %% counter of 1, 4-element log event: forwarded unchanged
safe_notify(Log),
- D;
- (_K, {Ct, {log, Lvl, Ts, [LvlStr, Loc, Msg]}}, D) ->
- safe_notify({log, Lvl, Ts, [LvlStr, Loc, [Msg, io_lib:format(" (~b times)", [Ct])]]}),
- D;
- (_K, {1, Log = {log, _Dest, _Lvl, _Ts, _Msg}}, D) ->
+ Next = ets:next(Tab, Current), %% successor is fetched before the current row is deleted
+ ets:delete(Tab,Key), %% NOTE(review): rows are deleted during a first/next walk without ets:safe_fixtable/2 -- confirm no key can be skipped
+ dump(Tab, Next);
+ [{Key, 1, Log = {log, _Dest, _Lvl, _Ts, _Msg}}] -> %% counter of 1, 5-element log event: forwarded unchanged
safe_notify(Log),
- D;
- (_K, {Ct, {log, Dest, Lvl, Ts, [LvlStr, Loc, Msg]}}, D) ->
+ Next = ets:next(Tab, Current),
+ ets:delete(Tab,Key),
+ dump(Tab, Next);
+ [{Key, Ct, {log, Lvl, Ts, [LvlStr, Loc, Msg] }}] -> %% any other counter: the count is appended to Msg before notifying
+ safe_notify({log, Lvl, Ts, [LvlStr, Loc, [Msg, io_lib:format(" (~b times)", [Ct])]]}),
+ Next = ets:next(Tab, Current),
+ ets:delete(Tab,Key),
+ dump(Tab, Next);
+ [{Key, Ct, {log, Dest, Lvl, Ts, [LvlStr, Loc, Msg]}}] -> %% same as above for the 5-element event form
safe_notify({log, Dest, Lvl, Ts, [LvlStr, Loc, [Msg, io_lib:format(" (~b times)", [Ct])]]}),
- D
- end,
- dict:new(),
- Dict).
+ Next = ets:next(Tab, Current),
+ ets:delete(Tab,Key),
+ dump(Tab, Next)
+ end.
+
safe_notify(Event) ->
case whereis(lager_event) of
View
@@ -0,0 +1,67 @@
+%%% A module that tries to crash stuff *a lot* to test how long it may
+%%% take to overload a given node with log messages, and then compare
+%%% that value with what log deduplication may do.
+-module(lager_overload).
+-compile([{parse_transform, lager_transform}]).
+-compile(export_all).
+
+%% Prepare the node for the baseline run: console backend at info level,
+%% no error_logger redirect, duplicate_treshold 0 and duplicate_dump
+%% infinity. Crashes (badmatch) unless simhash and lager both start cleanly;
+%% the results of the other load/start calls are ignored.
+init_regular() ->
+    SetLagerEnv = fun({Par, Val}) -> application:set_env(lager, Par, Val) end,
+    error_logger:tty(false),
+    application:load(lager),
+    lists:foreach(SetLagerEnv,
+                  [{handlers, [{lager_console_backend, info}]},
+                   {error_logger_redirect, false}]),
+    application:start(crypto),
+    ok = application:start(simhash),
+    lists:foreach(fun application:start/1, [compiler, syntax_tools]),
+    lists:foreach(SetLagerEnv,
+                  [{duplicate_treshold, 0},
+                   {duplicate_dump, infinity}]),
+    ok = application:start(lager).
+
+
+%% Prepare the node for the deduplicated run: console backend at info
+%% level, no error_logger redirect, duplicate_treshold 4 and duplicate_dump
+%% 1000. Only the final lager start is asserted; the results of every
+%% other load/start call are ignored.
+init_dedup() ->
+    SetLagerEnv = fun({Par, Val}) -> application:set_env(lager, Par, Val) end,
+    error_logger:tty(false),
+    application:load(lager),
+    lists:foreach(SetLagerEnv,
+                  [{handlers, [{lager_console_backend, info}]},
+                   {error_logger_redirect, false}]),
+    application:start(crypto),
+    application:load(simhash),
+    lists:foreach(fun application:start/1, [simhash, compiler, syntax_tools]),
+    lists:foreach(SetLagerEnv,
+                  [{duplicate_treshold, 4},
+                   {duplicate_dump, 1000}]),
+    ok = application:start(lager).
+
+%% Configure the node with init_regular/0, then run the error generator
+%% (spawn_errs/1 with N) in its own process. Returns that process's pid.
+regular(N) ->
+    ok = init_regular(),
+    Flood = fun() -> spawn_errs(N) end,
+    spawn(Flood).
+
+%% Configure the node with init_dedup/0, then run the error generator
+%% (spawn_errs/1 with N) in its own process. Returns that process's pid.
+dedup(N) ->
+    ok = init_dedup(),
+    Flood = fun() -> spawn_errs(N) end,
+    spawn(Flood).
+
+%% Spawn N processes that each call err/1 once, then do it again, forever.
+%% This function never returns.
+spawn_errs(N) ->
+    lists:foreach(fun(X) -> spawn(fun() -> err(X) end) end, lists:seq(1, N)),
+    spawn_errs(N).
+
+%% Log one error through lager. The format string (one of three) and the
+%% canned stack term (one of four) are both selected by hashing the calling
+%% pid with erlang:phash2/2, so a given process always produces the same
+%% message. The argument is ignored.
+err(_) ->
+    %% 8: 1a 1b 2b 2c 3b
+    %% 6: 1b 1c 1d 2b 2c 3c 3b
+    Self = self(),
+    Formats = {"1 my error has the following stacktrace: ~p",
+               "2 module X failed with reason ~p",
+               "3 Some other different error message (~p) told me things failed."},
+    Stacks = {{a, 'EXIT',
+               {badarith,
+                [{erlang, '/', [1,0], []},
+                 {erl_eval, do_apply, 6, [{file,"erl_eval.erl"}, {line,576}]},
+                 {erl_eval, expr, 5, [{file,"erl_eval.erl"}, {line,360}]},
+                 {shell, exprs, 7, [{file,"shell.erl"}, {line,668}]},
+                 {shell, eval_exprs, 7, [{file,"shell.erl"}, {line,623}]},
+                 {shell, eval_loop, 3, [{file,"shell.erl"}, {line,608}]}]}},
+              {b, badarg, [{erlang, atom_to_list, ["y"], []}]},
+              {c, {badarity, {"#Fun<erl_eval.20.82930912>", [1]}},
+               [{lists, map, 2, [{file,"lists.erl"}, {line,1173}]}]},
+              {d, {shell_undef, apply, 1, []},
+               [{shell, shell_undef, 2, [{file,"shell.erl"}, {line,1092}]},
+                {erl_eval, local_func, 5, [{file,"erl_eval.erl"}, {line,475}]}]}},
+    Str = element(erlang:phash2(Self, 3) + 1, Formats),
+    Stack = element(erlang:phash2(Self, 4) + 1, Stacks),
+    lager:error(Str, [Stack]).

0 comments on commit 1240b68

Please sign in to comment.