Permalink
Browse files

some better checkpointing action

  • Loading branch information...
1 parent 498049f commit c7659a6963eadbd4530afe7328e17ef476a8e405 @etrepum committed Jun 7, 2012
Showing with 28 additions and 14 deletions.
  1. +28 −14 src/markov_server.erl
View
42 src/markov_server.erl
@@ -4,6 +4,7 @@
-export([input/1, output/1, output/0]).
-export([start_link/0, start_link/1, start/1]).
-export([checkpoint_async/2]).
+-export([write_count/0]).
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2,
init/1, terminate/2]).
@@ -27,6 +28,9 @@ output(L) ->
input(B) ->
ok = gen_server:cast(?MODULE, {input, B}).
+write_count() ->
+ gen_server:call(?MODULE, write_count).
+
start_link() ->
Args = [{K, V} || K <- [module, etf, opts, log_name, log_file,
checkpoint_writes],
@@ -59,14 +63,16 @@ init(Args) ->
L
end,
{ok,
- read_any(proplists:get_value(etf, Args),
- #state{writes=0,
- checkpoint_writes=CWrites,
- log=Log,
- log_file=LogFile,
- t=undefined,
- checkpoint_monitor=undefined,
- new_args=NewArgs}),
+ maybe_checkpoint_log(
+ read_any(
+ proplists:get_value(etf, Args),
+ #state{writes=0,
+ checkpoint_writes=CWrites,
+ log=Log,
+ log_file=LogFile,
+ t=undefined,
+ checkpoint_monitor=undefined,
+ new_args=NewArgs})),
hibernate}.
handle_cast({input, B}, S) ->
@@ -78,13 +84,18 @@ handle_info({'DOWN', MRef, _Type, _Obj, _Info},
State=#state{checkpoint_monitor=MRef,
writes=W,
log_file=LogFile}) ->
- error_logger:info_msg("Checkpoint of ~p finished", [LogFile]),
- {noreply, State#state{checkpoint_monitor=undefined, writes=1 + W}};
+ error_logger:info_msg("~s: Checkpoint of ~p finished",
+ [?MODULE, LogFile]),
+ {noreply,
+ maybe_checkpoint_log(
+ State#state{checkpoint_monitor=undefined, writes=1 + W})};
handle_info(_Req, State) ->
{noreply, State}.
handle_call({output, B}, _From, S=#state{t=T}) ->
{reply, markov:output(B, T), S};
+handle_call(write_count, _From, S=#state{writes=N, checkpoint_writes=W}) ->
+ {reply, {N, W}, S};
handle_call(_Req, _From, State) ->
{reply, ignored, State}.
@@ -121,13 +132,14 @@ read_any(ETF, S=#state{log=Log, log_file=LogFile}) ->
end,
fun (SAcc) -> read_etf(ETF, SAcc) end,
fun ({S0, Acc}) ->
- error_logger:info_msg("Starting a new log from scratch"),
+ error_logger:info_msg("~s: Starting a new log from scratch",
+ [?MODULE]),
{checkpoint_log(from_list([], S0)), Acc} end],
{S, []}).
read_log(Log, SAcc) ->
{_K, LogFile} = lists:keyfind(file, 1, disk_log:info(Log)),
- error_logger:info_msg("Reading log ~p", [LogFile]),
+ error_logger:info_msg("~s: Reading log ~p", [?MODULE, LogFile]),
read_chunk(Log, disk_log:chunk(Log, start), SAcc).
try_read([F | Rest], SAcc) ->
@@ -136,6 +148,7 @@ try_read([F | Rest], SAcc) ->
try_read(Rest, SAcc1);
{S0, Acc} ->
{S1, []} = lists:foldl(fun chunk_fold/2, {S0, []}, Acc),
+ error_logger:info_msg("~s: Initialized", [?MODULE]),
S1
end.
@@ -152,7 +165,7 @@ read_chunk(_Log, eof, SAcc) ->
read_etf(undefined, SAcc) ->
SAcc;
read_etf(FName, {S, Acc}) ->
- error_logger:info_msg("Reading ETF ~p", [FName]),
+ error_logger:info_msg("~s: Reading ETF ~p", [?MODULE, FName]),
{ok, B} = file:read_file(FName),
{checkpoint_log(from_list(binary_to_term(B), S)), Acc}.
@@ -178,7 +191,8 @@ maybe_checkpoint_log(S) ->
checkpoint_log(S=#state{log_file=LogFile, log=Log, t=T,
checkpoint_monitor=undefined}) ->
- error_logger:info_msg("Checkpoint of ~p starting", [LogFile]),
+ error_logger:info_msg("~s: Checkpoint of ~p starting",
+ [?MODULE, LogFile]),
disk_log:reopen(Log, LogFile ++ ".prev"),
MRef = monitor(process, spawn_link(?MODULE, checkpoint_async, [Log, T])),
S#state{writes=0, checkpoint_monitor=MRef}.

0 comments on commit c7659a6

Please sign in to comment.