Permalink
Browse files

Merge pull request #117 from basho/adt-sync-switch

Switch between gen_event notify/sync_notify based on message queue length
  • Loading branch information...
2 parents d0ca807 + acb8cf5 commit cf408671dbc0d3ddcdcda176cf6ad04f03228aab @Vagabond Vagabond committed Mar 14, 2013
Showing with 148 additions and 2 deletions.
  1. +8 −2 src/lager.erl
  2. +12 −0 src/lager_app.erl
  3. +77 −0 src/lager_backend_throttle.erl
  4. +51 −0 test/lager_test_backend.erl
View
@@ -78,8 +78,14 @@ dispatch_log(Severity, Metadata, Format, Args, Size) when is_atom(Severity)->
_ ->
Format
end,
- gen_event:sync_notify(Pid, {log, lager_msg:new(Msg, Timestamp,
- Severity, Metadata, Destinations)});
+ LagerMsg = lager_msg:new(Msg, Timestamp,
+ Severity, Metadata, Destinations),
+ case lager_config:get(async, false) of
+ true ->
+ gen_event:notify(Pid, {log, LagerMsg});
+ false ->
+ gen_event:sync_notify(Pid, {log, LagerMsg})
+ end;
false ->
ok
end;
View
@@ -34,6 +34,18 @@ start() ->
start(_StartType, _StartArgs) ->
{ok, Pid} = lager_sup:start_link(),
+
+
+ case application:get_env(lager, async_threshold) of
+ undefined ->
+ ok;
+ {ok, Threshold} when is_integer(Threshold), Threshold >= 0 ->
+ _ = supervisor:start_child(lager_handler_watcher_sup, [lager_event, lager_backend_throttle, Threshold]);
+ {ok, BadThreshold} ->
+ error_logger:error_msg("Invalid value for 'async_threshold': ~p~n", [BadThreshold]),
+ throw({error, bad_config})
+ end,
+
Handlers = case application:get_env(lager, handlers) of
undefined ->
[{lager_console_backend, info},
@@ -0,0 +1,77 @@
+%% Copyright (c) 2011-2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+%% @doc A simple gen_event backend used to monitor mailbox size and
+%% switch log messages between synchronous and asynchronous modes.
+%% A gen_event handler is used because a process getting its own mailbox
+%% size doesn't involve getting a lock, and gen_event handlers run in their
+%% parent's process.
+
+-module(lager_backend_throttle).
+
+-include("lager.hrl").
+
+-behaviour(gen_event).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-record(state, {
+ hwm,
+ async = true
+ }).
+
+init(Hwm) ->
+ lager_config:set(async, true),
+ {ok, #state{hwm=Hwm}}.
+
+
+handle_call(get_loglevel, State) ->
+ {ok, {mask, ?LOG_NONE}, State};
+handle_call({set_loglevel, _Level}, State) ->
+ {ok, ok, State};
+handle_call(_Request, State) ->
+ {ok, ok, State}.
+
+handle_event({log, _Message},State) ->
+ {message_queue_len, Len} = erlang:process_info(self(), message_queue_len),
+ case {Len > State#state.hwm, State#state.async} of
+ {true, true} ->
+ %% need to flip to sync mode
+ lager_config:set(async, false),
+ {ok, State#state{async=false}};
+ {false, false} ->
+ %% need to flip to async mode
+ lager_config:set(async, true),
+ {ok, State#state{async=true}};
+ _ ->
+ %% nothing needs to change
+ {ok, State}
+ end;
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+%% @private
+terminate(_Reason, _State) ->
+ ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
@@ -1024,6 +1024,57 @@ safe_format_test() ->
?assertEqual("FORMAT ERROR: \"~p ~p ~p\" [foo,bar]", lists:flatten(lager:safe_format("~p ~p ~p", [foo, bar], 1024))),
ok.
+async_threshold_test_() ->
+ {foreach,
+ fun() ->
+ error_logger:tty(false),
+ application:load(lager),
+ application:set_env(lager, error_logger_redirect, false),
+ application:set_env(lager, async_threshold, 10),
+ application:set_env(lager, handlers, [{?MODULE, info}]),
+ application:start(lager)
+ end,
+ fun(_) ->
+ application:unset_env(lager, async_threshold),
+ application:stop(lager),
+ error_logger:tty(true)
+ end,
+ [
+ {"async threshold works",
+ fun() ->
+ %% we start out async
+ ?assertEqual(true, lager_config:get(async)),
+
+ %% put a ton of things in the queue
+ Workers = [spawn_monitor(fun() -> [lager:info("hello world") || _ <- lists:seq(1, 1000)] end) || _ <- lists:seq(1, 10)],
+
+ %% serialize on mailbox
+ _ = gen_event:which_handlers(lager_event),
+ %% there should be a ton of outstanding messages now, so async is false
+ ?assertEqual(false, lager_config:get(async)),
+ %% wait for all the workers to return, meaning that all the messages have been logged (since we're in sync mode)
+ collect_workers(Workers),
+ %% serialize ont the mailbox again
+ _ = gen_event:which_handlers(lager_event),
+ %% just in case...
+ timer:sleep(100),
+
+ %% async is true again now that the mailbox has drained
+ ?assertEqual(true, lager_config:get(async)),
+ ok
+ end
+ }
+ ]
+ }.
+
+collect_workers([]) ->
+ ok;
+collect_workers(Workers) ->
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ collect_workers(lists:keydelete(Ref, 2, Workers))
+ end.
+
-endif.

0 comments on commit cf40867

Please sign in to comment.