Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with the HTTPS clone URL or the Subversion checkout URL above.
Download ZIP
Browse files

Merge pull request #87 from basho/bs-merge-expiration-change

Change behavior of merge when merging for data expiration
  • Loading branch information...
commit 31126cc45714123511bfae56a28b22063b77a153 2 parents 2d8f1b9 + fd9a29c
@bsparrow435 bsparrow435 authored
Showing with 68 additions and 27 deletions.
  1. +65 −25 src/bitcask.erl
  2. +3 −2 src/bitcask_merge_worker.erl
View
90 src/bitcask.erl
@@ -140,9 +140,6 @@ open(Dirname, Opts) ->
{error, Reason}
end.
-
-
-
%% @doc Close a bitcask data store and flush any pending writes to disk.
-spec close(reference()) -> ok.
close(Ref) ->
@@ -452,34 +449,39 @@ iterator_release(Ref) ->
%% into a more compact form.
-spec merge(Dirname::string()) -> ok.
merge(Dirname) ->
- merge(Dirname, [], readable_files(Dirname)).
+ merge(Dirname, [], {readable_files(Dirname), []}).
%% @doc Merge several data files within a bitcask datastore
%% into a more compact form.
-spec merge(Dirname::string(), Opts::[_]) -> ok.
merge(Dirname, Opts) ->
- merge(Dirname, Opts, readable_files(Dirname)).
+ merge(Dirname, Opts, {readable_files(Dirname), []}).
%% @doc Merge several data files within a bitcask datastore
%% into a more compact form.
-spec merge(Dirname::string(), Opts::[_], FilesToMerge::[string()]) -> ok.
merge(_Dirname, _Opts, []) ->
ok;
-merge(Dirname, Opts, FilesToMerge0) ->
+merge(Dirname,Opts,FilesToMerge) when is_list(FilesToMerge) ->
+ merge(Dirname,Opts,{FilesToMerge,[]});
+merge(_Dirname, _Opts, {[],_}) ->
+ ok;
+merge(Dirname, Opts, {FilesToMerge0, ExpiredFiles0}) ->
%% Make sure bitcask app is started so we can pull defaults from env
ok = start_app(),
-
%% Filter the files to merge and ensure that they all exist. It's
%% possible in some circumstances that we'll get an out-of-date
%% list of files.
FilesToMerge = [F || F <- FilesToMerge0,
filelib:is_file(F)],
- merge1(Dirname, Opts, FilesToMerge).
+ ExpiredFiles = [F || F <- ExpiredFiles0,
+ filelib:is_file(F)],
+ merge1(Dirname, Opts, FilesToMerge, ExpiredFiles).
%% Inner merge function, assumes that bitcask is running and all files exist.
-merge1(_Dirname, _Opts, []) ->
+merge1(_Dirname, _Opts, [], []) ->
ok;
-merge1(Dirname, Opts, FilesToMerge) ->
+merge1(Dirname, Opts, FilesToMerge, ExpiredFiles) ->
%% Test to see if this is a complete or partial merge
Partial = not(lists:usort(readable_files(Dirname)) ==
lists:usort(FilesToMerge)),
@@ -550,18 +552,22 @@ merge1(Dirname, Opts, FilesToMerge) ->
TooNew = [F#file_status.filename ||
F <- Summary,
F#file_status.newest_tstamp >= MergeStart],
- InFiles = lists:reverse(
- lists:foldl(fun(F, Acc) ->
+ {InFiles,InExpiredFiles} = lists:foldl(fun(F, {InFilesAcc,InExpiredAcc} = Acc) ->
case lists:member(F#filestate.filename,
TooNew) of
false ->
- [F|Acc];
+ case lists:member(F#filestate.filename,
+ ExpiredFiles) of
+ false ->
+ {[F|InFilesAcc],InExpiredAcc};
+ true ->
+ {InFilesAcc,[F|InExpiredAcc]}
+ end;
true ->
bitcask_fileops:close(F),
Acc
end
- end, [], InFiles1)),
-
+ end, {[],[]}, InFiles1),
%% Setup our first output merge file and update the merge lock accordingly
{ok, Outfile} = bitcask_fileops:create_file(Dirname, Opts),
ok = bitcask_lockops:write_activefile(
@@ -585,6 +591,7 @@ merge1(Dirname, Opts, FilesToMerge) ->
opts = Opts },
%% Finally, start the merge process
+ ExpiredFilesFinished = expiry_merge(InExpiredFiles, LiveKeyDir, []),
State1 = merge_files(State),
%% Make sure to close the final output file
@@ -593,11 +600,16 @@ merge1(Dirname, Opts, FilesToMerge) ->
%% Close the original input files, schedule them for deletion,
%% close keydirs, and release our lock
- [bitcask_fileops:close(F) || F <- State#mstate.input_files],
+ [bitcask_fileops:close(F) || F <- State#mstate.input_files ++ ExpiredFilesFinished],
{_, _, _, {IterGeneration, _, _}} = bitcask_nifs:keydir_info(LiveKeyDir),
- FileNames = [F#filestate.filename || F <- State#mstate.input_files],
+ FileNames = [F#filestate.filename || F <- State#mstate.input_files ++ ExpiredFilesFinished],
[catch set_setuid_bit(F) || F <- FileNames],
bitcask_merge_delete:defer_delete(Dirname, IterGeneration, FileNames),
+ if InFiles == [] ->
+ bitcask_fileops:delete(Outfile);
+ true ->
+ ok
+ end,
%% Explicitly release our keydirs instead of waiting for GC
bitcask_nifs:keydir_release(LiveKeyDir),
@@ -610,8 +622,8 @@ consider_for_merge(FragTrigger, DeadBytesTrigger, ExpirationGraceTime) ->
fun (F) ->
(F#file_status.fragmented >= FragTrigger)
orelse (F#file_status.dead_bytes >= DeadBytesTrigger)
- orelse ( (F#file_status.oldest_tstamp > 0) %% means that the file has data
- andalso (F#file_status.oldest_tstamp < ExpirationGraceTime)
+ orelse ((F#file_status.oldest_tstamp > 0) andalso %% means that the file has data
+ (F#file_status.newest_tstamp < ExpirationGraceTime)
)
end.
@@ -684,9 +696,17 @@ needs_merge(Ref) ->
_ ->
ok
end,
-
FileNames = [Filename || {Filename, _Reasons} <- MergableFiles],
- {true, FileNames};
+ F = fun(X) ->
+ case X of
+ {data_expired,_,_} ->
+ true;
+ _ ->
+ false
+ end
+ end,
+ ExpiredFiles = [Filename || {Filename, Reasons} <- MergableFiles, lists:any(F,Reasons)],
+ {true, {FileNames, ExpiredFiles}};
false ->
false
end.
@@ -732,8 +752,8 @@ small_file_threshold(Opts) ->
expired_threshold(Cutoff) ->
fun(F) ->
- if F#file_status.oldest_tstamp < Cutoff ->
- [{oldest_tstamp, F#file_status.oldest_tstamp, Cutoff}];
+ if F#file_status.newest_tstamp < Cutoff ->
+ [{data_expired, F#file_status.newest_tstamp, Cutoff}];
true ->
[]
end
@@ -1291,6 +1311,26 @@ poll_deferred_delete_queue_empty() ->
_ -> receive after 1100 -> poll_deferred_delete_queue_empty() end
end.
+%% Internal merge function for cache_merge functionality.
+expiry_merge([], _LiveKeyDir, Acc) ->
+ Acc;
+
+expiry_merge([File | Files], LiveKeyDir, Acc0) ->
+ FileId = bitcask_fileops:file_tstamp(File),
+ Fun = fun(K, Tstamp, {Offset, _TotalSz}, Acc) ->
+ bitcask_nifs:keydir_remove(LiveKeyDir, K, Tstamp, FileId, Offset),
+ Acc
+ end,
+ case bitcask_fileops:fold_keys(File, Fun, ok, default) of
+ {error, Reason} ->
+ error_logger:error_msg("Error folding keys for ~p: ~p\n", [File#filestate.filename,Reason]),
+ Acc = Acc0;
+ _ ->
+ error_logger:info_msg("All keys expired in: ~p scheduling file for deletion\n", [File#filestate.filename]),
+ Acc = lists:append(Acc0, [File])
+ end,
+ expiry_merge(Files, LiveKeyDir, Acc).
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
@@ -1700,9 +1740,9 @@ delete_partial_merge_test() ->
%% selective merge, hit all of the files with deletes but not
%% all of the ones with deleted data
timer:sleep(1100),
- ok = merge("/tmp/bc.test.pardel",[],lists:reverse(lists:nthtail(2,
+ ok = merge("/tmp/bc.test.pardel",[],{lists:reverse(lists:nthtail(2,
lists:reverse(readable_files(
- "/tmp/bc.test.pardel"))))),
+ "/tmp/bc.test.pardel")))),[]}),
%% Verify we've now only got one item left
B2 = bitcask:open("/tmp/bc.test.pardel"),
View
5 src/bitcask_merge_worker.erl
@@ -132,13 +132,14 @@ do_merge(Args) ->
Start = os:timestamp(),
Result = (catch apply(bitcask, merge, Args)),
ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000,
+ [_,_,{Pargs,_}] = Args,
case Result of
ok ->
error_logger:info_msg("Merged ~p in ~p seconds.\n",
- [Args, ElapsedSecs]);
+ [Pargs, ElapsedSecs]);
{Error, Reason} when Error == error; Error == 'EXIT' ->
error_logger:error_msg("Failed to merge ~p: ~p\n",
- [Args, Reason])
+ [Pargs, Reason])
end;
false ->
ok
Please sign in to comment.
Something went wrong with that request. Please try again.