Permalink
Browse files

initial try

  • Loading branch information...
1 parent f706bad commit 51d1cab07375c0b139b240ee713e6cb42d99d618 @bsparrow435 bsparrow435 committed Nov 19, 2012
Showing with 58 additions and 7 deletions.
  1. +54 −7 src/bitcask.erl
  2. +4 −0 src/bitcask_merge_worker.erl
View
61 src/bitcask.erl
@@ -32,7 +32,7 @@
fold_keys/3, fold_keys/5,
fold/3, fold/5,
iterator/3, iterator_next/1, iterator_release/1,
- merge/1, merge/2, merge/3,
+ merge/1, merge/2, merge/3, merge/4,
needs_merge/1,
is_empty_estimate/1,
status/1]).
@@ -470,12 +470,29 @@ merge(Dirname, Opts, FilesToMerge0) ->
%% list of files.
FilesToMerge = [F || F <- FilesToMerge0,
filelib:is_file(F)],
- merge1(Dirname, Opts, FilesToMerge).
+ merge1(Dirname, Opts, FilesToMerge, []).
+
+%% @doc Merge `FilesToMerge0' in `Dirname', additionally handing the list of
+%% expired files to the inner merge. When the expired list is empty this is
+%% exactly merge/3. Otherwise both lists are first reduced to the entries
+%% that still exist on disk, because the caller may be working from an
+%% out-of-date list of files.
+-spec merge(Dirname::string(), Opts::[_], FilesToMerge::[string()],
+            ExpiredFiles::[string()]) -> ok.
+merge(Dirname, Opts, FilesToMerge0, []) ->
+    merge(Dirname, Opts, FilesToMerge0);
+merge(Dirname, Opts, FilesToMerge0, ExpiredFiles0) ->
+    %% The bitcask app has to be started so defaults can be pulled from env.
+    ok = start_app(),
+    %% Keep only the paths that are still present in the filesystem.
+    StillOnDisk = fun(Paths) ->
+                          [Path || Path <- Paths, filelib:is_file(Path)]
+                  end,
+    FilesToMerge = StillOnDisk(FilesToMerge0),
+    ExpiredFiles = StillOnDisk(ExpiredFiles0),
+    merge1(Dirname, Opts, FilesToMerge, ExpiredFiles).
+
%% Inner merge function, assumes that bitcask is running and all files exist.
-merge1(_Dirname, _Opts, []) ->
+merge1(_Dirname, _Opts, [], []) ->
ok;
-merge1(Dirname, Opts, FilesToMerge) ->
+merge1(Dirname, Opts, FilesToMerge, ExpiredFiles) ->
%% Test to see if this is a complete or partial merge
Partial = not(lists:usort(readable_files(Dirname)) ==
lists:usort(FilesToMerge)),
@@ -551,12 +568,21 @@ merge1(Dirname, Opts, FilesToMerge) ->
case lists:member(F#filestate.filename,
TooNew) of
false ->
- [F|Acc];
+ case lists:member(F#filestate.filename,
+ ExpiredFiles) of
+ false ->
+ [F|Acc];
+ true ->
+ Acc
+ end;
true ->
bitcask_fileops:close(F),
Acc
end
end, [], InFiles1)),
+ InExpiredFiles = [F#file_status.filename || F <- Summary,
+ F#file_status.newest_tstamp < expiry_time(Opts)],
+ expiry_merge(InExpiredFiles, LiveKeyDir),
%% Setup our first output merge file and update the merge lock accordingly
{ok, Outfile} = bitcask_fileops:create_file(Dirname, Opts),
@@ -680,9 +706,9 @@ needs_merge(Ref) ->
_ ->
ok
end,
-
FileNames = [Filename || {Filename, _Reasons} <- MergableFiles],
- {true, FileNames};
+ ExpiredFiles = [Filename || {Filename, [{oldest_tstamp,_,_}]} <- MergableFiles],
+ {true, FileNames, ExpiredFiles};
false ->
false
end.
@@ -1278,6 +1304,27 @@ poll_deferred_delete_queue_empty() ->
_ -> receive after 1100 -> poll_deferred_delete_queue_empty() end
end.
+%% Internal helper for expiry handling during a merge.
+%%
+%% For each filename in the list: open the file, fold over its keys (fold
+%% mode `hintfile') and remove every folded entry from `LiveKeyDir' via
+%% bitcask_nifs:keydir_remove/5, using the file's timestamp as the file id.
+%% If the fold completes, the file is deleted; if the fold returns
+%% `{error, Reason}' the error is logged and the file is left on disk.
+%% The file handle is closed in every case, including when the fold raises.
+%% Always returns `ok' once the list is exhausted; crashes (badmatch) if a
+%% file cannot be opened.
+expiry_merge([], _LiveKeyDir) ->
+    ok;
+expiry_merge([File | Files], LiveKeyDir) ->
+    FileId = bitcask_fileops:file_tstamp(File),
+    {ok, FileDesc} = bitcask_fileops:open_file(File),
+    Fun = fun(K, Tstamp, {Offset, _TotalSz}, Acc) ->
+                  bitcask_nifs:keydir_remove(LiveKeyDir, K, Tstamp, FileId, Offset),
+                  Acc
+          end,
+    try bitcask_fileops:fold_keys(FileDesc, Fun, accumulator, hintfile) of
+        {error, Reason} ->
+            %% error_logger expects the format arguments as a list; passing
+            %% the bare reason produces a malformed report.
+            error_logger:error_msg("Ohhh nooo! something went wrong: ~p\n", [Reason]);
+        _ ->
+            error_logger:info_msg("YAARRRRR all keys expired in: ~p\n", [File]),
+            bitcask_fileops:delete(FileDesc)
+    after
+        %% Release the handle on success, on {error, _} and on exceptions,
+        %% so a failed fold no longer leaks the open file.
+        bitcask_fileops:close(FileDesc)
+    end,
+    expiry_merge(Files, LiveKeyDir).
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
View
4 src/bitcask_merge_worker.erl
@@ -61,6 +61,9 @@ merge(Dir, Opts) ->
merge(Dir, Opts, Files) ->
gen_server:call(?MODULE, {merge, [Dir, Opts, Files]}, infinity).
+%% @doc Ask the registered merge worker to run a merge with the four
+%% arguments `Dir', `Opts', `Files' and `ExpiredFiles'. The call is
+%% synchronous and waits without a timeout.
+%% NOTE(review): the -export list is not visible here; confirm merge/4 is
+%% exported from this module.
+merge(Dir, Opts, Files, ExpiredFiles) ->
+    MergeArgs = [Dir, Opts, Files, ExpiredFiles],
+    gen_server:call(?MODULE, {merge, MergeArgs}, infinity).
+
%% ====================================================================
%% gen_server callbacks
%% ====================================================================
@@ -134,6 +137,7 @@ do_merge(Args) ->
ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000,
case Result of
ok ->
+ % TODO: Don't log all the Args, just the merged file names
error_logger:info_msg("Merged ~p in ~p seconds.\n",
[Args, ElapsedSecs]);
{Error, Reason} when Error == error; Error == 'EXIT' ->

0 comments on commit 51d1cab

Please sign in to comment.