Permalink
Browse files

Merge pull request #54 from basho/jfw-br-grace_timeout

Adds "grace period" to stop just-written files from expiring.
  • Loading branch information...
chardan committed Sep 13, 2012
2 parents 0f59e13 + 8f11340 commit aab96acfe92a653b1cbc1b77d180e1a9dca89cbc
Showing with 38 additions and 13 deletions.
  1. +4 −1 rebar.config
  2. +29 −9 src/bitcask.erl
  3. +5 −3 test/bitcask_qc_expiry.erl
View
@@ -30,4 +30,7 @@
% {d, 'PULSE'},
% {pulse_side_effect, [{bitcask, get_filestate, '_'},
% {bitcask_nifs, keydir_get, '_'}]}
- ]}.
+
+ ]}.
+
+{eunit_opts, [verbose]}.
View
@@ -73,6 +73,7 @@
hint_keydir,
del_keydir,
expiry_time,
+ expiry_grace_time,
opts }).
%% A bitcask is a directory containing:
@@ -503,6 +504,7 @@ merge1(Dirname, Opts, FilesToMerge) ->
live_keydir = LiveKeyDir,
del_keydir = DelKeyDir,
expiry_time = expiry_time(Opts),
+ expiry_grace_time = expiry_grace_time(Opts),
opts = Opts },
%% Finally, start the merge process
@@ -523,6 +525,16 @@ merge1(Dirname, Opts, FilesToMerge) ->
end || F <- State#mstate.input_files],
ok = bitcask_lockops:release(Lock).
+%% @doc Predicate which determines whether or not a file should be considered for a merge.
+consider_for_merge(FragTrigger, DeadBytesTrigger, ExpirationGraceTime) ->
+ fun (F) ->
+ (F#file_status.fragmented >= FragTrigger)
+ orelse (F#file_status.dead_bytes >= DeadBytesTrigger)
+ orelse ( (F#file_status.oldest_tstamp > 0) %% means that the file has data
+ andalso (F#file_status.oldest_tstamp < ExpirationGraceTime)
+ )
+ end.
+
-spec needs_merge(reference()) -> {true, [string()]} | false.
needs_merge(Ref) ->
State = get_state(Ref),
@@ -547,18 +559,18 @@ needs_merge(Ref) ->
%%
%% frag_merge_trigger - Any file exceeds this % fragmentation
%% dead_bytes_merge_trigger - Any file has more than this # of dead bytes
- %% expiry_secs - Any file has an expired key
+ %% expiry_time - Any file has an expired key
+ %% expiry_grace_time - avoid expiring in the case of continuous writes
%%
FragTrigger = get_opt(frag_merge_trigger, State#bc_state.opts),
DeadBytesTrigger = get_opt(dead_bytes_merge_trigger, State#bc_state.opts),
- ExpirationTime = expiry_time(State#bc_state.opts),
-
- NeedsMerge = lists:any(fun(F) ->
- (F#file_status.fragmented >= FragTrigger)
- or (F#file_status.dead_bytes >= DeadBytesTrigger)
- or (F#file_status.oldest_tstamp < ExpirationTime)
- end, Summary),
+ ExpirationTime =
+ max(expiry_time(State#bc_state.opts), 0),
+ ExpirationGraceTime =
+ max(expiry_time(State#bc_state.opts) - expiry_grace_time(State#bc_state.opts), 0),
+ NeedsMerge = lists:any(consider_for_merge(FragTrigger, DeadBytesTrigger, ExpirationGraceTime),
+ Summary),
case NeedsMerge of
true ->
%% Build a list of threshold checks; a file which meets ANY
@@ -711,6 +723,15 @@ expiry_time(Opts) ->
false -> 0
end.
+to_lower_grace_time_bound(undefined) -> 0;
+to_lower_grace_time_bound(X) ->
+ case X > 0 of
+ true -> X;
+ false -> 0
+ end.
+
+expiry_grace_time(Opts) -> to_lower_grace_time_bound(get_opt(expiry_grace_time, Opts)).
+
start_app() ->
case application:start(?MODULE) of
ok ->
@@ -1641,4 +1662,3 @@ truncate_file(Path, Offset) ->
file:close(FH).
-endif.
-
@@ -86,9 +86,9 @@ set_tstamp_step(Step) ->
prop_expiry() ->
?LET({Keys, Values}, {keys(), values()},
- ?FORALL({Ops, Expiry, Timestep, M1},
+ ?FORALL({Ops, Expiry, ExpiryGrace, Timestep, M1},
{eqc_gen:non_empty(list(ops(Keys, Values))),
- choose(1,10), choose(5, 50), choose(5,128)},
+ choose(1,10), choose(1, 10), choose(5, 50), choose(5,128)},
begin
Dirname = "/tmp/bc.prop.expiry",
?cmd("rm -rf " ++ Dirname),
@@ -106,6 +106,7 @@ prop_expiry() ->
{small_file_threshold, disabled},
{frag_threshold, disabled},
{expiry_secs, Expiry},
+ {expiry_grace_secs, ExpiryGrace},
{max_file_size, M1}]),
try
@@ -118,7 +119,7 @@ prop_expiry() ->
bitcask:close_write_file(Bref),
%% Identify items in the Model that should be expired
- ExpireCutoff = erlang:max(current_tstamp() + Timestep - Expiry, -1),
+ ExpireCutoff = erlang:max(current_tstamp() + Timestep - erlang:max(Expiry - ExpiryGrace, 0), -1),
{Expired, _Live} = lists:partition(fun({_K, {_Value, Tstamp}}) ->
Tstamp < ExpireCutoff
@@ -133,6 +134,7 @@ prop_expiry() ->
_ ->
?assertMatch({true, _}, bitcask:needs_merge(Bref))
end
+ catch X:Y -> io:format(user, "exception: ~p ~p @ ~p\n", [X,Y, erlang:get_stacktrace()])
after
bitcask:close(Bref)
end,

0 comments on commit aab96ac

Please sign in to comment.