Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

initial draft

  • Loading branch information...
commit 4df882f03fdd067f345f40a13d873fdf6eead692 1 parent e0589b2
@evanmcc evanmcc authored
View
2  src/basho_bench_driver_2i.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_2i).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
View
2  src/basho_bench_driver_bitcask.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_bitcask).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
View
2  src/basho_bench_driver_cassandra.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_cassandra).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
View
4 src/basho_bench_driver_cassandra_cql.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_cassandra_cql).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
@@ -123,4 +125,4 @@ list_to_hexstr([H|T]) ->
to_hex(H) ++ list_to_hexstr(T).
bin_to_hexstr(Bin) ->
- lists:concat(["abcdef0123",list_to_hexstr(binary_to_list(Bin))]).
+ lists:concat(["abcdef0123",list_to_hexstr(binary_to_list(Bin))]).
View
2  src/basho_bench_driver_dets.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_dets).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
View
2  src/basho_bench_driver_hibari.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_hibari).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
View
2  src/basho_bench_driver_http_raw.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_http_raw).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
View
2  src/basho_bench_driver_innostore.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_innostore).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
View
2  src/basho_bench_driver_null.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_null).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
View
2  src/basho_bench_driver_riakc_java.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_riakc_java).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
View
47 src/basho_bench_driver_riakc_pb.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_riakc_pb).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4,
mapred_valgen/2,
@@ -127,6 +129,16 @@ run(get, KeyGen, _ValueGen, State) ->
{error, Reason} ->
{error, Reason, State}
end;
+run(get_provided, Key, _ValueGen, State) ->
+ case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key,
+ [{r, State#state.r}]) of
+ {ok, _} ->
+ {ok, State};
+ {error, notfound} ->
+ {ok, State};
+ {error, Reason} ->
+ {error, Reason, State}
+ end;
run(get_existing, KeyGen, _ValueGen, State) ->
Key = KeyGen(),
case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key,
@@ -148,6 +160,16 @@ run(put, KeyGen, ValueGen, State) ->
{error, Reason} ->
{error, Reason, State}
end;
+run(put_provided, Key, Value, State) ->
+ Robj0 = riakc_obj:new(State#state.bucket, Key),
+ Robj = riakc_obj:update_value(Robj0, Value),
+ case riakc_pb_socket:put(State#state.pid, Robj, [{w, State#state.w},
+ {dw, State#state.dw}]) of
+ ok ->
+ {ok, State};
+ {error, Reason} ->
+ {error, Reason, State}
+ end;
run(update, KeyGen, ValueGen, State) ->
Key = KeyGen(),
case riakc_pb_socket:get(State#state.pid, State#state.bucket,
@@ -172,6 +194,31 @@ run(update, KeyGen, ValueGen, State) ->
{error, Reason, State}
end
end;
+run(update_provided, Key, Value, State) ->
+ case riakc_pb_socket:get(State#state.pid, State#state.bucket,
+ Key, [{r, State#state.r}]) of
+ {ok, Robj} ->
+ Robj2 = riakc_obj:update_value(Robj, Value),
+ case riakc_pb_socket:put(State#state.pid,
+ Robj2, [{w, State#state.w},
+ {dw, State#state.dw}]) of
+ ok ->
+ {ok, State};
+ {error, Reason} ->
+ {error, Reason, State}
+ end;
+ {error, notfound} ->
+ Robj0 = riakc_obj:new(State#state.bucket, Key),
+ Robj = riakc_obj:update_value(Robj0, Value),
+ case riakc_pb_socket:put(State#state.pid,
+ Robj, [{w, State#state.w},
+ {dw, State#state.dw}]) of
+ ok ->
+ {ok, State};
+ {error, Reason} ->
+ {error, Reason, State}
+ end
+ end;
run(update_existing, KeyGen, ValueGen, State) ->
Key = KeyGen(),
case riakc_pb_socket:get(State#state.pid, State#state.bucket,
View
2  src/basho_bench_driver_riakclient.erl
@@ -21,6 +21,8 @@
%% -------------------------------------------------------------------
-module(basho_bench_driver_riakclient).
+-compile({parse_transform, basho_bench_provide}).
+
-export([new/1,
run/4]).
View
3  src/basho_bench_valgen.erl
@@ -30,6 +30,9 @@
%% API
%% ====================================================================
+new({fixed_bin, var_size}, _Id) ->
+ Source = init_source(),
+ fun(Size) -> data_block(Source, Size) end;
new({fixed_bin, Size}, _Id) ->
Source = init_source(),
fun() -> data_block(Source, Size) end;
View
154 src/basho_bench_worker.erl
@@ -38,6 +38,7 @@
driver,
driver_state,
shutdown_on_error,
+ patterns,
ops,
ops_len,
rng_seed,
@@ -45,6 +46,11 @@
worker_pid,
sup_id}).
+-record(pattern, {name,
+ valgen,
+ objects=[],
+ operations=[]}).
+
-include("basho_bench.hrl").
%% ====================================================================
@@ -70,30 +76,37 @@ init([SupChild, Id]) ->
%% Setup RNG seed for worker sub-process to use; incorporate the ID of
%% the worker to ensure consistency in load-gen
%%
- %% NOTE: If the worker process dies, this obviously introduces some entroy
+ %% NOTE: If the worker process dies, this obviously introduces some entropy
%% into the equation since you'd be restarting the RNG all over.
process_flag(trap_exit, true),
- {A1, A2, A3} = basho_bench_config:get(rng_seed),
+ {A1, A2, A3} = case basho_bench_config:get(rng_seed, now()) of
+ {AA, AB, AC} -> {AA, AB, AC};
+ now -> now()
+ end,
RngSeed = {A1+Id, A2+Id, A3+Id},
%% Pull all config settings from environment
Driver = basho_bench_config:get(driver),
- Ops = ops_tuple(),
+ Patterns = parse_patterns(basho_bench_config:get(patterns, [])),
+ Ops = ops_tuple(basho_bench_config:get(operations),
+ Patterns, Driver),
+
ShutdownOnError = basho_bench_config:get(shutdown_on_error, false),
%% Finally, initialize key and value generation. We pass in our ID to the
%% initialization to enable (optional) key/value space partitioning
KeyGen = basho_bench_keygen:new(basho_bench_config:get(key_generator), Id),
ValGen = basho_bench_valgen:new(basho_bench_config:get(value_generator), Id),
-
- State = #state { id = Id, keygen = KeyGen, valgen = ValGen,
- driver = Driver,
- shutdown_on_error = ShutdownOnError,
- ops = Ops, ops_len = size(Ops),
- rng_seed = RngSeed,
- parent_pid = self(),
- sup_id = SupChild},
-
+ State =
+ #state { id = Id, keygen = KeyGen, valgen = ValGen,
+ driver = Driver,
+ shutdown_on_error = ShutdownOnError,
+ ops = Ops, ops_len = size(Ops),
+ patterns = Patterns,
+ rng_seed = RngSeed,
+ parent_pid = self(),
+ sup_id = SupChild},
+
%% Use a dedicated sub-process to do the actual work. The work loop may need
%% to sleep or otherwise delay in a way that would be inappropriate and/or
%% inefficient for a gen_server. Furthermore, we want the loop to be as
@@ -118,7 +131,7 @@ init([SupChild, Id]) ->
false ->
ok
end,
-
+
{ok, State#state { worker_pid = WorkerPid }}.
handle_call(run, _From, State) ->
@@ -175,16 +188,71 @@ stop_worker(SupChild) ->
%%
%% Expand operations list into tuple suitable for weighted, random draw
%%
-ops_tuple() ->
+ops_tuple(List0, Patterns, Driver) ->
F =
fun({OpTag, Count}) ->
lists:duplicate(Count, {OpTag, OpTag});
({Label, OpTag, Count}) ->
lists:duplicate(Count, {Label, OpTag})
end,
- Ops = [F(X) || X <- basho_bench_config:get(operations, [])],
+ List = [OoP || OoP <- List0,
+ begin
+ valid_op_or_pattern(OoP, Patterns, Driver)
+ end],
+ case (List0 -- List) of
+ [] -> ok;
+ Rem -> lager:info("Some invalid operations or patterns"
+ " were discarded: ~p~n", [Rem])
+ end,
+ Ops = [F(X) || X <- List],
list_to_tuple(lists:flatten(Ops)).
+parse_patterns(PatternList) ->
+ lists:map(fun(P0) ->
+ V = basho_bench_valgen:new({fixed_bin,
+ var_size}, ign),
+ {Name, P} = P0,
+ Pattern = #pattern{valgen=V, name=Name},
+ parse_pattern_loop(P, Pattern)
+ end, PatternList).
+
+parse_pattern_loop([], Record) ->
+ Record;
+parse_pattern_loop([Pat|Rest], Record) ->
+ NewRecord =
+ case Pat of
+ {obj, Name, Size} ->
+ lager:info("defining object ~p", [Name]),
+ Objs = Record#pattern.objects,
+ Record#pattern{objects=[{Name, Size}|Objs]};
+ {op, delay, Length} ->
+ lager:info("adding delay of ~p", [Length]),
+ Ops = Record#pattern.operations,
+ Record#pattern{operations=Ops ++ [{delay, Length}]};
+ {op, Op, Obj} ->
+ lager:info("adding operation ~p on object ~p", [Op, Obj]),
+ case lists:filter(fun({Name,_}) ->
+ Name == Obj
+ end, Record#pattern.objects) of
+ [] ->
+ error("pattern referenced non-existent "
+ "variable: " ++ atom_to_list(Obj));
+ _ ->
+ Ops = Record#pattern.operations,
+ Record#pattern{operations=Ops ++ [{Op, Obj}]}
+ end
+ end,
+ parse_pattern_loop(Rest, NewRecord).
+
+valid_op_or_pattern({OoP, _}, Patterns, Driver) ->
+ P = case lists:filter(fun(X) ->
+ X#pattern.name == OoP
+ end, Patterns) of
+ [] -> false;
+ _ -> true
+ end,
+ O = lists:member(OoP, Driver:provides()),
+ O orelse P.
worker_init(State) ->
%% Trap exits from linked parent process; use this to ensure the driver
@@ -221,14 +289,62 @@ worker_idle_loop(State) ->
end
end.
-worker_next_op2(State, OpTag) ->
- catch (State#state.driver):run(OpTag, State#state.keygen, State#state.valgen,
- State#state.driver_state).
+do_pattern(State, PatternName) ->
+ Pattern = get_pattern(State#state.patterns, PatternName),
+ Env = create_objects(Pattern#pattern.objects,
+ State#state.keygen,
+ Pattern#pattern.valgen,
+ orddict:new()),
+ execute_pattern(State, Pattern#pattern.operations, Env).
+
+get_pattern(Patterns, Name) ->
+ [Pat] = lists:filter(fun(X) ->
+ X#pattern.name == Name
+ end, Patterns),
+ Pat.
+
+create_objects([], _, _, Acc) ->
+ Acc;
+create_objects([O|T], KeyGen, ValGen, Acc) ->
+ {Name, Size} = O,
+ K = KeyGen(),
+ V = ValGen(Size),
+ NewAcc = orddict:store(Name, {K,V}, Acc),
+ create_objects(T, KeyGen, ValGen, NewAcc).
+
+execute_pattern(State, [Step|T], Env) ->
+ lager:debug("executing step: ~p", [Step]),
+ Res = case Step of
+ {delay, Usec} -> timer:sleep(Usec);
+ {Tag, Obj} ->
+ {ok, O} = orddict:find(Obj, Env),
+ do_op_on_obj(State, Tag, O)
+ end,
+ case T of
+ [] -> Res;
+ _ -> execute_pattern(State, T, Env)
+ end.
+
+do_op_on_obj(State, OpTag0, Obj) ->
+ OpTag = list_to_atom(atom_to_list(OpTag0) ++ "_provided"),
+ {Key, Val} = Obj,
+ catch (State#state.driver):run(OpTag, Key, Val,
+ State#state.driver_state).
+
+do_op(State, OpTag) ->
+ catch (State#state.driver):run(OpTag,
+ State#state.keygen,
+ State#state.valgen,
+ State#state.driver_state).
+
worker_next_op(State) ->
Next = element(random:uniform(State#state.ops_len), State#state.ops),
{_Label, OpTag} = Next,
Start = os:timestamp(),
- Result = worker_next_op2(State, OpTag),
+ Result = case lists:member(OpTag, (State#state.driver):provides()) of
+ true -> do_op(State, OpTag);
+ false -> do_pattern(State, OpTag)
+ end,
ElapsedUs = erlang:max(0, timer:now_diff(os:timestamp(), Start)),
case Result of
{Res, DriverState} when Res == ok orelse element(1, Res) == ok ->
Please sign in to comment.
Something went wrong with that request. Please try again.