Permalink
Browse files

Switch between gen_event notify/sync_notify based on message queue le…

…ngth

In normal operation, there's no need for log messages to be synchronous.
Its slower and introduces a global serialization point in the
application.

However, when in an overload condition, synchronous logging is good
because it limits each process to at most 1 log message in flight.

So, this change allows the gen_event at the core of lager to switch
modes depending on the size of the gen_event's mailbox. It should
provide better performance in the case of normal load, but it will also
prevent unbounded mailbox growth if an overload occurs.

The threshold at which the switch between async and sync is done is
configured via the 'async_threshold' app env var.
  • Loading branch information...
1 parent 7993c8f commit cf89804ceda359be6dd9ae9e2402e3744afdbd41 @Vagabond Vagabond committed Mar 8, 2013
Showing with 74 additions and 2 deletions.
  1. +8 −2 src/lager.erl
  2. +11 −0 src/lager_app.erl
  3. +55 −0 src/lager_backend_throttle.erl
View
@@ -78,8 +78,14 @@ dispatch_log(Severity, Metadata, Format, Args, Size) when is_atom(Severity)->
_ ->
Format
end,
- gen_event:sync_notify(Pid, {log, lager_msg:new(Msg, Timestamp,
- Severity, Metadata, Destinations)});
+ case lager_config:get(async, false) of
+ true ->
+ gen_event:notify(Pid, {log, lager_msg:new(Msg, Timestamp,
+ Severity, Metadata, Destinations)});
+ false ->
+ gen_event:sync_notify(Pid, {log, lager_msg:new(Msg, Timestamp,
+ Severity, Metadata, Destinations)})
+ end;
false ->
ok
end;
View
@@ -34,6 +34,17 @@ start() ->
start(_StartType, _StartArgs) ->
{ok, Pid} = lager_sup:start_link(),
+
+
+ case application:get_env(lager, async_threshold) of
+ undefined ->
+ ok;
+ {ok, Threshold} when is_integer(Threshold), Threshold > 0 ->
+ _ = supervisor:start_child(lager_handler_watcher_sup, [lager_event, lager_backend_throttle, [Threshold]]);
+ {ok, _BadVal} ->
+ ok
+ end,
+
Handlers = case application:get_env(lager, handlers) of
undefined ->
[{lager_console_backend, info},
@@ -0,0 +1,55 @@
+-module(lager_backend_throttle).
+
+-include("lager.hrl").
+
+-behaviour(gen_event).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-record(state, {
+ hwm,
+ async = true
+ }).
+
+init([Hwm]) ->
+ lager_config:set(async, true),
+ {ok, #state{hwm=Hwm}}.
+
+
+handle_call(get_loglevel, State) ->
+ {ok, {mask, ?LOG_NONE}, State};
+handle_call({set_loglevel, _Level}, State) ->
+ {ok, ok, State};
+handle_call(_Request, State) ->
+ {ok, ok, State}.
+
+handle_event({log, _Message},State) ->
+ {message_queue_len, Len} = erlang:process_info(self(), message_queue_len),
+ case {Len > State#state.hwm, State#state.async} of
+ {true, true} ->
+ %% need to flip to sync mode
+ lager_config:set(async, false),
+ {ok, State#state{async=false}};
+ {false, false} ->
+ %% need to flip to async mode
+ lager_config:set(async, true),
+ {ok, State#state{async=true}};
+ _ ->
+ %% nothing needs to change
+ {ok, State}
+ end;
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+%% @private
+terminate(_Reason, _State) ->
+ ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+

0 comments on commit cf89804

Please sign in to comment.