#! /usr/bin/env escript
%% -*- erlang -*-
%%! -pz ../bitcask
%%
%% Bitcask Torture Tests
%%
%% Pre-populates a bitcask with a known range of keys, the value for
%% each key stores the key itself and a sequence number for the write
%% (initially 1).
%%
%% The test then continually rewrites all keys with the next sequence number
%% while other processes read, fold over and merge the cask.
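%%
%% For reference, the per-key encoding used throughout the test (mirroring
%% the put/get calls in write/5 and read/3 below) is:
%%
%%   ok = bitcask:put(Ref, <<Key:32>>, term_to_binary({Key, Seq})),
%%   {ok, Bin} = bitcask:get(Ref, <<Key:32>>),
%%   {Key, Seq} = binary_to_term(Bin)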
%%
%% Parameters:
%%
%% duration - number of milliseconds to run the test for. If unset (the
%%            default) only the initial write takes place.
%%
%% max_file_size - maximum file size within the bitcask in bytes.
%%                 Default 10 MB.
%%
%% status_freq - how often to output test status. Interval in ms. Default
%%               10,000 ms.
%%
%% cask - name of the bitcask directory to open. Default "bctt.bc".
%%
%% num_keys - number of keys used in the test. Default 16384.
%%
%% readers - number of threads reading from the bitcask. Default 0.
%%
%% folders - number of threads calling bitcask:fold. Each fold checks that
%%           all keys are visited exactly once. A new process is created
%%           for each fold. Default 0.
%%
%% foldkeys - number of threads calling bitcask:fold_keys. Each fold checks
%%            that all keys are visited exactly once. A new process is
%%            created for each fold. Default 0.
%%
%% mergers - if non-zero, run a single merge thread. The writer thread kicks
%%           the merger whenever needs_merge returns true, passing along the
%%           returned merge filenames. Default 0.
%%
%% needs_merge_freq - the writer thread calls needs_merge after every
%%                    needs_merge_freq writes to ensure files are closed
%%                    after merges. Default 1000.
%%
%% frag_merge_trigger - bitcask app env variables - set before running
%% dead_bytes_merge_trigger
%% frag_threshold
%% dead_bytes_threshold
%% small_file_threshold
%%
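%% Example invocations, run directly or via escript (argument values below
%% are illustrative only):
%%
%%   ./bctt                                    # initial populate only
%%   ./bctt duration=60000 readers=2 folders=1 foldkeys=1 mergers=1
%%   ./bctt duration=300000 max_file_size=1048576 needs_merge_freq=100
%%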
-module(bctt).
-compile([export_all]).
-include_lib("bitcask/include/bitcask.hrl").
-record(state, {seq = 0,
reader_reps=0,
folder_reps=0,
fold_keys_reps=0,
merger_reps=0,
merges_pending=0,
duration,
status_freq = 10000,
cask = "bctt.bc",
num_keys=16384,
writer_pid,
restart_writer=false,
writers=1,
needs_merge_freq = 1000,
frag_merge_trigger, % Env default trigger on 60% fragmentation
dead_bytes_merge_trigger, % Env default trigger on dead bytes > 512 MB
frag_threshold, % Include merge >= 40% fragmentation
dead_bytes_threshold, % Include merge dead bytes > 128 MB
small_file_threshold, % Include merge file is < 10 MB
max_fold_age = -1, % Frozen keydir can be as old as you like
max_fold_puts = 0, % as long as there are no updates
readers=0,
folders=0,
foldkeys=0,
mergers=0,
max_file_size=10*1024*1024,
open_timeout=undefined}).
%% Function to run from inside a running Erlang VM
test() ->
test([]).
test(Opts) ->
{ok, State} = process_args(Opts, #state{}),
print_params(State),
proc_lib:spawn(fun() -> do_test(State) end).
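%% A minimal sketch of driving the test from an Erlang shell, assuming the
%% module is compiled and bitcask is on the code path (option values are
%% illustrative only):
%%
%%   1> bctt:test([{duration, 60000}, {readers, 2}, {mergers, 1}]).
%%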
%% Escript version
main() ->
main([]).
main(Args) ->
io:format("PWD: ~p\n", [file:get_cwd()]),
try
case process_args([parse_arg(Arg) || Arg <- Args], #state{}) of
{ok, State} ->
print_params(State),
ensure_bitcask(),
ensure_deps(),
set_bitcask_env(State),
do_test(State);
syntax ->
syntax(),
halt(0)
end
catch
_:Why ->
io:format("Failed: ~p\n", [Why]),
halt(1)
end.
syntax() ->
io:format("bctt [duration=Msecs]\n"
" [cask=CaskFile]\n"
" [num_keys=NumKeys]\n"
" [readers=NumReaders]\n"
" [folders=NumFolders]\n"
" [foldkeys=NumFoldKeys]\n"
" [mergers=NumMergers]\n"
" [max_file_size=Bytes] # set 0 for defaults\n"
" [open_timeout=Secs]\n"
" [restart_writer=true|false]\n").
do_test(State0) ->
erlang:process_flag(trap_exit, true),
os:cmd("rm -rf " ++ State0#state.cask),
%% Put a base of keys - each value has {Key, Seq}, starting from 1.
io:format("\nInitial populate.\n"),
State1 = start_writer(State0),
kick_writer(State1),
wait_for_writer(State1),
%% Start continually rewriting the keys and optionally reading,
%% folding and merging
State = State1#state{seq = 1},
start_merger(State, State#state.mergers),
kick_writer(State),
start_readers(State, State#state.readers),
start_folders(State, State#state.folders),
start_fold_keys(State, State#state.foldkeys),
schedule_status(State),
case State#state.duration of
undefined ->
self() ! stop;
Duration ->
timer:send_after(Duration, self(), stop)
end,
io:format("Starting test.\n"),
EndState = restart_procs(State),
stop_writer(EndState),
wait_for_procs(EndState).
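%% Main control loop: receives progress messages from the worker processes
%% (write_done/write_exit from the writer, read_done/fold_done/fold_keys_done
%% from readers and folders, merge_pending/merge_done from the merger),
%% restarts or re-kicks workers as they finish, and returns once 'stop'
%% arrives or a worker dies abnormally.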
restart_procs(State) ->
receive
Msg ->
ok
end,
try
case Msg of
stop ->
io:format("Test ending...\n"),
State;
status ->
io:format("Writer seq: ~p Readers=~p Folders=~p FoldKeys=~p Merges=~p MergesPending=~p\n",
[State#state.seq, State#state.reader_reps,
State#state.folder_reps, State#state.fold_keys_reps,
State#state.merger_reps, State#state.merges_pending]),
schedule_status(State),
restart_procs(State);
{write_done, WriteSeq} ->
true = (State#state.seq+1 =:= WriteSeq),
NewState = State#state{seq = WriteSeq},
case State#state.restart_writer of
true ->
stop_writer(State);
false ->
kick_writer(NewState)
end,
restart_procs(NewState);
write_exit ->
State1 = start_writer(State),
kick_writer(State1),
restart_procs(State1);
merge_pending ->
restart_procs(State#state{merges_pending = State#state.merges_pending+1});
merge_done ->
restart_procs(State#state{merges_pending = State#state.merges_pending-1,
merger_reps = State#state.merger_reps+1});
read_done ->
start_readers(State, 1),
restart_procs(State#state{reader_reps = State#state.reader_reps+1});
fold_done ->
start_folders(State, 1),
restart_procs(State#state{folder_reps = State#state.folder_reps+1});
fold_keys_done ->
start_fold_keys(State, 1),
restart_procs(State#state{fold_keys_reps = State#state.fold_keys_reps+1});
{'EXIT', _From, normal} ->
restart_procs(State);
{'EXIT', From, Reason} ->
io:format("Test process ~p died: ~p\nState: ~p\n", [From, Reason, State]),
State;
Other ->
io:format("Restart procs got unexpected message: ~p\n", [Other]),
State
end
catch
What:Err ->
io:format("restart_proc: ~p ~p\nMsg: ~p\nState: ~p\n", [What, Err, Msg, State]),
State
end.
%% Wait for the initial writer to complete - the os:cmd call
%% can generate an EXIT message
wait_for_writer(State) ->
WriterPid = State#state.writer_pid,
receive
{'EXIT', WriterPid, Why} ->
erlang:error({initial_write_failed, Why});
{'EXIT', _Pid, _Why} ->
wait_for_writer(State);
{write_done, 1} ->
ok
end.
wait_for_procs(#state{writers = 0, readers = 0, folders = 0, foldkeys = 0, merges_pending = 0}) ->
catch merger ! stop,
io:format("Test complete\n");
wait_for_procs(State) ->
receive
stop ->
io:format("Wait for procs got stop message pid ~p\n", [self()]),
wait_for_procs(State);
status ->
wait_for_procs(State);
read_done ->
wait_for_procs(State#state{readers = State#state.readers - 1});
fold_done ->
wait_for_procs(State#state{folders = State#state.folders - 1});
fold_keys_done ->
wait_for_procs(State#state{foldkeys = State#state.foldkeys - 1});
merge_pending -> %% The writer could still be adding them as we try to shut down.
wait_for_procs(State#state{merges_pending = State#state.merges_pending + 1});
merge_done ->
wait_for_procs(State#state{merges_pending = State#state.merges_pending - 1,
merger_reps = State#state.merger_reps+1});
{write_done, _WriteSeq} ->
wait_for_procs(State);
write_exit ->
wait_for_procs(State#state{writers = State#state.writers - 1});
{'EXIT', _From, normal} ->
wait_for_procs(State);
{'EXIT', From, Reason} ->
io:format("Test process ~p died\n~p\n", [From, Reason]);
Other ->
io:format("Wait for procs got unexpected message: ~p\n", [Other])
end.
schedule_status(State) ->
case State#state.status_freq of
undefined ->
ok;
StatusFreq ->
timer:send_after(StatusFreq, self(), status)
end.
start_writer(State) ->
Caller = self(),
Pid = proc_lib:spawn_link(fun() ->
Opts = writer_opts(State),
%% Open inside the writer process until closing of stale file handles is resolved
Ref = bitcask:open(State#state.cask, Opts),
write_proc(Ref, Caller)
end),
%%X io:format("Started writer pid ~p\n", [Pid]),
State#state{writer_pid = Pid}.
writer_opts(State) ->
lists:flatten([
[read_write],
case State#state.max_file_size of
Size when is_integer(Size), Size > 0 ->
[{max_file_size, Size}];
_ ->
[]
end,
case State#state.open_timeout of
OpenTimeout when is_integer(OpenTimeout) ->
[{open_timeout, OpenTimeout}];
_ ->
[]
end]).
start_readers(_State, 0) ->
ok;
start_readers(State, NumReaders) ->
Caller = self(),
spawn_worker(reader,
fun() ->
read_proc(State#state.cask, State#state.num_keys, Caller)
end),
start_readers(State, NumReaders - 1).
start_folders(_State, 0) ->
ok;
start_folders(State, NumFolders) ->
Caller = self(),
spawn_worker(folder,
fun() ->
fold_proc(State#state.cask, State#state.num_keys,
Caller)
end),
start_folders(State, NumFolders - 1).
start_fold_keys(_State, 0) ->
ok;
start_fold_keys(State, NumFoldKeys) ->
Caller = self(),
spawn_worker(fold_keys,
fun() ->
fold_keys_proc(State#state.cask, State#state.num_keys,
Caller)
end),
start_fold_keys(State, NumFoldKeys - 1).
start_merger(_State, 0) ->
ok; % no merger, messages to it will be dropped
start_merger(State, _NumMergers) -> % any non-zero count starts a single merger
Caller = self(),
spawn_worker(merger,
fun() ->
register(merger, self()),
merge_proc(State#state.cask, Caller)
end).
spawn_worker(Type, Fun) ->
proc_lib:spawn(fun() ->
try
Fun()
catch
_:Err ->
io:format("~p CRASHED ~p: ~p\n",
[Type, self(), Err]),
throw(Err)
end
end).
kick_writer(State) ->
State#state.writer_pid ! {start, State#state.seq + 1, State#state.num_keys,
State#state.needs_merge_freq}.
stop_writer(State) ->
%%X io:format("Stopping writer ~p\n", [State#state.writer_pid]),
MRef = erlang:monitor(process, State#state.writer_pid),
State#state.writer_pid ! stop,
receive
{'DOWN', MRef, _, _, _} ->
%%X io:format("Stopped writer ~p\n", [State#state.writer_pid]),
ok
after
60000 ->
erlang:error({writer_pid_timeout, State#state.writer_pid})
end.
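%% Writer process: waits for {start, Seq, NumKeys, NeedsMergeFreq} from the
%% controller, rewrites every key with that sequence number, reports
%% {write_done, Seq}, and loops until it receives 'stop'.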
write_proc(Ref, Caller) ->
receive
stop ->
%%X io:format("Writer ~p received stop request\n", [self()]),
Caller ! write_exit;
{start, Seq, NumKeys, NeedsMergeFreq} ->
%% io:format("Writer starting\n"),
write(Ref, NumKeys, Seq, NeedsMergeFreq, Caller),
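%% Publish the sequence number that has now been fully written; readers
%% and folders read it back from the bitcask 'minseq' app env as a lower
%% bound when checking values.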
application:set_env(bitcask, minseq, Seq),
Caller ! {write_done, Seq},
write_proc(Ref, Caller)
end.
write(_Ref, 0, _Seq, _NeedsMergeFreq, _Caller) ->
ok;
write(Ref, Key, Seq, NeedsMergeFreq, Caller) ->
try
case (Key rem NeedsMergeFreq) == 0 of
true ->
case bitcask:needs_merge(Ref) of
{true, Filenames} ->
Caller ! merge_pending,
%% Try and kick the merger, may
%% not be configured to run.
catch merger ! {kick_merge, Filenames};
false ->
ok
end;
_ ->
ok
end
catch
_:Err1 ->
io:format(user, "NeedsMerge ~p: ~p\n", [Key, Err1])
end,
try
ok = bitcask:put(Ref, <<Key:32>>, term_to_binary({Key, Seq}))
catch
_:Err2 ->
io:format(user, "Put ~p: ~p\n", [Key, Err2])
end,
write(Ref, Key - 1, Seq, NeedsMergeFreq, Caller).
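%% Reader process: opens its own handle on the cask, reads every key once
%% and checks that each value's sequence number is at least the published
%% minseq.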
read_proc(Cask, NumKeys, Caller) ->
Ref = bitcask:open(Cask),
{ok, Seq} = application:get_env(bitcask, minseq),
%io:format("read_proc starting: minseq=~p\n", [Seq]),
read(Ref, NumKeys, Seq),
bitcask:close(Ref),
Caller ! read_done.
read(_Ref, 0, _MinSeq) ->
ok;
read(Ref, Key, MinSeq) ->
try
{ok, Bin} = bitcask:get(Ref, <<Key:32>>),
{Key, Seq} = binary_to_term(Bin),
case Seq >= MinSeq of
true ->
ok;
false ->
io:format("Read ~p Got: ~p Expected(>=): ~p\n", [Key, Seq, MinSeq])
end
catch
_:Err ->
io:format(user, "Get ~p: ~p\n", [Key, Err])
end,
read(Ref, Key - 1, MinSeq).
fold_proc(Cask, NumKeys, Caller) ->
Ref = bitcask:open(Cask),
fold(Ref, NumKeys),
bitcask:close(Ref),
Caller ! fold_done.
fold(Ref, NumKeys) ->
{ok, MinSeq} = application:get_env(bitcask, minseq),
Folder = fun(<<Key:32>>, Bin, Keys) ->
%% Check each value's sequence against the minimum recorded before the fold
{Key, Seq} = binary_to_term(Bin),
case Seq >= MinSeq of
true ->
ok;
_ ->
io:format("fold returned early seq for ~p. "
"MinExpected: ~p Returned: ~p\n",
[Key, MinSeq, Seq])
end,
[Key | Keys]
end,
FoldedKeys = bitcask:fold(Ref, Folder, []),
check_fold(1, NumKeys, lists:sort(FoldedKeys)).
fold_keys_proc(Cask, NumKeys, Caller) ->
Ref = bitcask:open(Cask),
fold_keys(Ref, NumKeys),
bitcask:close(Ref),
Caller ! fold_keys_done.
fold_keys(Ref, NumKeys) ->
Folder = fun(#bitcask_entry{key = <<Key:32>>}, Keys) ->
%io:format(user, "FoldedKey: ~p\n", [Key]),
[Key | Keys]
end,
FoldedKeys = bitcask:fold_keys(Ref, Folder, []),
check_fold(1, NumKeys, lists:sort(FoldedKeys)).
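%% check_fold/3 walks the sorted list of keys returned by a fold and verifies
%% it is exactly the sequence 1..MaxKey, reporting the first key at which the
%% sequence breaks.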
check_fold(Key, MaxKey, []) when Key == MaxKey + 1 ->
ok;
check_fold(Key, _MaxKey, []) ->
io:format("Fold missing key ~p (stopping searching)\n", [Key]);
check_fold(Key, MaxKey, [Key | Rest]) ->
check_fold(Key + 1, MaxKey, Rest);
check_fold(Key1, _MaxKey, [_Key2 | _Rest]) ->
io:format("Fold missing key ~p (stopping searching)\n", [Key1]).
merge_proc(Cask, Caller) ->
receive
{kick_merge, Filenames} ->
try
ok = bitcask:merge(Cask, [], Filenames)
catch
%% Maybe keydir is being loaded
_:{error, not_ready} ->
ok
end,
Caller ! merge_done,
merge_proc(Cask, Caller);
stop ->
ok
end.
ensure_bitcask() ->
case code:ensure_loaded(bitcask) of
{module, bitcask} ->
ok;
_ ->
{ok, Cwd} = file:get_cwd(),
find_bitcask(filename:split(Cwd))
end.
%% Look for bitcask.beam in Cwd and Cwd/ebin, walking up parent directories until found
find_bitcask(["/"]) ->
erlang:error("Could not find bitcask\n");
find_bitcask(Cwd) ->
case try_bitcask_dir(Cwd) of
true ->
ok;
false ->
case try_bitcask_dir(Cwd ++ ["ebin"]) of
true ->
ok;
false ->
find_bitcask(parent_dir(Cwd))
end
end.
try_bitcask_dir(Dir) ->
CodeDir = filename:join(Dir),
Beam = bitcask_beam(CodeDir),
io:format("Looking for bitcask in \"~s\".\n", [CodeDir]),
case filelib:is_regular(Beam) of
true ->
io:format("Adding bitcask dir \"~s\".\n",
[CodeDir]),
code:add_pathz(CodeDir),
{module, bitcask} = code:ensure_loaded(bitcask),
true;
_ ->
false
end.
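%% Add any rebar-style deps/*/ebin directories alongside the bitcask beam to
%% the code path so bitcask's own dependencies can be loaded.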
ensure_deps() ->
BitcaskBeam = code:where_is_file("bitcask.beam"),
BitcaskDir = parent_dir(filename:split(filename:dirname(BitcaskBeam))),
Pattern = filename:join(BitcaskDir) ++ "/deps/*/ebin",
Deps = filelib:wildcard(Pattern),
AddDepDir = fun(DepDir) ->
io:format("Adding dependency dir \"~s\".\n",
[DepDir]),
code:add_pathz(DepDir)
end,
lists:foreach(AddDepDir, Deps).
parent_dir([]) ->
["/"];
parent_dir(["/"]) ->
["/"];
parent_dir(Dirs) ->
lists:reverse(tl(lists:reverse(Dirs))).
bitcask_beam(Cwd) ->
filename:join(Cwd, ["bitcask" ++ code:objfile_extension()]).
set_bitcask_env(State) ->
application:load(bitcask),
set_env(frag_merge_trigger, State#state.frag_merge_trigger),
set_env(dead_bytes_merge_trigger, State#state.dead_bytes_merge_trigger),
set_env(frag_threshold, State#state.frag_threshold),
set_env(dead_bytes_threshold, State#state.dead_bytes_threshold),
set_env(small_file_threshold, State#state.small_file_threshold),
set_env(max_fold_age, State#state.max_fold_age),
set_env(max_fold_puts, State#state.max_fold_puts),
io:format("Bitcask AppEnv:\n~p\n", [application:get_all_env(bitcask)]).
set_env(_, undefined) ->
ok;
set_env(Var, Val) ->
application:set_env(bitcask, Var, Val).
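%% Convert {Name, Value} option pairs into #state{} record updates; integer
%% options accept either integers (from test/1) or strings (from the command
%% line).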
process_args([], State) ->
{ok, State};
process_args([Arg | Rest], State) ->
case process_arg(Arg, State) of
{ok, NewState} ->
process_args(Rest, NewState);
Reason ->
Reason
end.
process_arg({help, _}, _State) ->
syntax;
process_arg({Name, Val}, State) when Name =:= duration;
Name =:= status_freq;
Name =:= num_keys;
Name =:= readers;
Name =:= folders;
Name =:= foldkeys;
Name =:= mergers;
Name =:= max_file_size;
Name =:= open_timeout;
Name =:= needs_merge_freq;
Name =:= frag_merge_trigger;
Name =:= dead_bytes_merge_trigger;
Name =:= frag_threshold;
Name =:= dead_bytes_threshold;
Name =:= small_file_threshold ->
case is_integer(Val) of
true ->
{ok, setelement(get_state_index(Name), State, Val)};
false ->
{ok, setelement(get_state_index(Name), State, list_to_integer(Val))}
end;
process_arg({Name, Val}, State) when Name =:= restart_writer ->
case is_atom(Val) of
true ->
{ok, setelement(get_state_index(Name), State, Val)};
false ->
{ok, setelement(get_state_index(Name), State, list_to_atom(Val))}
end;
process_arg({Name, Val}, State) ->
{ok, setelement(get_state_index(Name), State, Val)}.
parse_arg([$- | Arg]) -> % strip leading --'s
parse_arg(Arg);
parse_arg(Arg) ->
case string:tokens(Arg, "=") of
[NameStr] ->
Val = undefined;
[NameStr, Val] ->
ok;
_ ->
NameStr = Val = undefined,
erlang:error({badarg, Arg})
end,
case catch list_to_existing_atom(NameStr) of
{'EXIT', {badarg, _}} ->
Name = undefined,
erlang:error({badarg, NameStr});
Name ->
ok
end,
{Name, Val}.
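%% Map a parameter name to its position in the #state{} record so process_arg
%% can set any field generically with setelement/3.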
get_state_index(Name) ->
get_state_index(Name, 2, record_info(fields, state)). % first element is 'state'
get_state_index(Name, _Index, []) ->
io:format("Cannot find index in state for ~p\n", [Name]),
erlang:error({badarg, Name});
get_state_index(Name, Index, [Name | _Rest]) ->
Index;
get_state_index(Name, Index, [_OtherName | Rest]) ->
get_state_index(Name, Index + 1, Rest).
print_params(State) ->
io:format("Bitcask Test\n"),
io:format("duration: ~s\n", [format_duration(State#state.duration)]),
io:format("cask: ~s\n", [State#state.cask]),
io:format("num_keys: ~b\n", [State#state.num_keys]),
io:format("readers: ~b\n", [State#state.readers]),
io:format("folders: ~b\n", [State#state.folders]),
io:format("foldkeys: ~b\n", [State#state.foldkeys]),
io:format("mergers: ~b\n", [State#state.mergers]),
io:format("max_file_size: ~s\n", [format_max_file_size(State#state.max_file_size)]),
io:format("needs_merge_freq: ~b\n", [State#state.needs_merge_freq]),
io:format("open_timeout: ~s\n", [format_timeout(State#state.open_timeout)]),
io:format("\n").
format_duration(undefined) ->
"once";
format_duration(Duration) ->
io_lib:format("~p ms", [Duration]).
format_timeout(undefined) ->
"default";
format_timeout(Secs) ->
io_lib:format("~b s", [Secs]).
format_max_file_size(MaxFileSize) when is_integer(MaxFileSize), MaxFileSize > 0 ->
io_lib:format("~b", [MaxFileSize]);
format_max_file_size(_) ->
"default".