Merge pull request #137 from basho/bugfix/eexist-issue
add code to clean up empty data and hintfiles
evanmcc committed Feb 12, 2014
2 parents a99f2f0 + 334860e commit 230b6d6
Showing 4 changed files with 116 additions and 51 deletions.
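The change in brief: scan_key_files now threads a DelZeroFilesP flag and a counting accumulator through fold_keys, and any cask file whose fold yields zero entries is deleted during the startup scan, so empty data and hint files left behind by a crash no longer linger to confuse later opens. Below is a minimal, self-contained sketch of that counting-fold pattern; the module name, fold_keys/3 and delete/1 here are toy stand-ins for the real bitcask_fileops calls, not bitcask's actual API.

-module(gh137_sketch).
-export([scan_file/2]).

%% Toy stand-ins for bitcask_fileops:fold_keys/4 and bitcask_fileops:delete/1;
%% a "file" here is just {Name, ListOfKeys}.
fold_keys({_Name, Keys}, Fun, Acc0) ->
    lists:foldl(Fun, Acc0, Keys).

delete({Name, _Keys}) ->
    io:format("deleted ~s~n", [Name]).

%% Count entries while folding; a file that contributes nothing to the
%% keydir is removed instead of being kept around.
scan_file(File, DelZeroFilesP) ->
    case fold_keys(File, fun(_K, Acc) -> Acc + 1 end, 0) of
        0 when DelZeroFilesP ->
            delete(File),
            removed;
        N ->
            {kept, N}
    end.

For example, scan_file({"1.bitcask.data", []}, true) returns removed, while scan_file({"2.bitcask.data", [k]}, true) returns {kept, 1}.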
161 changes: 113 additions & 48 deletions src/bitcask.erl
@@ -520,7 +520,7 @@ merge1(Dirname, Opts, FilesToMerge, ExpiredFiles) ->
%% reader/writer comes along in the same VM. Note that we
%% won't necessarily merge all these files.
AllFiles = scan_key_files(readable_files(Dirname), LiveKeyDir, [],
- false, true),
+ false, true, false),

%% Partition all files to files we'll merge and files we
%% won't (so that we can close those extra files once
@@ -889,31 +889,39 @@ put_state(Ref, State) ->
reverse_sort(L) ->
lists:reverse(lists:sort(L)).

- scan_key_files([], _KeyDir, Acc, _CloseFile, _EnoentOK) ->
+ scan_key_files([], _KeyDir, Acc, _CloseFile, _EnoentOK, _DelZeroFilesP) ->
Acc;
- scan_key_files([Filename | Rest], KeyDir, Acc, CloseFile, EnoentOK) ->
+ scan_key_files([Filename | Rest], KeyDir, Acc, CloseFile, EnoentOK, DelZeroFilesP) ->
%% Restrictive pattern matching below is intentional
case bitcask_fileops:open_file(Filename) of
{ok, File} ->
FileTstamp = bitcask_fileops:file_tstamp(File),
- F = fun(K, Tstamp, {Offset, TotalSz}, _) ->
+ F = fun(K, Tstamp, {Offset, TotalSz}, FunAcc) ->
bitcask_nifs:keydir_put(KeyDir,
K,
FileTstamp,
TotalSz,
Offset,
Tstamp,
- false)
+ false),
+ FunAcc + 1
end,
- bitcask_fileops:fold_keys(File, F, undefined, recovery),
+ case bitcask_fileops:fold_keys(File, F, 0, recovery) of
+ 0 when DelZeroFilesP ->
+ bitcask_fileops:delete(File),
+ error_logger:info_msg("Removing empty cask file ~p",
+ [Filename]);
+ _R ->
+ ok
+ end,
if CloseFile == true ->
bitcask_fileops:close(File);
true ->
ok
end,
- scan_key_files(Rest, KeyDir, [File | Acc], CloseFile, EnoentOK);
+ scan_key_files(Rest, KeyDir, [File | Acc], CloseFile, EnoentOK, DelZeroFilesP);
{error, enoent} when EnoentOK ->
- scan_key_files(Rest, KeyDir, Acc, CloseFile, EnoentOK)
+ scan_key_files(Rest, KeyDir, Acc, CloseFile, EnoentOK, DelZeroFilesP)
end.

%%
@@ -984,7 +992,7 @@ init_keydir_scan_key_files(_Dirname, _Keydir, 0) ->
init_keydir_scan_key_files(Dirname, KeyDir, Count) ->
try
SortedFiles = readable_files(Dirname),
- _ = scan_key_files(SortedFiles, KeyDir, [], true, false)
+ _ = scan_key_files(SortedFiles, KeyDir, [], true, false, true)
catch _:_ ->
init_keydir_scan_key_files(Dirname, KeyDir, Count - 1)
end.
@@ -1217,60 +1225,74 @@ readable_files(Dirname) ->
do_put(_Key, _Value, State, 0, LastErr) ->
{{error, LastErr}, State};
do_put(Key, Value, #bc_state{write_file = WriteFile} = State, Retries, _LastErr) ->
- case bitcask_fileops:check_write(WriteFile, Key, Value,
- State#bc_state.max_file_size) of
+ {GoForward, State2} =
+ case bitcask_fileops:check_write(WriteFile, Key, Value,
+ State#bc_state.max_file_size) of
wrap ->
%% Time to start a new write file. Note that we do not close the old
%% one, just transition it. The thinking is that closing/reopening
%% for read only access would flush the O/S cache for the file,
%% which may be undesirable.
- State2 = wrap_write_file(State);
+ {go_forward, wrap_write_file(State)};
fresh ->
%% Time to start our first write file.
case bitcask_lockops:acquire(write, State#bc_state.dirname) of
{ok, WriteLock} ->
- {ok, NewWriteFile} = bitcask_fileops:create_file(
- State#bc_state.dirname,
- State#bc_state.opts,
- State#bc_state.keydir),
- ok = bitcask_lockops:write_activefile(
- WriteLock,
- bitcask_fileops:filename(NewWriteFile)),
- State2 = State#bc_state{ write_file = NewWriteFile,
- write_lock = WriteLock };
+ case bitcask_fileops:create_file(
+ State#bc_state.dirname,
+ State#bc_state.opts,
+ State#bc_state.keydir) of
+ {ok, NewWriteFile} ->
+ ok = bitcask_lockops:write_activefile(
+ WriteLock,
+ bitcask_fileops:filename(NewWriteFile)),
+ {go_forward, State#bc_state{ write_file = NewWriteFile,
+ write_lock = WriteLock }};
+ {error, Error} ->
+ error_logger:error_msg("~s:create_file() failed with ~p, retrying put, write_file=~p\n",
+ [?MODULE, Error, State#bc_state.write_file]),
+ {Error, wrap_write_file(State#bc_state{write_lock=WriteLock})}
+ end;
{error, Reason} ->
- State2 = undefined,
throw({error, {write_locked, Reason, State#bc_state.dirname}})
end;

ok ->
- State2 = State
+ {go_forward, State}
end,
-
- Tstamp = bitcask_time:tstamp(),
- {ok, WriteFile2, Offset, Size} = bitcask_fileops:write(
- State2#bc_state.write_file,
- Key, Value, Tstamp),
- case bitcask_nifs:keydir_put(State2#bc_state.keydir, Key,
- bitcask_fileops:file_tstamp(WriteFile2),
- Size, Offset, Tstamp, true) of
- ok ->
- {ok, State2#bc_state { write_file = WriteFile2 }};
- already_exists ->
- %% Assuming the timestamps in the keydir are
- %% valid, there is an edge case where the merge thread
- %% could have rewritten this Key to a file with a greater
- %% file_id. Rather than synchronize the merge/writer processes,
- %% wrap to a new file with a greater file_id and rewrite
- %% the key there. Limit the number of recursions in case
- %% there is a different issue with the keydir.
- State3 = wrap_write_file(State2#bc_state { write_file = WriteFile2 }),
- do_put(Key, Value, State3, Retries - 1, already_exists)
+ if GoForward =/= go_forward ->
+ do_put(Key, Value, State2, Retries - 1, GoForward);
+ true ->
+ Tstamp = bitcask_time:tstamp(),
+ {ok, WriteFile2, Offset, Size} = bitcask_fileops:write(
+ State2#bc_state.write_file,
+ Key, Value, Tstamp),
+ case bitcask_nifs:keydir_put(State2#bc_state.keydir, Key,
+ bitcask_fileops:file_tstamp(WriteFile2),
+ Size, Offset, Tstamp, true) of
+ ok ->
+ {ok, State2#bc_state { write_file = WriteFile2 }};
+ already_exists ->
+ %% Assuming the timestamps in the keydir are
+ %% valid, there is an edge case where the merge thread
+ %% could have rewritten this Key to a file with a greater
+ %% file_id. Rather than synchronize the merge/writer processes,
+ %% wrap to a new file with a greater file_id and rewrite
+ %% the key there. Limit the number of recursions in case
+ %% there is a different issue with the keydir.
+ State3 = wrap_write_file(State2#bc_state { write_file = WriteFile2 }),
+ do_put(Key, Value, State3, Retries - 1, already_exists)
+ end
end.

-
- wrap_write_file(#bc_state{write_file = WriteFile} = State) ->
- LastWriteFile = bitcask_fileops:close_for_writing(WriteFile),
+ wrap_write_file(#bc_state{write_file = WriteFile,
+ read_files = ReadFiles} = State) ->
+ NewReadFiles = case bitcask_fileops:close_for_writing(WriteFile) of
+ ok ->
+ ReadFiles;
+ LastWriteFile ->
+ [LastWriteFile|ReadFiles]
+ end,
{ok, NewWriteFile} = bitcask_fileops:create_file(
State#bc_state.dirname,
State#bc_state.opts,
@@ -1279,8 +1301,7 @@ wrap_write_file(#bc_state{write_file = WriteFile} = State) ->
State#bc_state.write_lock,
bitcask_fileops:filename(NewWriteFile)),
State#bc_state{ write_file = NewWriteFile,
- read_files = [LastWriteFile |
- State#bc_state.read_files]}.
+ read_files = NewReadFiles}.

set_setuid_bit(File) ->
%% We're intentionally opinionated about pattern matching here.
@@ -1997,6 +2018,50 @@ truncated_merge_test() ->
{KV, {ok, V}} = {KV, bitcask:get(B, K)}
end, undefined, GoodData).

+ gh137_regression_test_() ->
+ {timeout, 300,
+ fun() ->
+ %% Case 1-2: there are ok cask files/hints following N
+ %% Case 3: mangle either a data file or hint file
+ %% Case 4-5: a mangled data/hint file exists but not the
+ %% corresponding type
+ [{ok, N} = {gh137_regression_test(N, Blob, Type), N} ||
+ N <- [1,2,3,4,5],
+ Blob <- [<<>>,
+ <<"foobar">>,
+ <<N:(80*8)>> % 80 bytes
+ ],
+ Type <- ["data", "hint"]],
+ ok
+ end}.
+
+ gh137_regression_test(N, Blob, Type) ->
+ Dir = "/tmp/bc.test.gh137",
+ os:cmd("rm -rf " ++ Dir),
+ os:cmd("mkdir " ++ Dir),
+ try
+ close(init_dataset(Dir, [{max_file_size, 1}], default_dataset())),
+
+ %% Verify number of files in directory
+ 3 = length(readable_files(Dir)),
+
+ %% GH 137: make a 0-byte data file
+ file:write_file(Dir ++ "/" ++
+ integer_to_list(N) ++ ".bitcask." ++ Type, Blob),
+
+ B = bitcask:open(Dir, [read_write]),
+ KsVs = [{<<"k10">>, <<"v10a">>},
+ {<<"k11">>, <<"v11a">>},
+ {<<"k12">>, <<"v12a">>},
+ {<<"k13">>, <<"v13a">>}],
+ [ok = bitcask:put(B, K, V) || {K, V} <- KsVs],
+ [{ok, V} = bitcask:get(B, K) || {K, V} <- KsVs],
+ bitcask:close(B),
+ ok
+ after
+ os:cmd("rm -rf " ++ Dir)
+ end.
+
truncate_file(Path, Offset) ->
{ok, FH} = file:open(Path, [read, write]),
{ok, Offset} = file:position(FH, Offset),
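The do_put/5 rewrite above separates deciding whether a write may proceed from the write itself: the check_write/create_file step now yields a {GoForward, State2} pair, and any result other than go_forward burns one of the bounded Retries instead of crashing the writer on a failed create_file. A stripped-down sketch of that control shape, with hypothetical prepare/1 and write/2 stubs standing in for the real check_write, create_file and bitcask_fileops:write steps:

-module(do_put_sketch).
-export([do_put/3]).

do_put(_Item, _State, 0) ->
    {error, out_of_retries};
do_put(Item, State, Retries) ->
    %% Decide whether we may go forward, possibly rolling the state
    %% along the way; mirrors the {GoForward, State2} split above.
    {GoForward, State2} = prepare(State),
    if GoForward =/= go_forward ->
            %% e.g. creating the new write file failed: retry with the
            %% updated state rather than matching on success.
            do_put(Item, State2, Retries - 1);
       true ->
            write(Item, State2)
    end.

%% Happy-path stubs, standing in for the real file operations.
prepare(State) -> {go_forward, State}.
write(Item, State) -> {ok, Item, State}.

The same bounded counter also covers the pre-existing already_exists race with the merge thread, so every retryable failure now flows through one code path.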
2 changes: 1 addition & 1 deletion src/bitcask_file.erl
@@ -167,7 +167,7 @@ handle_cast(_Msg, State) ->

handle_info({'DOWN', _Ref, _, _Pid, _Status}, State=#state{fd=Fd}) ->
%% Owner has stopped, close file and shutdown
- ok = file:close(Fd),
+ _ = file:close(Fd),
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.
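The one-character bitcask_file.erl change is about shutdown robustness: in the 'DOWN' handler the fd may already be unusable, so matching ok = file:close(Fd) could turn a clean stop into a badmatch crash; _ = file:close(Fd) closes best-effort and stops normally either way. The same pattern in isolation (a plain sketch, not the real gen_server):

-module(close_sketch).
-export([shutdown/1]).

%% file:close/1 may return {error, _} (for instance when the owner died
%% and the descriptor is already gone); during shutdown that result is
%% deliberately ignored rather than matched against ok.
shutdown(Fd) ->
    _ = file:close(Fd),
    stopped.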
2 changes: 1 addition & 1 deletion src/bitcask_fileops.erl
@@ -93,7 +93,7 @@ create_file(DirName, Opts0, Keydir) ->
catch Error:Reason ->
%% if we fail somehow, do we need to nuke any partial
%% state?
- {Error, Reason}
+ {error, {Error, Reason}}
after
bitcask_lockops:release(Lock)
end.
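This bitcask_fileops.erl fix is what makes the do_put retry loop workable: the old catch clause returned the raw {Class, Reason} pair, so a throw surfaced as {throw, Whatever} and never matched callers looking for {error, _}. Wrapping it as {error, {Class, Reason}} gives every create_file failure a single shape. A sketch of that normalization, with a hypothetical risky_create/1 doing the fallible work:

-module(wrap_error_sketch).
-export([create/1]).

%% Normalize any exception class into an {error, _} tuple so callers
%% can pattern-match one failure shape, as create_file/3 now does.
create(Name) ->
    try
        {ok, risky_create(Name)}
    catch Class:Reason ->
        {error, {Class, Reason}}
    end.

%% Hypothetical fallible step.
risky_create(Name) when is_list(Name) -> {created, Name};
risky_create(_) -> throw(bad_name).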
2 changes: 1 addition & 1 deletion test/bitcask_qc_expiry.erl
@@ -90,7 +90,7 @@ prop_expiry() ->
?FORALL({Ops, Expiry, ExpiryGrace, Timestep, M1},
{eqc_gen:non_empty(list(ops(Keys, Values))),
choose(1,10), choose(1, 10), choose(5, 50), choose(5,128)},
- ?IMPLIES(length(Ops) > 1,
+ ?IMPLIES(length(Ops) > 3,
begin
Dirname = "/tmp/bc.prop.expiry",
?cmd("rm -rf " ++ Dirname),
