Skip to content

Commit

Permalink
Add a seperate process to do file operations in
Browse files Browse the repository at this point in the history
This helps bound the mailbox when we need to do a selective receive from
the file driver.
  • Loading branch information
Vagabond committed Mar 23, 2013
1 parent 9719370 commit 888be1d
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 84 deletions.
3 changes: 0 additions & 3 deletions TODO

This file was deleted.

119 changes: 38 additions & 81 deletions src/lager_file_backend.erl
Expand Up @@ -54,20 +54,12 @@

-record(state, {
name :: string(),
file_writer :: pid(),
level :: {'mask', integer()},
fd :: file:io_device(),
inode :: integer(),
flap=false :: boolean(),
size = 0 :: integer(),
date,
count = 10,
formatter,
formatter_config,
sync_on,
check_interval = ?DEFAULT_CHECK_INTERVAL,
sync_interval = ?DEFAULT_SYNC_INTERVAL,
sync_size = ?DEFAULT_SYNC_SIZE,
last_check = os:timestamp()
sync_on
}).

%% @private
Expand Down Expand Up @@ -96,13 +88,11 @@ init(LogFileConfig) when is_list(LogFileConfig) ->
%% probabably a better way to do this, but whatever
[Name, Level, Date, Size, Count, SyncInterval, SyncSize, SyncOn, CheckInterval, Formatter, FormatterConfig] =
[proplists:get_value(Key, Config) || Key <- [file, level, date, size, count, sync_interval, sync_size, sync_on, check_interval, formatter, formatter_config]],
schedule_rotation(Name, Date),
State0 = #state{name=Name, level=Level, size=Size, date=Date, count=Count, formatter=Formatter,
formatter_config=FormatterConfig, sync_on=SyncOn, sync_interval=SyncInterval, sync_size=SyncSize,
check_interval=CheckInterval},
State = case lager_util:open_logfile(Name, {SyncSize, SyncInterval}) of
{ok, {FD, Inode, _}} ->
State0#state{fd=FD, inode=Inode};
State0 = #state{name=Name, level=Level, formatter=Formatter,
formatter_config=FormatterConfig, sync_on=SyncOn},
State = case lager_file_writer:start_link(Name, SyncSize, SyncInterval, CheckInterval, Size, Date, Count) of
{ok, Pid} ->
State0#state{file_writer=Pid};
{error, Reason} ->
?INT_LOG(error, "Failed to open log file ~s with error ~s", [Name, file:format_error(Reason)]),
State0#state{flap=true}
Expand All @@ -129,26 +119,48 @@ handle_event({log, Message},
#state{name=Name, level=L,formatter=Formatter,formatter_config=FormatConfig} = State) ->
case lager_util:is_loggable(Message,L,{lager_file_backend, Name}) of
true ->
{ok,write(State, lager_msg:timestamp(Message), lager_msg:severity_as_int(Message), Formatter:format(Message,FormatConfig)) };
{mask, SyncLevel} = State#state.sync_on,
WriteRes = case (lager_msg:severity_as_int(Message) band SyncLevel) /= 0 of
true ->
lager_file_writer:sync_write(State#state.file_writer,
Formatter:format(Message, FormatConfig),
lager_msg:timestamp(Message));
false ->
lager_file_writer:write(State#state.file_writer,
Formatter:format(Message, FormatConfig),
lager_msg:timestamp(Message))
end,
Flap = State#state.flap,
NewFlap = case WriteRes of
ok ->
%% clear flap
false;
{open_error, Reason} when not Flap ->
?INT_LOG(error, "Failed to reopen log file ~s with error ~s", [Name, file:format_error(Reason)]),
true;
{write_error, Reason} when not Flap ->
?INT_LOG(error, "Failed to write log message to file ~s: ~s",
[Name, file:format_error(Reason)]),
true;
_ ->
%% leave it alone
Flap
end,

{ok, State#state{flap=NewFlap}};
false ->
{ok, State}
end;
handle_event(_Event, State) ->
{ok, State}.

%% @private
handle_info({rotate, File}, #state{name=File,count=Count,date=Date} = State) ->
lager_util:rotate_logfile(File, Count),
schedule_rotation(File, Date),
{ok, State};
handle_info(_Info, State) ->
{ok, State}.

%% @private
terminate(_Reason, #state{fd=FD}) ->
%% flush and close any file handles
_ = file:datasync(FD),
_ = file:close(FD),
terminate(_Reason, #state{file_writer=FW}) ->
lager_file_writer:close(FW),
ok.

%% @private
Expand All @@ -172,56 +184,6 @@ config_to_id(Config) ->
{?MODULE, File}
end.


write(#state{name=Name, fd=FD, inode=Inode, flap=Flap, size=RotSize,
count=Count} = State, Timestamp, Level, Msg) ->
LastCheck = timer:now_diff(os:timestamp(), Timestamp) div 1000,
case LastCheck >= State#state.check_interval orelse FD == undefined of
true ->
%% need to check for rotation
case lager_util:ensure_logfile(Name, FD, Inode, {State#state.sync_size, State#state.sync_interval}) of
{ok, {_, _, Size}} when RotSize /= 0, Size > RotSize ->
lager_util:rotate_logfile(Name, Count),
%% go around the loop again, we'll do another rotation check and hit the next clause here
write(State, Timestamp, Level, Msg);
{ok, {NewFD, NewInode, _}} ->
%% update our last check and try again
do_write(State#state{last_check=Timestamp, fd=NewFD, inode=NewInode}, Level, Msg);
{error, Reason} ->
case Flap of
true ->
State;
_ ->
?INT_LOG(error, "Failed to reopen log file ~s with error ~s", [Name, file:format_error(Reason)]),
State#state{flap=true}
end
end;
false ->
do_write(State, Level, Msg)
end.

do_write(#state{fd=FD, name=Name, flap=Flap} = State, Level, Msg) ->
%% delayed_write doesn't report errors
_ = file:write(FD, Msg),
{mask, SyncLevel} = State#state.sync_on,
case (Level band SyncLevel) /= 0 of
true ->
%% force a sync on any message that matches the 'sync_on' bitmask
Flap2 = case file:datasync(FD) of
{error, Reason2} when Flap == false ->
?INT_LOG(error, "Failed to write log message to file ~s: ~s",
[Name, file:format_error(Reason2)]),
true;
ok ->
false;
_ ->
Flap
end,
State#state{flap=Flap2};
_ ->
State
end.

validate_loglevel(Level) ->
try lager_util:config_to_mask(Level) of
Levels ->
Expand Down Expand Up @@ -339,11 +301,6 @@ validate_logfile_proplist([{formatter_config, FmtCfg}|Tail], Acc) ->
validate_logfile_proplist([Other|_Tail], _Acc) ->
throw({bad_config, "Invalid option", Other}).

schedule_rotation(_, undefined) ->
ok;
schedule_rotation(Name, Date) ->
erlang:send_after(lager_util:calculate_next_rotation(Date) * 1000, self(), {rotate, Name}),
ok.

-ifdef(TEST).

Expand Down
136 changes: 136 additions & 0 deletions src/lager_file_writer.erl
@@ -0,0 +1,136 @@
-module(lager_file_writer).
-behaviour(gen_server).

-record(state, {
file :: string(),
fd :: file:io_device(),
inode :: integer(),
size = 0 :: non_neg_integer(),
date,
count,
check_interval,
sync_interval,
sync_size,
last_check = os:timestamp()
}).

-export([start_link/7, write/2, write/3, sync_write/2, sync_write/3, close/1]).

%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

%% public API
start_link(FileName, SyncSize, SyncInterval, CheckInterval, RotationSize, RotationDate, RotationCount) ->
gen_server:start_link(?MODULE, [
FileName, SyncSize,
SyncInterval, CheckInterval,
RotationSize, RotationDate, RotationCount], []).

write(Pid, Message) ->
write(Pid, Message, os:timestamp()).

write(Pid, Message, Now) ->
gen_server:call(Pid, {write, Message, Now}).

sync_write(Pid, Message) ->
sync_write(Pid, Message, os:timestamp()).

sync_write(Pid, Message, Now) ->
gen_server:call(Pid, {sync_write, Message, Now}).

close(Pid) ->
gen_server:call(Pid, stop).

%% gen_server API
init([FileName, SyncSize, SyncInterval, CheckInterval,
RotationSize, RotationDate, RotationCount]) ->
case lager_util:open_logfile(FileName, {SyncSize, SyncInterval}) of
{ok, {FD, Inode, _}} ->
schedule_rotation(RotationDate),
{ok, #state{fd=FD, inode=Inode, file=FileName, size=RotationSize,
date=RotationDate, count=RotationCount, check_interval=CheckInterval,
sync_interval=SyncInterval, sync_size=SyncSize}};
{error, Reason} ->
{stop, Reason}
end.

handle_call({write, Msg, Now}, _From, State) ->
write(Msg, Now, false, State);
handle_call({sync_write, Msg, Now}, _From, State) ->
write(Msg, Now, true, State).

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(rotate, #state{file=File,count=Count,date=Date} = State) ->
lager_util:rotate_logfile(File, Count),
schedule_rotation(Date),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

terminate(_Reason, #state{fd=FD}) ->
%% flush and close any file handles
_ = file:datasync(FD),
_ = file:close(FD),
ok.

%% internal functions
schedule_rotation(undefined) ->
ok;
schedule_rotation(Date) ->
erlang:send_after(lager_util:calculate_next_rotation(Date) * 1000, self(), rotate),
ok.

write(Msg, Timestamp, Sync, #state{file=Name, fd=FD, inode=Inode, size=RotSize,
count=Count} = State) ->
LastCheck = timer:now_diff(os:timestamp(), Timestamp) div 1000,
case LastCheck >= State#state.check_interval orelse FD == undefined of
true ->
%% need to check for rotation
case lager_util:ensure_logfile(Name, FD, Inode, {State#state.sync_size, State#state.sync_interval}) of
{ok, {_, _, Size}} when RotSize /= 0, Size > RotSize ->
lager_util:rotate_logfile(Name, Count),
%% go around the loop again, we'll do another rotation check and hit the next clause here
write(Msg, Sync, State);
{ok, {NewFD, NewInode, _}} ->
%% update our last check and try again
do_write(Msg, Sync, State#state{last_check=Timestamp, fd=NewFD, inode=NewInode});
{error, Reason} ->
{reply, {open_error, Reason}, State}
end;
false ->
do_write(Msg, Sync, State)
end.

do_write(Msg, Sync, #state{fd=FD} = State) ->
%% delayed_write doesn't report errors
case file:write(FD, Msg) of
ok ->
case Sync of
true ->
case file:datasync(FD) of
ok ->
{reply, ok, State};
{error, Reason} ->
{reply, {write_error, Reason}, State}
end;
_ ->
{reply, ok, State}
end;
{error, Reason} ->
{reply, {write_error, Reason}, State}
end.



0 comments on commit 888be1d

Please sign in to comment.