Skip to content

Loading…

Filename from max timestamp (resubmitted) #19

Closed
wants to merge 2 commits into from

1 participant

@jonmeredith
Basho Technologies member

Changed next data file name (timestamp) to be greater than any current fileid

If multiple data files are created in a second, previously bitcask would
just increment the timestamp based filename by one and try and create again.
With very small file sizes the next active filename could get many seconds
ahead of the current time. If merge ran and removed files then it would create
new files with timestamps less than the current output file confusing the merge
logic.

(Resubmitted with bctt enhancements committed first)

jonmeredith added some commits
@jonmeredith jonmeredith Improved merging in bctt.
The writer thread now calls needs_merge (which closes open file handles
from the old merge) and kicks the merge process using the filenames
returned by merge.

Added support for merge parameters.

Added extra docs in the header comment.
c1eeaa0
@jonmeredith jonmeredith Changed next data file name (timestamp) to be greater than any curren…
…t fileid.

If multiple data files are created in a second, previously bitcask would
just increment the timestamp based filename by one and try and create again.
With very small file sizes the next active filename could get many seconds
ahead of the current time.  If merge ran and removed files then it would create
new files with timestamps less than the current output file confusing the merge
logic.
dd953ea
@jonmeredith
Basho Technologies member

Third time's the charm - bctt was broken. Resubmitting.

@slfritchie slfritchie added a commit that referenced this pull request
@slfritchie slfritchie Work-around model limitation/bug in PULSE test, bitcask_eqc.erl
In the C65 counterexample below, the {var,2} worker proc tries to
get key #19 and allegedly fails.  The last step of the main proc is
a put loop of keys #1 - #34.  The {var,2} worker is racing with
both the main proc and also with the {var,7} worker that is
doing a fold.  The fold happens first, which freezes the keydir
when the next put happens.  Then the {var,2} worker does its get
and finds (correctly) `not_found` ... but the model is buggy and
can't predict the correct answer.

I'm not good enough at the temporal logic to fix the model
The Correct Way.  So I've patched it up so that any 'bad' thing
detected by the main/1st proc is truly bad.  But if a 'bad' thing
happens in a forked worker, then if we know that there's a fold
happening at the same time, we assume that the model's predicted
answer is bad and thus we *ignore* the 'bad' thing.

    191> C65 = binary_to_term(element(2,file:read_file("zzz.c65@develop@commit=a2d75.bin"))).
    [[{set,{var,2},
           {call,bitcask_eqc,fork,
                 [[{init,{state,undefined,false,false,[]}},
                   {set,{not_var,8},{not_call,bitcask_eqc,bc_open,[false]}}]]}},
      {set,{var,4},
           {call,bitcask_eqc,fork,
                 [[{init,{state,undefined,false,false,[]}},
                   {set,{not_var,2},{not_call,bitcask_eqc,bc_open,[false]}},
                   {set,{not_var,3},
                        {not_call,bitcask_eqc,bc_close,[{not_var,2}]}},
                   {set,{not_var,9},{not_call,bitcask_eqc,bc_open,[false]}},
                   {set,{not_var,10},
                        {not_call,bitcask_eqc,get,[{not_var,9},19]}}]]}},
      {set,{var,6},
           {call,bitcask_eqc,fork,
                 [[{init,{state,undefined,false,false,[]}},
                   {set,{not_var,3},{not_call,bitcask_eqc,bc_open,[false]}}]]}},
      {set,{var,7},
           {call,bitcask_eqc,fork,
                 [[{init,{state,undefined,false,false,[]}},
                   {set,{not_var,5},{not_call,bitcask_eqc,bc_open,[false]}},
                   {set,{not_var,7},
                        {not_call,bitcask_eqc,fold,[{not_var,5}]}}]]}},
      {set,{var,12},
           {call,bitcask_eqc,fork,
                 [[{init,{state,undefined,false,false,[]}},
                   {set,{not_var,3},{not_call,bitcask_eqc,bc_open,[false]}},
                   {set,{not_var,8},
                        {not_call,bitcask_eqc,fold,[{not_var,3}]}}]]}},
      {set,{var,17},{call,bitcask_eqc,bc_open,[true]}},
      {set,{var,28},{call,bitcask_eqc,put,[{var,17},1,<<>>]}},
      {set,{var,29},{call,bitcask_eqc,needs_merge,[{var,17}]}},
      {set,{var,30},{call,bitcask_eqc,fork_merge,[{var,17}]}},
      {set,{var,32},
           {call,bitcask_eqc,puts,[{var,17},{1,34},<<>>]}}],
     {77863,46676,48146},
     [{events,[]}]]
3b70204
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 11, 2011
  1. @jonmeredith

    Improved merging in bctt.

    jonmeredith committed
    The writer thread now calls needs_merge (which closes open file handles
    from the old merge) and kicks the merge process using the filenames
    returned by merge.
    
    Added support for merge parameters.
    
    Added extra docs in the header comment.
  2. @jonmeredith

    Changed next data file name (timestamp) to be greater than any curren…

    jonmeredith committed
    …t fileid.
    
    If multiple data files are created in a second, previously bitcask would
    just increment the timestamp based filename by one and try and create again.
    With very small file sizes the next active filename could get many seconds
    ahead of the current time.  If merge ran and removed files then it would create
    new files with timestamps less than the current output file confusing the merge
    logic.
This page is out of date. Refresh to see the latest.
Showing with 186 additions and 51 deletions.
  1. +3 −7 src/bitcask.erl
  2. +27 −4 src/bitcask_fileops.erl
  3. +156 −40 test/bctt
View
10 src/bitcask.erl
@@ -738,13 +738,9 @@ get_filestate(FileId,
list_data_files(Dirname, WritingFile, Mergingfile) ->
- %% Build a list of {tstamp, filename} for all files in the directory that
- %% match our regex. Then reverse sort that list and extract the
- %% fully-qualified filename.
- Files = filelib:fold_files(Dirname, "[0-9]+.bitcask.data", false,
- fun(F, Acc) ->
- [{bitcask_fileops:file_tstamp(F), F} | Acc]
- end, []),
+ %% Get list of {tstamp, filename} for all files in the directory then
+ %% reverse sort that list and extract the fully-qualified filename.
+ Files = bitcask_fileops:data_file_tstamps(Dirname),
[F || {_Tstamp, F} <- reverse_sort(Files),
F /= WritingFile,
F /= Mergingfile].
View
31 src/bitcask_fileops.erl
@@ -30,6 +30,7 @@
open_file/1,
close/1,
close_for_writing/1,
+ data_file_tstamps/1,
write/4,
read/3,
sync/1,
@@ -63,7 +64,8 @@
%% Called on a Dirname, will open a fresh file in that directory.
-spec create_file(Dirname :: string(), Opts :: [any()]) -> {ok, #filestate{}}.
create_file(DirName, Opts) ->
- create_file_loop(DirName, Opts, tstamp()).
+ TS = erlang:max(most_recent_tstamp(DirName) + 1, tstamp()),
+ create_file_loop(DirName, Opts, TS).
%% @doc Open an existing file for reading.
%% Called with fully-qualified filename.
@@ -103,6 +105,16 @@ close_for_writing(State =
file:close(HintFd),
State#filestate { mode = read_only, hintfd = undefined }.
+%% Build a list of {tstamp, filename} for all files in the directory that
+%% match our regex.
+-spec data_file_tstamps(Dirname :: string()) -> [{integer(), string()}].
+data_file_tstamps(Dirname) ->
+ filelib:fold_files(Dirname, "[0-9]+.bitcask.data", false,
+ fun(F, Acc) ->
+ [{file_tstamp(F), F} | Acc]
+ end, []).
+
+
%% @doc Use only after merging, to permanently delete a data file.
-spec delete(#filestate{}) -> ok | {error, atom()}.
delete(#filestate{ filename = FN } = State) ->
@@ -459,14 +471,17 @@ create_file_loop(DirName, Opts, Tstamp) ->
end;
false ->
%% Couldn't create a new file with the requested name,
- %% increment the tstamp by 1 and try again. Conceptually,
- %% this introduces some drift into the actual creation
+ %% check for the more recent timestamp and increment by
+ %% 1 and try again.
+ %% This introduces some drift into the actual creation
%% time, but given that we only have at most 2 writers
%% (writer + merger) for a given bitcask, it shouldn't be
%% more than a few seconds. The alternative is to sleep
%% until the next second rolls around -- but this
%% introduces lengthy, unnecessary delays.
- create_file_loop(DirName, Opts, Tstamp + 1)
+ %% This also handles the possibility of clock skew
+ MostRecentTS = most_recent_tstamp(DirName),
+ create_file_loop(DirName, Opts, MostRecentTS + 1)
end.
generate_hintfile(Filename, {FolderMod, FolderFn, FolderArgs}) ->
@@ -493,6 +508,14 @@ hintfile_entry(Key, Tstamp, {Offset, TotalSz}) ->
[<<Tstamp:?TSTAMPFIELD>>, <<KeySz:?KEYSIZEFIELD>>,
<<TotalSz:?TOTALSIZEFIELD>>, <<Offset:?OFFSETFIELD>>, Key].
+
+%% Find the most recent timestamp for a .data file
+most_recent_tstamp(DirName) ->
+ lists:foldl(fun({TS,_},Acc) ->
+ erlang:max(TS, Acc)
+ end, 0, data_file_tstamps(DirName)).
+
+
temp_filename(Template) ->
temp_filename(Template, now()).
View
196 test/bctt
@@ -4,13 +4,52 @@
%%
%% Bitcask Torture Tests
%%
-%% Pre-populates a bitcask file with a known range of keys, the value for
+%% Pre-populates a bitcask with a known range of keys, the value for
%% each key stores the key itself and a sequence number for the write
%% (initially 1).
%%
-%% The test itself consists of a writer continually rewriting all keys with
-%% the next sequence number while other processes read, fold over and merge
-%% the cask.
+%% The test then continually rewrites all keys with the next sequence number
+%% while other processes read, fold over and merge the cask.
+%%
+%% Parameters:
+%%
+%% duration - number of milliseconds to run the test for. If 0 then
+%% only the initial write will take place. Default 0 ms.
+%%
+%% max_file_size - maximum file size within the bitcask in bytes.
+%% Default 10Mb.
+%%
+%% stats_freq - how often to output test status. Interval in ms. Default
+%% 10,000 ms.
+%%
+%% cask - name of bitcask to open
+%%
+%% num_keys - number of keys used in test
+%%
+%% readers - number of threads reading from the bitcask. Default 0.
+%%
+%% folders - number of threads calling bitcask:fold. Each fold checks
+%% all keys visited only once. A new process is created for
+%% each fold. Default 0.
+%%
+%% foldkeys - number of threads calling bitcask:fold. Each fold checks
+%% all keys visited only once. A new process is created for
+%% each fold. Default 0.
+%%
+%% mergers - if non-zero run a single merge thread. The merge thread
+%% is kicked by the writer thread if needs_merge returns
+%% true and passes the returned merge filenames. Default 0.
+%%
+%% needs_merge_freq - writer thread calls needs_merge after needs_merge_freq
+%% writes to ensure files are closed after merges.
+%% Default 1000.
+%%
+%% frag_merge_trigger - bitcask app env variables - set before running
+%% dead_bytes_merge_trigger
+%% frag_threshold
+%% dead_bytes_threshold
+%% small_file_threshold
+
%%
-module(bctt).
@@ -21,6 +60,7 @@
folder_reps=0,
fold_keys_reps=0,
merger_reps=0,
+ merges_pending=0,
duration,
status_freq = 10000,
cask = "bctt.bc",
@@ -28,6 +68,12 @@
writer_pid,
restart_writer=false,
writers=1,
+ needs_merge_freq = 1000,
+ frag_merge_trigger, % Env default trigger on 60% fragmentation
+ dead_bytes_merge_trigger, % Env default trigger on dead bytes > 512 MB
+ frag_threshold, % Include merge >= 40% fragmentation
+ dead_bytes_threshold, % Include merge dead bytes > 128 MB
+ small_file_threshold, % Include merge file is < 10 MB
readers=0,
folders=0,
foldkeys=0,
@@ -56,6 +102,7 @@ main(Args) ->
print_params(State),
ensure_bitcask(),
ensure_deps(),
+ set_bitcask_env(State),
do_test(State);
syntax ->
syntax(),
@@ -93,11 +140,11 @@ do_test(State0) ->
%% Start continually rewriting the keys and optionally reading,
%% folding and merging
State = State1#state{seq = 1},
+ start_merger(State, State#state.mergers),
kick_writer(State),
start_readers(State, State#state.readers),
start_folders(State, State#state.folders),
start_fold_keys(State, State#state.foldkeys),
- start_mergers(State, State#state.mergers),
schedule_status(State),
case State#state.duration of
undefined ->
@@ -116,10 +163,10 @@ restart_procs(State) ->
io:format("Test ending...\n"),
State;
status ->
- io:format("Writer seq: ~p Readers=~p Folders=~p FoldKeys=~p Merges=~p\n",
+ io:format("Writer seq: ~p Readers=~p Folders=~p FoldKeys=~p Merges=~p MergesPending=~p\n",
[State#state.seq, State#state.reader_reps,
State#state.folder_reps, State#state.fold_keys_reps,
- State#state.merger_reps]),
+ State#state.merger_reps, State#state.merges_pending]),
schedule_status(State),
restart_procs(State);
{write_done, WriteSeq} ->
@@ -136,9 +183,11 @@ restart_procs(State) ->
State1 = start_writer(State),
kick_writer(State1),
restart_procs(State1);
+ merge_pending ->
+ restart_procs(State#state{merges_pending = State#state.merges_pending+1});
merge_done ->
- start_mergers(State, 1),
- restart_procs(State#state{merger_reps = State#state.merger_reps+1});
+ restart_procs(State#state{merges_pending = State#state.merges_pending-1,
+ merger_reps = State#state.merger_reps+1});
{read_done, _ReadSeq} ->
start_readers(State, 1),
restart_procs(State#state{reader_reps = State#state.reader_reps+1});
@@ -158,6 +207,7 @@ restart_procs(State) ->
State
end.
+
%% Wait for the initial writer to complete - the os:cmd call
%% can generate an EXIT message
wait_for_writer(State) ->
@@ -171,7 +221,8 @@ wait_for_writer(State) ->
ok
end.
-wait_for_procs(#state{writers = 0, readers = 0, folders = 0, foldkeys = 0, mergers = 0}) ->
+wait_for_procs(#state{writers = 0, readers = 0, folders = 0, foldkeys = 0, merges_pending = 0}) ->
+ catch merger ! exit,
io:format("Test complete\n");
wait_for_procs(State) ->
receive
@@ -183,8 +234,11 @@ wait_for_procs(State) ->
wait_for_procs(State#state{folders = State#state.folders - 1});
{fold_keys_done, _FoldKeySeq} ->
wait_for_procs(State#state{foldkeys = State#state.foldkeys - 1});
+ merge_pending -> %% The writer could still be adding them as we try to shut down.
+ wait_for_procs(State#state{merges_pending = State#state.merges_pending + 1});
merge_done ->
- wait_for_procs(State#state{mergers = State#state.mergers - 1});
+ wait_for_procs(State#state{merges_pending = State#state.merges_pending - 1,
+ merger_reps = State#state.merger_reps+1});
{write_done, _WriteSeq} ->
wait_for_procs(State);
write_exit ->
@@ -209,8 +263,9 @@ start_writer(State) ->
Caller = self(),
Pid = spawn_link(fun() ->
Opts = writer_opts(State),
+ %% Until closing stale file handles resolved
Ref = bitcask:open(State#state.cask, Opts),
- write_proc(Ref, Caller)
+ write_proc(Ref, Caller)
end),
%%X io:format("Started writer pid ~p\n", [Pid]),
State#state{writer_pid = Pid}.
@@ -261,15 +316,18 @@ start_fold_keys(State, NumFoldKeys) ->
end),
start_fold_keys(State, NumFoldKeys - 1).
-start_mergers(_State, 0) ->
- ok;
-start_mergers(State, NumMergers) ->
+start_merger(_State, 0) ->
+ ok; % no merger, messages to it will be dropped
+start_merger(State, _NumMergers) -> % non-zero mergers kicks it off
Caller = self(),
- spawn_link(fun() -> merge_proc(State#state.cask, Caller) end),
- start_mergers(State, NumMergers - 1).
+ spawn_link(fun() ->
+ register(merger, self()),
+ merge_proc(State#state.cask, Caller)
+ end).
kick_writer(State) ->
- State#state.writer_pid ! {start, State#state.seq + 1, State#state.num_keys}.
+ State#state.writer_pid ! {start, State#state.seq + 1, State#state.num_keys,
+ State#state.needs_merge_freq}.
stop_writer(State) ->
%%X io:format("Stopping writer ~p\n", [State#state.writer_pid]),
@@ -280,26 +338,45 @@ stop_writer(State) ->
%%X io:format("Stopped writer ~p\n", [State#state.writer_pid]),
ok
after
- 10000 ->
+ 60000 ->
erlang:error({writer_pid_timeout, State#state.writer_pid})
end.
+
write_proc(Ref, Caller) ->
receive
stop ->
%%X io:format("Writer ~p received stop request\n", [self()]),
Caller ! write_exit;
- {start, Seq, NumKeys} ->
- write(Ref, NumKeys, Seq),
+ {start, Seq, NumKeys, NeedsMergeFreq} ->
+%% io:format("Writer starting\n"),
+ write(Ref, NumKeys, NeedsMergeFreq, Seq, Caller),
Caller ! {write_done, Seq},
write_proc(Ref, Caller)
end.
-write(_Ref, 0, _Seq) ->
+write(_Ref, 0, _Seq, _NeedsMergeFreq, _Caller) ->
ok;
-write(Ref, Key, Seq) ->
- bitcask:put(Ref, <<Key:32>>, term_to_binary({Key, Seq})),
- write(Ref, Key - 1, Seq).
+write(Ref, Key, Seq, NeedsMergeFreq, Caller) ->
+ try
+ case (Key rem NeedsMergeFreq) == 0 of
+ true ->
+ case bitcask:needs_merge(Ref) of
+ {true, Filenames} ->
+ Caller ! merge_pending,
+ merger ! {kick_merge, Filenames};
+ false ->
+ ok
+ end;
+ _ ->
+ ok
+ end,
+ ok = bitcask:put(Ref, <<Key:32>>, term_to_binary({Key, Seq}))
+ catch
+ _:Err ->
+ io:format(user, "Put ~p: ~p\n", [Key, Err])
+ end,
+ write(Ref, Key - 1, Seq, NeedsMergeFreq, Caller).
read_proc(Cask, NumKeys, Seq, Caller) ->
Ref = bitcask:open(Cask),
@@ -310,9 +387,14 @@ read_proc(Cask, NumKeys, Seq, Caller) ->
read(_Ref, 0, _Iter) ->
ok;
read(Ref, Key, MinSeq) ->
- {ok, Bin} = bitcask:get(Ref, <<Key:32>>),
- {Key, Seq} = binary_to_term(Bin),
- true = (Seq >= MinSeq),
+ try
+ {ok, Bin} = bitcask:get(Ref, <<Key:32>>),
+ {Key, Seq} = binary_to_term(Bin),
+ true = (Seq >= MinSeq)
+ catch
+ _:Err ->
+ io:format(user, "Get ~p: ~p\n", [Key, Err])
+ end,
read(Ref, Key - 1, MinSeq).
fold_proc(Cask, NumKeys, Seq, Caller) ->
@@ -356,8 +438,20 @@ check_fold(Key1, _MaxKey, [_Key2 | _Rest]) ->
merge_proc(Cask, Caller) ->
- ok = bitcask:merge(Cask),
- Caller ! merge_done.
+ receive
+ {kick_merge, Filenames} ->
+ try
+ ok = bitcask:merge(Cask, [], Filenames)
+ catch
+ %% Maybe keydir is being loaded
+ _:{error, not_ready} ->
+ ok
+ end,
+ Caller ! merge_done,
+ merge_proc(Cask, Caller);
+ stop ->
+ ok
+ end.
ensure_bitcask() ->
@@ -422,6 +516,21 @@ parent_dir(Dirs) ->
bitcask_beam(Cwd) ->
filename:join(Cwd, ["bitcask" ++ code:objfile_extension()]).
+set_bitcask_env(State) ->
+ application:load(bitcask),
+ set_env(frag_merge_trigger, State#state.frag_merge_trigger),
+ set_env(dead_bytes_merge_trigger, State#state.dead_bytes_merge_trigger),
+ set_env(frag_threshold, State#state.frag_threshold),
+ set_env(dead_bytes_threshold, State#state.dead_bytes_threshold),
+ set_env(small_file_threshold, State#state.small_file_threshold),
+ io:format("Bitcask AppEnv:\n~p\n", [application:get_all_env(bitcask)]).
+
+set_env(_, undefined) ->
+ ok;
+set_env(Var, Val) ->
+ application:set_env(bitcask, Var, Val).
+
+
process_args([], State) ->
{ok, State};
process_args([Arg | Rest], State) ->
@@ -442,7 +551,13 @@ process_arg({Name, Val}, State) when Name =:= duration;
Name =:= foldkeys;
Name =:= mergers;
Name =:= max_file_size;
- Name =:= open_timeout ->
+ Name =:= open_timeout;
+ Name =:= needs_merge_freq;
+ Name =:= frag_merge_trigger;
+ Name =:= dead_bytes_merge_trigger;
+ Name =:= frag_threshold;
+ Name =:= dead_bytes_threshold;
+ Name =:= small_file_threshold ->
case is_integer(Val) of
true ->
{ok, setelement(get_state_index(Name), State, Val)};
@@ -495,15 +610,16 @@ get_state_index(Name, Index, [_OtherName | Rest]) ->
print_params(State) ->
io:format("Bitcask Test\n"),
- io:format("duration: ~s\n", [format_duration(State#state.duration)]),
- io:format("cask: ~s\n", [State#state.cask]),
- io:format("num_keys: ~b\n", [State#state.num_keys]),
- io:format("readers: ~b\n", [State#state.readers]),
- io:format("folders: ~b\n", [State#state.folders]),
- io:format("foldkeys: ~b\n", [State#state.foldkeys]),
- io:format("mergers: ~b\n", [State#state.mergers]),
- io:format("max_file_size: ~s\n", [format_max_file_size(State#state.max_file_size)]),
- io:format("open_timeout: ~s\n", [format_timeout(State#state.open_timeout)]),
+ io:format("duration: ~s\n", [format_duration(State#state.duration)]),
+ io:format("cask: ~s\n", [State#state.cask]),
+ io:format("num_keys: ~b\n", [State#state.num_keys]),
+ io:format("readers: ~b\n", [State#state.readers]),
+ io:format("folders: ~b\n", [State#state.folders]),
+ io:format("foldkeys: ~b\n", [State#state.foldkeys]),
+ io:format("mergers: ~b\n", [State#state.mergers]),
+ io:format("max_file_size: ~s\n", [format_max_file_size(State#state.max_file_size)]),
+ io:format("needs_merge_freq: ~b\n", [State#state.needs_merge_freq]),
+ io:format("open_timeout: ~s\n", [format_timeout(State#state.open_timeout)]),
io:format("\n").
format_duration(undefined) ->
Something went wrong with that request. Please try again.