Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge commit '64c8d84ba25ee077cf1eaff4b09bfa251147e79d' into jhc-expiry-merge-1.3.1
  • Loading branch information...
  • Loading branch information...
commit 858a2ab879f8db8d268d93076da6223da7354abd 2 parents fbbf11e + 64c8d84
@joecaswell joecaswell authored
Showing with 59 additions and 23 deletions.
  1. +58 −23 src/bitcask.erl
  2. +1 −0  src/bitcask_merge_worker.erl
View
81 src/bitcask.erl
@@ -446,34 +446,39 @@ iterator_release(Ref) ->
%% into a more compact form.
-spec merge(Dirname::string()) -> ok.
merge(Dirname) ->
- merge(Dirname, [], readable_files(Dirname)).
+ merge(Dirname, [], {readable_files(Dirname), []}).
%% @doc Merge several data files within a bitcask datastore
%% into a more compact form.
-spec merge(Dirname::string(), Opts::[_]) -> ok.
merge(Dirname, Opts) ->
- merge(Dirname, Opts, readable_files(Dirname)).
+ merge(Dirname, Opts, {readable_files(Dirname), []}).
%% @doc Merge several data files within a bitcask datastore
%% into a more compact form.
-spec merge(Dirname::string(), Opts::[_], FilesToMerge::[string()]) -> ok.
merge(_Dirname, _Opts, []) ->
ok;
-merge(Dirname, Opts, FilesToMerge0) ->
+merge(Dirname,Opts,FilesToMerge) when is_list(FilesToMerge) ->
+ merge(Dirname,Opts,{FilesToMerge,[]});
+merge(_Dirname, _Opts, {[],_}) ->
+ ok;
+merge(Dirname, Opts, {FilesToMerge0, ExpiredFiles0}) ->
%% Make sure bitcask app is started so we can pull defaults from env
ok = start_app(),
-
%% Filter the files to merge and ensure that they all exist. It's
%% possible in some circumstances that we'll get an out-of-date
%% list of files.
FilesToMerge = [F || F <- FilesToMerge0,
filelib:is_file(F)],
- merge1(Dirname, Opts, FilesToMerge).
+ ExpiredFiles = [F || F <- ExpiredFiles0,
+ filelib:is_file(F)],
+ merge1(Dirname, Opts, FilesToMerge, ExpiredFiles).
%% Inner merge function, assumes that bitcask is running and all files exist.
-merge1(_Dirname, _Opts, []) ->
+merge1(_Dirname, _Opts, [], _ExpiredFiles) ->
ok;
-merge1(Dirname, Opts, FilesToMerge) ->
+merge1(Dirname, Opts, FilesToMerge, ExpiredFiles) ->
%% Test to see if this is a complete or partial merge
Partial = not(lists:usort(readable_files(Dirname)) ==
lists:usort(FilesToMerge)),
@@ -544,18 +549,22 @@ merge1(Dirname, Opts, FilesToMerge) ->
TooNew = [F#file_status.filename ||
F <- Summary,
F#file_status.newest_tstamp >= MergeStart],
- InFiles = lists:reverse(
- lists:foldl(fun(F, Acc) ->
+ {InFiles,InExpiredFiles} = lists:foldl(fun(F, {InFilesAcc,InExpiredAcc} = Acc) ->
case lists:member(F#filestate.filename,
TooNew) of
false ->
- [F|Acc];
+ case lists:member(F#filestate.filename,
+ ExpiredFiles) of
+ false ->
+ {[F|InFilesAcc],InExpiredAcc};
+ true ->
+ {InFilesAcc,[F|InExpiredAcc]}
+ end;
true ->
bitcask_fileops:close(F),
Acc
end
- end, [], InFiles1)),
-
+ end, {[],[]}, InFiles1),
%% Setup our first output merge file and update the merge lock accordingly
{ok, Outfile} = bitcask_fileops:create_file(Dirname, Opts),
ok = bitcask_lockops:write_activefile(
@@ -579,6 +588,7 @@ merge1(Dirname, Opts, FilesToMerge) ->
opts = Opts },
%% Finally, start the merge process
+ ExpiredFilesFinished = expiry_merge(InExpiredFiles, LiveKeyDir, []),
State1 = merge_files(State),
%% Make sure to close the final output file
@@ -587,11 +597,16 @@ merge1(Dirname, Opts, FilesToMerge) ->
%% Close the original input files, schedule them for deletion,
%% close keydirs, and release our lock
- [bitcask_fileops:close(F) || F <- State#mstate.input_files],
+ [bitcask_fileops:close(F) || F <- State#mstate.input_files ++ ExpiredFilesFinished],
{_, _, _, {IterGeneration, _, _}} = bitcask_nifs:keydir_info(LiveKeyDir),
- FileNames = [F#filestate.filename || F <- State#mstate.input_files],
+ FileNames = [F#filestate.filename || F <- State#mstate.input_files ++ ExpiredFilesFinished],
[catch set_setuid_bit(F) || F <- FileNames],
bitcask_merge_delete:defer_delete(Dirname, IterGeneration, FileNames),
+ if InFiles == [] ->
+ bitcask_fileops:delete(Outfile);
+ true ->
+ ok
+ end,
%% Explicitly release our keydirs instead of waiting for GC
bitcask_nifs:keydir_release(LiveKeyDir),
@@ -604,8 +619,8 @@ consider_for_merge(FragTrigger, DeadBytesTrigger, ExpirationGraceTime) ->
fun (F) ->
(F#file_status.fragmented >= FragTrigger)
orelse (F#file_status.dead_bytes >= DeadBytesTrigger)
- orelse ( (F#file_status.oldest_tstamp > 0) %% means that the file has data
- andalso (F#file_status.oldest_tstamp < ExpirationGraceTime)
+ orelse ((F#file_status.oldest_tstamp > 0) andalso %% means that the file has data
+ (F#file_status.newest_tstamp < ExpirationGraceTime)
)
end.
@@ -678,9 +693,9 @@ needs_merge(Ref) ->
_ ->
ok
end,
-
FileNames = [Filename || {Filename, _Reasons} <- MergableFiles],
- {true, FileNames};
+ ExpiredFiles = [Filename || {Filename, [{data_expired,_,_}]} <- MergableFiles],
+ {true, {FileNames, ExpiredFiles}};
false ->
false
end.
@@ -726,8 +741,8 @@ small_file_threshold(Opts) ->
expired_threshold(Cutoff) ->
fun(F) ->
- if F#file_status.oldest_tstamp < Cutoff ->
- [{oldest_tstamp, F#file_status.oldest_tstamp, Cutoff}];
+ if F#file_status.newest_tstamp < Cutoff ->
+ [{data_expired, F#file_status.newest_tstamp, Cutoff}];
true ->
[]
end
@@ -999,7 +1014,7 @@ merge_files(#mstate { dirname = Dirname,
[File#filestate.filename, Dirname, Error]),
State
after
- catch bitcask_fileops:close(File)
+ catch bitcask_fileops:close(File)
end,
merge_files(State2#mstate { input_files = Rest }).
@@ -1283,6 +1298,26 @@ poll_deferred_delete_queue_empty() ->
_ -> receive after 1100 -> poll_deferred_delete_queue_empty() end
end.
+%% Internal merge function for cache_merge functionality.
+expiry_merge([], _LiveKeyDir, Acc) ->
+ Acc;
+
+expiry_merge([File | Files], LiveKeyDir, Acc0) ->
+ FileId = bitcask_fileops:file_tstamp(File),
+ Fun = fun(K, Tstamp, {Offset, _TotalSz}, Acc) ->
+ bitcask_nifs:keydir_remove(LiveKeyDir, K, Tstamp, FileId, Offset),
+ Acc
+ end,
+ case bitcask_fileops:fold_keys(File, Fun, ok, default) of
+ {error, Reason} ->
+ error_logger:error_msg("Error folding keys for ~p: ~p\n", [File#filestate.filename,Reason]),
+ Acc = Acc0;
+ _ ->
+ error_logger:info_msg("All keys expired in: ~p scheduling file for deletion\n", [File#filestate.filename]),
+ Acc = lists:append(Acc0, [File])
+ end,
+ expiry_merge(Files, LiveKeyDir, Acc).
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
@@ -1683,9 +1718,9 @@ delete_partial_merge_test() ->
%% selective merge, hit all of the files with deletes but not
%% all of the ones with deleted data
timer:sleep(1100),
- ok = merge("/tmp/bc.test.pardel",[],lists:reverse(lists:nthtail(2,
+ ok = merge("/tmp/bc.test.pardel",[],{lists:reverse(lists:nthtail(2,
lists:reverse(readable_files(
- "/tmp/bc.test.pardel"))))),
+ "/tmp/bc.test.pardel")))),[]}),
%% Verify we've now only got one item left
B2 = bitcask:open("/tmp/bc.test.pardel"),
View
1  src/bitcask_merge_worker.erl
@@ -134,6 +134,7 @@ do_merge(Args) ->
ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000,
case Result of
ok ->
+ % TODO: Don't log all the Args, just the merged file names
error_logger:info_msg("Merged ~p in ~p seconds.\n",
[Args, ElapsedSecs]);
{Error, Reason} when Error == error; Error == 'EXIT' ->
Please sign in to comment.
Something went wrong with that request. Please try again.