Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logging and pretty printing of log file to background manager. #501

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 144 additions & 5 deletions src/riak_core_bg_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@
all_locks/0,
all_locks/1,
all_tokens/0,
all_tokens/1
all_tokens/1,
print_log/0,
iter_log/1
]).

%% Convenience
Expand All @@ -131,6 +133,9 @@

-define(NOT_TRANSFERED(S), S#state.info_table == undefined orelse S#state.entry_table == undefined).

-define(MAX_LOG_BYTES, 100 * 1024). %% maximum size of a single logfile
-define(MAX_LOG_FILES, 5). %% maximum number of logfiles in the wrap rotation

%%%===================================================================
%%% API
%%%===================================================================
Expand Down Expand Up @@ -404,7 +409,9 @@ query_resource(Resource, Types) ->
{info_table:: ets:tid(), %% TableID of ?BG_INFO_ETS_TABLE
entry_table:: ets:tid(), %% TableID of ?BG_ENTRY_ETS_TABLE
enabled :: boolean(), %% Global enable/disable switch, true at startup
bypassed:: boolean() %% Global kill switch. false at startup
bypassed:: boolean(), %% Global kill switch. false at startup
loglevel::atom(), %% Logging level = on | off | debug
log::disk_log:log()
}).

%%%===================================================================
Expand All @@ -423,10 +430,14 @@ init([]) ->
%% We have two to claim...
ok = riak_core_table_manager:claim_table(?BG_INFO_ETS_TABLE),
ok = riak_core_table_manager:claim_table(?BG_ENTRY_ETS_TABLE),
{ok, Log} = open_disk_log(read_write),
State = #state{info_table=undefined, %% resolved in the ETS-TRANSFER handler
entry_table=undefined, %% resolved in the ETS-TRANSFER handler
enabled=true,
bypassed=false},
bypassed=false,
loglevel=app_helper:get_env(riak_core, background_manager_loglevel, on),
log=Log
},
{ok, State}.

%% @private
Expand Down Expand Up @@ -458,7 +469,7 @@ handle_call(disable, _From, State) ->
State2 = update_enabled(false, State),
{reply, status_of(true, State2), State2};
handle_call({get_lock, Lock, Pid, Meta}, _From, State) ->
do_handle_call_exception(fun do_get_resource/5, [Lock, lock, Pid, Meta, State], State);
do_handle_call_exception(fun logged_do_get_resource/5, [Lock, lock, Pid, Meta, State], State);
handle_call({lock_count, Lock}, _From, State) ->
{reply, held_count(Lock, State), State};
handle_call({lock_limit_reached, Lock}, _From, State) ->
Expand All @@ -480,7 +491,7 @@ handle_call({token_info, Token}, _From, State) ->
handle_call({set_token_rate, Token, Rate}, _From, State) ->
do_handle_call_exception(fun do_set_token_rate/3, [Token, Rate, State], State);
handle_call({get_token, Token, Pid, Meta}, _From, State) ->
do_handle_call_exception(fun do_get_resource/5, [Token, token, Pid, Meta, State], State).
do_handle_call_exception(fun logged_do_get_resource/5, [Token, token, Pid, Meta, State], State).

%% @private
%% @doc Handling cast messages
Expand Down Expand Up @@ -583,13 +594,15 @@ validate_hold(_Obj, _TableId) -> %% tokens don't monitor processes
%% @doc Persist the global bypass switch to ETS and mirror it in state.
%% Does nothing until both ETS tables have been transferred to us.
update_bypassed(_Bypassed, State) when ?NOT_TRANSFERED(State) ->
    State;
update_bypassed(Bypassed, #state{info_table=Tid} = State) ->
    maybe_log(bypass, Bypassed, ok, State),
    ets:insert(Tid, {bypassed, Bypassed}),
    State#state{bypassed=Bypassed}.

%% @doc Persist the global enabled switch to ETS and mirror it in state.
%% Does nothing until both ETS tables have been transferred to us.
update_enabled(_Enabled, State) when ?NOT_TRANSFERED(State) ->
    State;
update_enabled(Enabled, #state{info_table=Tid} = State) ->
    maybe_log(enable, Enabled, ok, State),
    ets:insert(Tid, {enabled, Enabled}),
    State#state{enabled=Enabled}.

Expand Down Expand Up @@ -749,6 +762,7 @@ do_enabled(Resource, State) ->

%% @doc Flip the enabled flag for a single resource, log the change, and
%% reply with the resulting status of that resource.
do_enable_resource(Resource, Enabled, State) ->
    Info = resource_info(Resource, State),
    maybe_log(enable, {Resource, Enabled}, ok, State),
    NewState = update_resource_enabled(Resource, Enabled, Info, State),
    {reply, status_of(Enabled, NewState), NewState}.

Expand All @@ -759,6 +773,7 @@ update_resource_enabled(Resource, Value, Default, State) ->
State).

update_limit(Resource, Limit, Default, State) ->
maybe_log(set_limit, {Resource, Limit}, ok, State),
update_resource_info(Resource,
fun(Info) -> Info#resource_info{limit=Limit} end,
Default#resource_info{limit=Limit},
Expand Down Expand Up @@ -851,6 +866,20 @@ try_get_resource(true, Resource, Type, Pid, Meta, State) ->
{{ok,Ref}, give_resource(Resource, Type, Pid, Ref, Meta, State)}
end.

%% @private
%% @doc
%% Wrapper around do_get_resource/5 that also records the request and its
%% outcome in the event log, subject to the current logging level.
logged_do_get_resource(Resource, Type, Pid, Meta, State) ->
    Reply = do_get_resource(Resource, Type, Pid, Meta, State),
    %% The reply tuple carries the per-request result in its second slot;
    %% collapse any success shape to plain 'ok' for the log entry.
    {_ReplyTag, Result, _StateOut} = Reply,
    Status =
        case Result of
            ok         -> ok;
            {ok, _Ref} -> ok;
            Failure    -> Failure
        end,
    maybe_log(get_resource, {Resource, Type, Meta}, Status, State),
    Reply.

%% @private
%% @doc reply now if resource is available. Returns max_concurrency
%% if resource not available or globally or specifically disabled.
Expand Down Expand Up @@ -943,3 +972,113 @@ use_bg_mgr() ->
-spec use_bg_mgr(atom(), atom()) -> boolean().
use_bg_mgr(Dependency, Key) ->
use_bg_mgr() andalso app_helper:get_env(Dependency, Key, true).

%%%%%%%%%%
%% Logging
%%%%%%%%%%

%% @private
%% @doc Create/open the event logfile at <platform_data_dir>/riak_core_bg_manager/events.log.
%% Returns {ok, Log} where Log is the disk_log name, or {ok, undefined} if
%% the file could not be opened: logging is best-effort, so open failures
%% are reported but never propagated to the caller.
open_disk_log(RWorRO) ->
    DataDir = app_helper:get_env(riak_core, platform_data_dir, "/tmp"),
    %% A fresh now() tuple serves as a unique disk_log name, so each call
    %% gets its own handle onto the same file.
    %% NOTE(review): erlang:now/0 is deprecated in modern OTP; confirm
    %% whether a stable name or os:timestamp/0 would be preferable here.
    Log = now(),
    LogFile = lists:flatten(io_lib:format("~s/~s/events.log",
                                          [DataDir, ?MODULE])),
    ok = filelib:ensure_dir(LogFile),
    case open_disk_log(Log, LogFile, RWorRO) of
        {ok, Log} ->
            {ok, Log};
        {repaired, _L, Recovered, Bad} ->
            %% disk_log repaired a damaged file; log what was salvaged/lost
            %% but treat the open as successful.
            lager:info("Repaired damaged logfile, ~p, ~p", [Recovered, Bad]),
            {ok, Log};
        Error ->
            lager:error("Failed to open logfile: '~p' error: ~p", [LogFile, Error]),
            {ok, undefined}
    end.

%% @private
%% @doc Open a wrap-mode, internal-format disk_log using the configured
%% (or default) size limits.
open_disk_log(Name, Path, Mode) ->
    SizeSpec = app_helper:get_env(riak_core, background_manager_log_size,
                                  {?MAX_LOG_BYTES, ?MAX_LOG_FILES}),
    open_disk_log(Name, Path, Mode,
                  [{type, wrap}, {format, internal}, {size, SizeSpec}]).

%% @private
%% @doc Open a disk_log with caller-supplied extra options.
open_disk_log(Name, Path, Mode, ExtraOpts) ->
    disk_log:open([{name, Name}, {file, Path}, {mode, Mode} | ExtraOpts]).

%% @private
%% @doc Write binary terms to the disk_log file, if one is open.
%% Terms are always written asynchronously: if some entries are lost in a
%% crash that's acceptable -- we must never block the operational path.
log(_Op, _Args, _Status, #state{log=undefined}) ->
    ok;
log(Op, Args, Status, #state{log=WriteLog}) ->
    %% os:timestamp/0 replaces the deprecated erlang:now/0; it returns the
    %% same {MegaSecs, Secs, MicroSecs} shape that format_utc_timestamp/1
    %% (via lager_util:localtime_ms/1) consumes when printing the log.
    Timestamp = os:timestamp(),
    disk_log:alog_terms(WriteLog, [{Timestamp, Op, Args, Status}]).

%% @private
%% @doc Decide whether an operation is written to the event log.
%% Level 'debug' logs every operation, 'off' logs nothing, and 'on' logs
%% only the noteworthy ones: get_resource, bypass and enable.
maybe_log(Op, Args, Status, #state{loglevel=debug} = State) ->
    log(Op, Args, Status, State);
maybe_log(Op, Args, Status, #state{loglevel=on} = State)
  when Op =:= get_resource; Op =:= bypass; Op =:= enable ->
    log(Op, Args, Status, State);
maybe_log(_Op, _Args, _Status, _State) ->
    ok.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Called from outside of riak
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @private
%% @doc Render a log timestamp as a human readable date string. Timezone
%% depends on the 'sasl/utc_log' app env var (as interpreted by
%% lager_util:maybe_utc/1): universal time if true, local time otherwise.
%% @link https://github.com/basho/lager/blob/master/src/lager_stdlib.erl#L80
format_utc_timestamp(TS) ->
    WithMs = lager_util:localtime_ms(TS),
    {Date, Time} = lager_util:format_time(lager_util:maybe_utc(WithMs)),
    lists:flatten([Date, $\s, Time]).

%% @private
%% @doc
%% Print a single term read back from the background manager's log file.
%% Falls through to a raw dump for any term shape we do not recognize.
print_entry({Timestamp, get_resource, {Resource, Type, Meta}, Status}) ->
    When = format_utc_timestamp(Timestamp),
    io:format("~s get_~p ~p ~p ~p~n", [When, Type, Resource, Meta, Status]);
print_entry({Timestamp, Op, {Resource, Value}, Status}) ->
    When = format_utc_timestamp(Timestamp),
    io:format("~s ~p ~p ~p ~p~n", [When, Op, Resource, Value, Status]);
print_entry({Timestamp, Op, Args, Status}) ->
    When = format_utc_timestamp(Timestamp),
    io:format("~s ~p ~p ~p~n", [When, Op, Args, Status]);
print_entry(Term) ->
    io:format("Unexpected: ~p~n", [Term]).

%% @doc
%% Pretty print background manager's logfile.
print_log() ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the intended use of this, and will it io:format all 5mb worth of logs, or just the latest one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of now, it's the only way to dump the logs in a human readable way. I'd like to add more in the future, but I don't think we can jam it into 2.0 at this point. Yes, it will dump all of the logs. I could easily add an entry point that will only print N entries, but I'm trying to keep this dirt simple. Since the iterator walks over chunks, it won't io:format anything bigger than a single entry from the list at a time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's not so easy to add the print(N) thing. It would be easy to print the first N, but not the last N without doing a massive list creation and reversal in memory. Boo. While it's possible to index to the end of the log minus some amount, that seems complicated for a PR that's last minute. We can always print_log() | tail -N. It's only 5 MB.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still unsure about the wisdom of pushing 5mb through erlang's IO subsystem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair. I'm running a test now to generate the full 5 MB of logs and then I'll pretty print it. Since it's an iterator, it should do fine, but we'll see.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it was hard generating that much data, so I turned down the file size to 100K and it prints the log with no effort at all. That gives 500K worth of log data.

So, hopefully 1a759c6 addresses your concerns.

iter_log(fun print_entry/1).

%% @doc
%% Iterate function Fun over the terms in the background manager's logfile.
%% Opens a private read-only handle and always closes it when done, so that
%% repeated calls (e.g. from a long-lived console shell) do not leak
%% disk_log handles.
iter_log(Fun) ->
    case open_disk_log(read_only) of
        {ok, undefined} ->
            %% open_disk_log/1 could not open the file; nothing to iterate.
            ok;
        {ok, ReadLog} ->
            try
                iter_disk_log(Fun, ReadLog)
            after
                disk_log:close(ReadLog)
            end
    end.

%% @private
%% @doc
%% Iterate a function over the background manager's logfile terms, one
%% chunk at a time, so at most a single chunk of terms is in memory (and
%% io:format'd) at once.
iter_disk_log(Fun, DiskLog) ->
    iter_disk_log(disk_log:chunk(DiskLog, start), Fun, DiskLog).

iter_disk_log(eof, _Fun, _DiskLog) ->
    ok;
iter_disk_log({error, Reason}, _Fun, _DiskLog) ->
    %% Reading a chunk failed (e.g. corrupt log); report it and stop
    %% iterating rather than crash with function_clause.
    io:format("~s:iter_disk_log: read error ~p~n", [?MODULE, Reason]),
    ok;
iter_disk_log({Cont, Terms, Badbytes}, Fun, DiskLog) ->
    %% disk_log:chunk/2 returns this form when it skipped corrupt bytes in
    %% the chunk; note the loss and process the terms that survived.
    io:format("~s:iter_disk_log: skipped ~p bad bytes~n", [?MODULE, Badbytes]),
    iter_disk_log({Cont, Terms}, Fun, DiskLog);
iter_disk_log({Cont, Terms}, Fun, DiskLog) ->
    %% A crash in Fun on one chunk must not abort the whole iteration;
    %% report it and carry on with the next chunk.
    try
        lists:foreach(Fun, Terms)
    catch X:Y ->
            io:format("~s:iter_disk_log: caught ~p ~p @ ~p\n",
                      [?MODULE, X, Y, erlang:get_stacktrace()]),
            ok
    end,
    iter_disk_log(disk_log:chunk(DiskLog, Cont), Fun, DiskLog).
6 changes: 6 additions & 0 deletions test/bg_manager_eqc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,12 @@ wait_for_pid(Pid) ->
Mref = erlang:monitor(process, Pid),
receive
{'DOWN', Mref, process, _, _} ->
%% listing all resources is no longer in the gen_server, so
%% we need to let the DOWN handler have a chance to complete
%% and delete the entry from the table. It's now a race to
%% see who notices the dead pid. Maybe bg-mgr should verify
%% that a process is still alive when doing the query.
timer:sleep(10),
ok
after
5000 ->
Expand Down