Change behavior of merge when merging for data expiration #87

Merged
Merged 13 commits into the base branch on May 10, 2013
View
@@ -140,9 +140,6 @@ open(Dirname, Opts) ->
{error, Reason}
end.
-
-
-
%% @doc Close a bitcask data store and flush any pending writes to disk.
-spec close(reference()) -> ok.
close(Ref) ->
@@ -446,34 +443,39 @@ iterator_release(Ref) ->
%% into a more compact form.
-spec merge(Dirname::string()) -> ok.
merge(Dirname) ->
- merge(Dirname, [], readable_files(Dirname)).
+ merge(Dirname, [], {readable_files(Dirname), []}).
%% @doc Merge several data files within a bitcask datastore
%% into a more compact form.
-spec merge(Dirname::string(), Opts::[_]) -> ok.
merge(Dirname, Opts) ->
- merge(Dirname, Opts, readable_files(Dirname)).
+ merge(Dirname, Opts, {readable_files(Dirname), []}).
%% @doc Merge several data files within a bitcask datastore
%% into a more compact form.
-spec merge(Dirname::string(), Opts::[_], FilesToMerge::[string()]) -> ok.
merge(_Dirname, _Opts, []) ->
ok;
-merge(Dirname, Opts, FilesToMerge0) ->
+merge(Dirname,Opts,FilesToMerge) when is_list(FilesToMerge) ->
+ merge(Dirname,Opts,{FilesToMerge,[]});
+merge(_Dirname, _Opts, {[],_}) ->
+ ok;
+merge(Dirname, Opts, {FilesToMerge0, ExpiredFiles0}) ->
%% Make sure bitcask app is started so we can pull defaults from env
ok = start_app(),
-
%% Filter the files to merge and ensure that they all exist. It's
%% possible in some circumstances that we'll get an out-of-date
%% list of files.
FilesToMerge = [F || F <- FilesToMerge0,
filelib:is_file(F)],
- merge1(Dirname, Opts, FilesToMerge).
+ ExpiredFiles = [F || F <- ExpiredFiles0,
+ filelib:is_file(F)],
+ merge1(Dirname, Opts, FilesToMerge, ExpiredFiles).
%% Inner merge function, assumes that bitcask is running and all files exist.
-merge1(_Dirname, _Opts, []) ->
+merge1(_Dirname, _Opts, [], []) ->
ok;
-merge1(Dirname, Opts, FilesToMerge) ->
+merge1(Dirname, Opts, FilesToMerge, ExpiredFiles) ->
%% Test to see if this is a complete or partial merge
Partial = not(lists:usort(readable_files(Dirname)) ==
lists:usort(FilesToMerge)),
@@ -544,18 +546,22 @@ merge1(Dirname, Opts, FilesToMerge) ->
TooNew = [F#file_status.filename ||
F <- Summary,
F#file_status.newest_tstamp >= MergeStart],
- InFiles = lists:reverse(
- lists:foldl(fun(F, Acc) ->
+ {InFiles,InExpiredFiles} = lists:foldl(fun(F, {InFilesAcc,InExpiredAcc} = Acc) ->
case lists:member(F#filestate.filename,
TooNew) of
false ->
- [F|Acc];
+ case lists:member(F#filestate.filename,
+ ExpiredFiles) of
+ false ->
+ {[F|InFilesAcc],InExpiredAcc};
+ true ->
+ {InFilesAcc,[F|InExpiredAcc]}
+ end;
true ->
bitcask_fileops:close(F),
Acc
end
- end, [], InFiles1)),
-
+ end, {[],[]}, InFiles1),
%% Setup our first output merge file and update the merge lock accordingly
{ok, Outfile} = bitcask_fileops:create_file(Dirname, Opts),
ok = bitcask_lockops:write_activefile(
@@ -579,6 +585,7 @@ merge1(Dirname, Opts, FilesToMerge) ->
opts = Opts },
%% Finally, start the merge process
+ ExpiredFilesFinished = expiry_merge(InExpiredFiles, LiveKeyDir, []),
State1 = merge_files(State),
%% Make sure to close the final output file
@@ -587,11 +594,16 @@ merge1(Dirname, Opts, FilesToMerge) ->
%% Close the original input files, schedule them for deletion,
%% close keydirs, and release our lock
- [bitcask_fileops:close(F) || F <- State#mstate.input_files],
+ [bitcask_fileops:close(F) || F <- State#mstate.input_files ++ ExpiredFilesFinished],
{_, _, _, {IterGeneration, _, _}} = bitcask_nifs:keydir_info(LiveKeyDir),
- FileNames = [F#filestate.filename || F <- State#mstate.input_files],
+ FileNames = [F#filestate.filename || F <- State#mstate.input_files ++ ExpiredFilesFinished],
[catch set_setuid_bit(F) || F <- FileNames],
bitcask_merge_delete:defer_delete(Dirname, IterGeneration, FileNames),
+ if InFiles == [] ->
@bsparrow435
bsparrow435 Apr 25, 2013 Basho Technologies member

This is necessary when all files in the merge are being deleted: we need to delete the merge's output file as well. Otherwise, we would create a zero-length file on each merge.

+ bitcask_fileops:delete(Outfile);
+ true ->
+ ok
+ end,
%% Explicitly release our keydirs instead of waiting for GC
bitcask_nifs:keydir_release(LiveKeyDir),
@@ -604,8 +616,8 @@ consider_for_merge(FragTrigger, DeadBytesTrigger, ExpirationGraceTime) ->
fun (F) ->
(F#file_status.fragmented >= FragTrigger)
orelse (F#file_status.dead_bytes >= DeadBytesTrigger)
- orelse ( (F#file_status.oldest_tstamp > 0) %% means that the file has data
- andalso (F#file_status.oldest_tstamp < ExpirationGraceTime)
+ orelse ((F#file_status.oldest_tstamp > 0) andalso %% means that the file has data
+ (F#file_status.newest_tstamp < ExpirationGraceTime)
)
end.
@@ -678,9 +690,17 @@ needs_merge(Ref) ->
_ ->
ok
end,
-
FileNames = [Filename || {Filename, _Reasons} <- MergableFiles],
- {true, FileNames};
+ F = fun(X) ->
+ case X of
+ {data_expired,_,_} ->
+ true;
+ _ ->
+ false
+ end
+ end,
+ ExpiredFiles = [Filename || {Filename, Reasons} <- MergableFiles, lists:any(F,Reasons)],
+ {true, {FileNames, ExpiredFiles}};
false ->
false
end.
@@ -726,8 +746,8 @@ small_file_threshold(Opts) ->
expired_threshold(Cutoff) ->
fun(F) ->
- if F#file_status.oldest_tstamp < Cutoff ->
- [{oldest_tstamp, F#file_status.oldest_tstamp, Cutoff}];
+ if F#file_status.newest_tstamp < Cutoff ->
+ [{data_expired, F#file_status.newest_tstamp, Cutoff}];
true ->
[]
end
@@ -1283,6 +1303,26 @@ poll_deferred_delete_queue_empty() ->
_ -> receive after 1100 -> poll_deferred_delete_queue_empty() end
end.
+%% Internal merge function for cache_merge functionality.
+expiry_merge([], _LiveKeyDir, Acc) ->
+ Acc;
+
+expiry_merge([File | Files], LiveKeyDir, Acc0) ->
+ FileId = bitcask_fileops:file_tstamp(File),
+ Fun = fun(K, Tstamp, {Offset, _TotalSz}, Acc) ->
+ bitcask_nifs:keydir_remove(LiveKeyDir, K, Tstamp, FileId, Offset),
+ Acc
+ end,
+ case bitcask_fileops:fold_keys(File, Fun, ok, default) of
+ {error, Reason} ->
+ error_logger:error_msg("Error folding keys for ~p: ~p\n", [File#filestate.filename,Reason]),
+ Acc = Acc0;
+ _ ->
+ error_logger:info_msg("All keys expired in: ~p scheduling file for deletion\n", [File#filestate.filename]),
+ Acc = lists:append(Acc0, [File])
+ end,
+ expiry_merge(Files, LiveKeyDir, Acc).
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
@@ -1692,9 +1732,9 @@ delete_partial_merge_test() ->
%% selective merge, hit all of the files with deletes but not
%% all of the ones with deleted data
timer:sleep(1100),
- ok = merge("/tmp/bc.test.pardel",[],lists:reverse(lists:nthtail(2,
+ ok = merge("/tmp/bc.test.pardel",[],{lists:reverse(lists:nthtail(2,
lists:reverse(readable_files(
- "/tmp/bc.test.pardel"))))),
+ "/tmp/bc.test.pardel")))),[]}),
%% Verify we've now only got one item left
B2 = bitcask:open("/tmp/bc.test.pardel"),
@@ -132,13 +132,14 @@ do_merge(Args) ->
Start = os:timestamp(),
Result = (catch apply(bitcask, merge, Args)),
ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000,
+ [_,_,{Pargs,_}] = Args,
@bsparrow435
bsparrow435 Apr 25, 2013 Basho Technologies member

Only matching on the filenames that are merged. There is no reason to log all the args; you can read them from app.config or via get_all_env.

case Result of
ok ->
error_logger:info_msg("Merged ~p in ~p seconds.\n",
- [Args, ElapsedSecs]);
+ [Pargs, ElapsedSecs]);
{Error, Reason} when Error == error; Error == 'EXIT' ->
error_logger:error_msg("Failed to merge ~p: ~p\n",
- [Args, Reason])
+ [Pargs, Reason])
end;
false ->
ok