Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'master' into bwf-check-mapred-return

  • Loading branch information...
commit 43989bc3d0ad23ab27a32a001e8541ebbe84a385 2 parents 72ab5d7 + 2850254
@beerriot beerriot authored
Showing with 844 additions and 313 deletions.
  1. +9 −0 Makefile
  2. +32 −29 examples/2i.config
  3. +1 −2  examples/bitcask.config
  4. +21 −0 examples/bitcask_expiry.config
  5. +1 −3 examples/casbench.config
  6. +0 −2  examples/httpraw.config
  7. +1 −2  examples/innostore_test.config
  8. +0 −2  examples/null_err_test.config
  9. +0 −2  examples/null_test.config
  10. +0 −2  examples/riakc_java.config
  11. +0 −4 examples/riakc_mr.config
  12. +0 −4 examples/riakc_pb.config
  13. +1 −2  examples/riakclient.config
  14. +6 −9 priv/common.r
  15. +4 −2 priv/summary.r
  16. +8 −7 rebar.config
  17. +1 −23 ebin/basho_bench.app → src/basho_bench.app.src
  18. +12 −7 src/basho_bench.erl
  19. +4 −3 src/basho_bench_app.erl
  20. +269 −166 src/basho_bench_driver_2i.erl
  21. +22 −1 src/basho_bench_driver_bitcask.erl
  22. +10 −1 src/basho_bench_driver_http_raw.erl
  23. +1 −1  src/basho_bench_driver_riakc_pb.erl
  24. +7 −4 src/basho_bench_java_client.erl
  25. +2 −0  src/basho_bench_keygen.erl
  26. +139 −0 src/basho_bench_measurement.erl
  27. +113 −0 src/basho_bench_measurement_erlangvm.erl
  28. +93 −26 src/basho_bench_stats.erl
  29. +12 −3 src/basho_bench_sup.erl
  30. +4 −1 src/basho_bench_valgen.erl
  31. +12 −5 src/basho_bench_worker.erl
  32. +59 −0 src/uuid.erl
View
9 Makefile
@@ -17,3 +17,12 @@ distclean: clean
results:
priv/summary.r -i tests/current
+
+byte_sec-results:
+ priv/summary.r --ylabel1stgraph byte/sec -i tests/current
+
+kbyte_sec-results:
+ priv/summary.r --ylabel1stgraph Kbyte/sec -i tests/current
+
+mbyte_sec-results:
+ priv/summary.r --ylabel1stgraph Mbyte/sec -i tests/current
View
61 examples/2i.config
@@ -1,45 +1,48 @@
-%% Run this once with only the 'put' operation enabled to preload
-%% data.
-%%
-%% Run again with the '_eq_query' or '_range_query' operations enabled
-%% to perform equality or range lookups.
-%%
-%% Note that with even a nominal amount of data loaded, the range
-%% operations get quite large.
+{driver,
+ basho_bench_driver_2i}.
{operations, [
- {put, 1}
- % {get, 1}
+ {get_pb, 1},
+ {{put_pb, 5}, 20},
+ {{query_http, 10}, 1},
+ {{query_mr, 10}, 1},
+ {{query_pb, 10}, 1}
+]}.
- % {int_eq_query, 1},
- % {bin_eq_query, 1}
+{measurement_driver,
+ basho_bench_measurement_erlangvm}.
- % {int_range_query, 1},
- % {bin_range_query, 1}
- ]}.
+{measurements, [
+ {memory, 1000},
+ {cpu, 1000},
+ {processes, 1000},
+ {filehandles, 1000}
+]}.
-{mode, max}.
+%%% LOAD SETTINGS %%%
+{mode, max}.
{duration, 1}.
-
{concurrent, 3}.
-{driver, basho_bench_driver_2i}.
+%%% DATA SHAPE %%%
-{code_paths, ["deps/stats",
- "deps/riakc",
- "deps/protobuffs"]}.
+{key_generator, {uniform_int, 1000}}.
+{value_generator, {fixed_bin, 1000}}.
-{key_generator, {int_to_bin, {uniform_int, 10000}}}.
+{pb_ips, [{127,0,0,1}]}.
+{pb_replies, 1}.
-{value_generator, {fixed_bin, 100}}.
+{http_hosts, ["127.0.0.1"]}.
+{http_port, 8098}.
-{riakc_pb_ips, [{127,0,0,1}]}.
+{rng_seed, {1, 2, 3}}.
-{riakc_pb_replies, 1}.
+%%% MEASUREMENT SETTINGS %%%
-{num_integer_indexes, 3}.
-{num_binary_indexes, 3}.
-{binary_index_size, 20}.
+{nodes, ['riak@127.0.0.1']}.
+{cookie, riak}.
-{rng_seed, {1, 2, 3}}.
+%%% CODE PATHS %%%
+
+{code_paths, ["/Users/rusty/Documents/Basho/RiakKV/deps/mochiweb"]}.
View
3  examples/bitcask.config
@@ -14,8 +14,7 @@
%% the path in the list below (e.g., "../../public/bitcask") must point to
%% the relevant directory of a bitcask installation
-{code_paths, ["deps/stats",
- "../../public/bitcask"]}.
+{code_paths, ["../../public/bitcask"]}.
{bitcask_dir, "/tmp/bitcask.bench"}.
View
21 examples/bitcask_expiry.config
@@ -0,0 +1,21 @@
+{mode, {rate, 100}}.
+
+{duration, 2}.
+
+{concurrent, 1}.
+
+{driver, basho_bench_driver_bitcask}.
+
+{key_generator, uuid_v4}.
+
+{value_generator, {fixed_bin, 10000}}.
+
+{operations, [{put, 9}, {merge, 1}]}.
+
+{code_paths, ["/Users/dizzyd/src/bitcask"]}.
+
+{bitcask_dir, "/tmp/bitcask.bench"}.
+
+{bitcask_flags, [{max_file_size, 5242880},
+ {log_needs_merge, true},
+ {expiry_secs, 10}]}.
View
4 examples/casbench.config
@@ -12,6 +12,4 @@
{operations, [{get, 1}, {put, 1}]}.
-{code_paths, ["deps/stats",
- "deps/ibrowse",
- "deps/casbench"]}.
+{code_paths, []}.
View
2  examples/httpraw.config
@@ -6,8 +6,6 @@
{driver, basho_bench_driver_http_raw}.
-{code_paths, ["deps/ibrowse"]}.
-
%% Example syntax (mykeygen_seq is not defined)
%% {key_generator, {function, test, mykeygen_seq, [10000, 10, 10, 100]}}.
View
3  examples/innostore_test.config
@@ -9,8 +9,7 @@
%% the path in the list below (e.g., "../innostore") must point to
%% the relevant directory of an innostore installation
-{code_paths, ["deps/stats",
- "../innostore"]}.
+{code_paths, ["../innostore"]}.
{key_generator, {int_to_bin, {uniform_int, 500000}}}.
View
2  examples/null_err_test.config
@@ -6,8 +6,6 @@
{driver, basho_bench_driver_null}.
-{code_paths, ["deps/stats"]}.
-
{key_generator, {int_to_bin, {sequential_int, 5000}}}.
{value_generator, {fixed_bin, 10248}}.
View
2  examples/null_test.config
@@ -6,8 +6,6 @@
{driver, basho_bench_driver_null}.
-{code_paths, ["deps/stats"]}.
-
{key_generator, {int_to_bin, {sequential_int, 5000000}}}.
{value_generator, {fixed_bin, 10248}}.
View
2  examples/riakc_java.config
@@ -6,8 +6,6 @@
{driver, basho_bench_driver_riakc_java}.
-{code_paths, ["deps/stats"]}.
-
{key_generator, {int_to_bin, {uniform_int, 10000}}}.
{value_generator, {fixed_bin, 10000}}.
View
4 examples/riakc_mr.config
@@ -1,10 +1,6 @@
% -*- mode: erlang -*-
{driver, basho_bench_driver_riakc_pb}.
-{code_paths, ["deps/stats",
- "deps/riakc",
- "deps/protobuffs"]}.
-
{riakc_pb_ips, [{127,0,0,1}]}.
{riakc_pb_replies, 1}.
View
4 examples/riakc_pb.config
@@ -6,10 +6,6 @@
{driver, basho_bench_driver_riakc_pb}.
-{code_paths, ["deps/stats",
- "deps/riakc",
- "deps/protobuffs"]}.
-
{key_generator, {int_to_bin, {uniform_int, 10000}}}.
{value_generator, {fixed_bin, 10000}}.
View
3  examples/riakclient.config
@@ -6,8 +6,7 @@
{driver, basho_bench_driver_riakclient}.
-{code_paths, ["deps/stats",
- "/Users/jmeredith/basho/riak/apps/riak_kv",
+{code_paths, ["/Users/jmeredith/basho/riak/apps/riak_kv",
"/Users/jmeredith/basho/riak/apps/riak_core"]}.
{key_generator, {int_to_bin, {uniform_int, 35000}}}.
View
15 priv/common.r
@@ -1,11 +1,11 @@
+# Load all the necessary packages, installing missing ones when necessary
+packages.to.install <- c("plyr", "grid", "getopt", "proto", "ggplot2")
-# Load a library, or attempt to install it if it's not available
-load_library <- function(Name)
+for(p in packages.to.install)
{
- if (!library(Name, character.only = TRUE, logical.return = TRUE))
- {
- install.packages(Name, repos = "http://lib.stat.cmu.edu/R/CRAN")
- }
+ print(p)
+ if (suppressWarnings(!require(p, character.only = TRUE))) install.packages(p, repos = "http://lib.stat.cmu.edu/R/CRAN")
+ if (p == "ggplot2") suppressWarnings(library(ggplot2))
}
# Load a latency file and ensure that it is appropriately tagged
@@ -43,6 +43,3 @@ load_benchmark <- function(Dir, Tstart, Tend)
latencies = latencies[latencies$elapsed >= Tstart & latencies$elapsed <= Tend,]))
}
-load_library("getopt")
-load_library("grid")
-load_library("ggplot2")
View
6 priv/summary.r
@@ -15,7 +15,8 @@ params = matrix(c(
'outfile', 'o', 2, "character",
'indir', 'i', 2, "character",
'tstart', '1', 2, "integer",
- 'tend', '2', 2, "integer"
+ 'tend', '2', 2, "integer",
+ 'ylabel1stgraph', 'Y', 2, "character"
), ncol=4, byrow=TRUE)
# Parse the parameters
@@ -32,6 +33,7 @@ if (is.null(opt$width)) { opt$width = 1024 }
if (is.null(opt$height)) { opt$height = 768 }
if (is.null(opt$indir)) { opt$indir = "current"}
if (is.null(opt$outfile)) { opt$outfile = file.path(opt$indir, "summary.png") }
+if (is.null(opt$ylabel1stgraph)) { opt$ylabel1stgraph = "Op/sec" }
# Load the benchmark data, passing the time-index range we're interested in
b = load_benchmark(opt$indir, opt$tstart, opt$tend)
@@ -47,7 +49,7 @@ png(file = opt$outfile, width = opt$width, height = opt$height)
# First plot req/sec from summary
plot1 <- qplot(elapsed, successful / window, data = b$summary,
geom = c("smooth", "point"),
- xlab = "Elapsed Secs", ylab = "Op/sec",
+ xlab = "Elapsed Secs", ylab = opt$ylabel1stgraph,
main = "Throughput") +
geom_smooth(aes(y = failed / window, colour = "Errors")) +
scale_colour_manual(name = "", values = c("red"))
View
15 rebar.config
@@ -1,11 +1,12 @@
-{deps, [
- {stats, ".*", {git, "git://github.com/dizzyd/stats.git", "HEAD"}},
- {ibrowse, ".*", {git, "git://github.com/dizzyd/ibrowse.git", "HEAD"}},
- {casbench, "0.1", {git, "git://github.com/basho/casbench", "HEAD"}},
- {riakc, ".*", {git, "git://github.com/basho/riak-erlang-client", "HEAD"}}
- ]}.
+{deps,
+ [
+ {basho_stats, ".*", {git, "git://github.com/basho/basho_stats.git", "HEAD"}},
+ {ibrowse, ".*", {git, "git://github.com/cmullaparthi/ibrowse.git", "HEAD"}},
+ {casbench, "0.1", {git, "git://github.com/basho/casbench", "HEAD"}},
+ {riakc, ".*", {git, "git://github.com/basho/riak-erlang-client", "HEAD"}}
+ ]}.
-{escript_incl_apps, [stats, ibrowse, riakc, protobuffs]}.
+{escript_incl_apps, [basho_stats, ibrowse, riakc, protobuffs]}.
%% Uncomment to use the Java client bench driver
%% {escript_emu_args, "%%! -name bb@127.0.0.1 -setcookie YOUR_ERLANG_COOKIE\n"}.
View
24 ebin/basho_bench.app → src/basho_bench.app.src
@@ -1,29 +1,7 @@
{application, basho_bench,
[{description, "Riak Benchmarking Suite"},
{vsn, "0.1"},
- {modules, [
- basho_bench,
- basho_bench_app,
- basho_bench_config,
- basho_bench_driver_2i,
- basho_bench_driver_dets,
- basho_bench_driver_http_raw,
- basho_bench_driver_innostore,
- basho_bench_driver_riakc_pb,
- basho_bench_driver_riakclient,
- basho_bench_driver_cassandra,
- basho_bench_driver_bitcask,
- basho_bench_driver_hibari,
- basho_bench_driver_null,
- basho_bench_driver_riakc_java,
- basho_bench_java_client,
- basho_bench_log,
- basho_bench_keygen,
- basho_bench_stats,
- basho_bench_sup,
- basho_bench_worker,
- basho_bench_valgen
- ]},
+ {modules, []},
{registered, [ basho_bench_sup ]},
{applications, [kernel,
stdlib,
View
19 src/basho_bench.erl
@@ -119,13 +119,18 @@ add_code_paths([]) ->
ok;
add_code_paths([Path | Rest]) ->
Absname = filename:absname(Path),
- case filename:basename(Absname) of
- "ebin" ->
- true = code:add_path(Absname);
- _ ->
- true = code:add_path(filename:join(Absname, "ebin"))
- end,
- add_code_paths(Rest).
+ CodePath = case filename:basename(Absname) of
+ "ebin" ->
+ Absname;
+ _ ->
+ filename:join(Absname, "ebin")
+ end,
+ case code:add_path(CodePath) of
+ true ->
+ add_code_paths(Rest);
+ Error ->
+ ?FAIL_MSG("Failed to add ~p to code_path: ~p\n", [CodePath, Error])
+ end.
%%
View
7 src/basho_bench_app.erl
@@ -44,7 +44,7 @@ start() ->
%% Make sure crypto is available
ok = application:start(crypto),
-
+
%% Start up our application -- mark it as permanent so that the node
%% will be killed if we go down
application:start(basho_bench, permanent).
@@ -54,7 +54,7 @@ stop() ->
is_running() ->
application:get_env(basho_bench_app, is_running) == {ok, true}.
-
+
%% ===================================================================
%% Application callbacks
@@ -64,9 +64,10 @@ start(_StartType, _StartArgs) ->
{ok, Pid} = basho_bench_sup:start_link(),
application:set_env(basho_bench_app, is_running, true),
ok = basho_bench_stats:run(),
+ ok = basho_bench_measurement:run(),
ok = basho_bench_worker:run(basho_bench_sup:workers()),
{ok, Pid}.
-
+
stop(_State) ->
ok.
View
435 src/basho_bench_driver_2i.erl
@@ -26,16 +26,12 @@
-include("basho_bench.hrl").
--record(state, { pid,
- bucket,
- r,
- w,
- dw,
- rw,
- num_integer_indexes,
- num_binary_indexes,
- binary_index_size
- }).
+-record(state, {
+ pb_pid,
+ http_host,
+ http_port,
+ bucket
+ }).
%% ====================================================================
@@ -43,57 +39,39 @@
%% ====================================================================
new(Id) ->
- %% Make sure the path is setup such that we can get at riak_client
- case code:which(riakc_pb_socket) of
- non_existing ->
- ?FAIL_MSG("~s requires riakc_pb_socket module to be available on code path.\n",
- [?MODULE]);
- _ ->
- ok
- end,
-
- Ips = basho_bench_config:get(riakc_pb_ips, [{127,0,0,1}]),
- Port = basho_bench_config:get(riakc_pb_port, 8087),
-
- %% riakc_pb_replies sets defaults for R, W, DW and RW.
- %% Each can be overridden separately
- Replies = basho_bench_config:get(riakc_pb_replies, 2),
- R = basho_bench_config:get(riakc_pb_r, Replies),
- W = basho_bench_config:get(riakc_pb_w, Replies),
- DW = basho_bench_config:get(riakc_pb_dw, Replies),
- RW = basho_bench_config:get(riakc_pb_rw, Replies),
- Bucket = basho_bench_config:get(riakc_pb_bucket, <<"test">>),
-
- %% How many indexes should we make?
- NumIntegerIndexes = basho_bench_config:get(num_integer_indexes, 3),
- NumBinaryIndexes = basho_bench_config:get(num_binary_indexes, 3),
- BinaryIndexSize = basho_bench_config:get(binary_index_size, 20),
-
- %% Choose the node using our ID as a modulus
- TargetIp = lists:nth((Id rem length(Ips)+1), Ips),
- ?INFO("Using target ip ~p for worker ~p\n", [TargetIp, Id]),
-
- case riakc_pb_socket:start_link(TargetIp, Port) of
+ %% Ensure that ibrowse is started...
+ application:start(ibrowse),
+
+ %% Ensure that the riakc_pb_socket and mochijson2 modules are on the code path...
+ ensure_module(riakc_pb_socket),
+ ensure_module(mochijson2),
+
+ %% Read config settings...
+ PBIPs = basho_bench_config:get(pb_ips, [{127,0,0,1}]),
+ PBPort = basho_bench_config:get(pb_port, 8087),
+ HTTPHosts = basho_bench_config:get(http_hosts, ["127.0.0.1"]),
+ HTTPPort = basho_bench_config:get(http_port, 8098),
+ Bucket = basho_bench_config:get(riakc_pb_bucket, <<"mybucket">>),
+
+ PBIP = choose(Id, PBIPs),
+ case riakc_pb_socket:start_link(PBIP, PBPort) of
{ok, Pid} ->
- {ok, #state { pid = Pid,
- bucket = Bucket,
- r = R,
- w = W,
- dw = DW,
- rw = RW,
- num_integer_indexes = NumIntegerIndexes,
- num_binary_indexes = NumBinaryIndexes,
- binary_index_size = BinaryIndexSize
- }};
+ {ok, #state {
+ pb_pid = Pid,
+ http_host = choose(Id, HTTPHosts),
+ http_port = HTTPPort,
+ bucket = Bucket }};
{error, Reason2} ->
?FAIL_MSG("Failed to connect riakc_pb_socket to ~p port ~p: ~p\n",
- [TargetIp, Port, Reason2])
+ [PBIP, PBPort, Reason2])
end.
-run(get, KeyGen, _ValueGen, State) ->
- Key = KeyGen(),
- case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key,
- [{r, State#state.r}]) of
+%% Get a single object.
+run(get_pb, KeyGen, _ValueGen, State) ->
+ Pid = State#state.pb_pid,
+ Bucket = State#state.bucket,
+ Key = to_binary(KeyGen()),
+ case riakc_pb_socket:get(Pid, Bucket, Key) of
{ok, _Obj} ->
{ok, State};
{error, notfound} ->
@@ -101,90 +79,213 @@ run(get, KeyGen, _ValueGen, State) ->
{error, Reason} ->
{error, Reason, State}
end;
-run(put, KeyGen, ValueGen, State) ->
- %% Generate key, value, and metadata...
- Key = KeyGen(),
+
+%% Put an object with N indices.
+run({put_pb, N}, KeyGen, ValueGen, State) ->
+ Pid = State#state.pb_pid,
+ Bucket = State#state.bucket,
+ Key = to_integer(KeyGen()),
Value = ValueGen(),
- Indexes =
- generate_integer_indexes(Key, State#state.num_integer_indexes) ++
- generate_binary_indexes(Key, State#state.num_binary_indexes, State#state.binary_index_size),
+ Indexes = generate_integer_indexes_for_key(Key, N),
MetaData = dict:from_list([{<<"index">>, Indexes}]),
%% Create the object...
- Robj0 = riakc_obj:new(State#state.bucket, Key),
+ Robj0 = riakc_obj:new(Bucket, to_binary(Key)),
Robj1 = riakc_obj:update_value(Robj0, Value),
Robj2 = riakc_obj:update_metadata(Robj1, MetaData),
%% Write the object...
- case riakc_pb_socket:put(State#state.pid, Robj2, [{w, State#state.w},
- {dw, State#state.dw}]) of
+ case riakc_pb_socket:put(Pid, Robj2) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
-run(int_eq_query, KeyGen, _ValueGen, State) ->
- Key = KeyGen(),
- [{Field,Term}|_] =
- generate_integer_indexes(Key,
- State#state.num_integer_indexes),
- case riakc_pb_socket:get_index(State#state.pid,
- State#state.bucket,
- Field,
- Term) of
- {ok, _Results} ->
+
+%% Query results via the HTTP interface.
+run({query_http, N}, KeyGen, _ValueGen, State) ->
+ Host = State#state.http_host,
+ Port = State#state.http_port,
+ Bucket = State#state.bucket,
+ StartKey = to_integer(KeyGen()),
+ EndKey = StartKey + N - 1,
+ URL = io_lib:format("http://~s:~p/buckets/~s/index/field1_int/~p/~p",
+ [Host, Port, Bucket, StartKey, EndKey]),
+ case json_get(URL) of
+ {ok, {struct, Proplist}} ->
+ case proplists:get_value(<<"keys">>, Proplist) of
+ Results when length(Results) == N ->
+ {ok, State};
+ Results ->
+ io:format("Not enough results for query_http: ~p/~p/~p~n", [StartKey, EndKey, Results]),
+ {ok, State}
+ end;
+ {error, Reason} ->
+ io:format("[~s:~p] ERROR - Reason: ~p~n", [?MODULE, ?LINE, Reason]),
+ {error, Reason, State}
+ end;
+
+%% Query results via the M/R interface.
+run({query_mr, 1}, KeyGen, _ValueGen, State) ->
+ Host = State#state.http_host,
+ Port = State#state.http_port,
+ Bucket = State#state.bucket,
+ Key = to_integer(KeyGen()),
+ URL = io_lib:format("http://~s:~p/mapred", [Host, Port]),
+ Body = ["
+ {
+ \"inputs\":{
+ \"bucket\":\"", to_list(Bucket), "\",
+ \"index\":\"field1_int\",
+ \"key\":\"", to_list(Key), "\"
+ },
+ \"query\":[
+ {
+ \"reduce\":{
+ \"language\":\"erlang\",
+ \"module\":\"riak_kv_mapreduce\",
+ \"function\":\"reduce_identity\",
+ \"keep\":true
+ }
+ }
+ ]
+ }
+ "],
+ case json_post(URL, Body) of
+ {ok, Results} when length(Results) == 1 ->
+ {ok, State};
+ {ok, Results} ->
+ io:format("Not enough results for query_mr: ~p/~p~n", [Key, Results]),
+ {ok, State};
+ {error, Reason} ->
+ io:format("[~s:~p] ERROR - Reason: ~p~n", [?MODULE, ?LINE, Reason]),
+ {error, Reason, State}
+ end;
+run({query_mr, N}, KeyGen, _ValueGen, State) ->
+ Host = State#state.http_host,
+ Port = State#state.http_port,
+ Bucket = State#state.bucket,
+ StartKey = to_integer(KeyGen()),
+ EndKey = StartKey + N - 1,
+ URL = io_lib:format("http://~s:~p/mapred", [Host, Port]),
+ Body = ["
+ {
+ \"inputs\":{
+ \"bucket\":\"", to_list(Bucket), "\",
+ \"index\":\"field1_int\",
+ \"start\":\"",to_list(StartKey), "\",
+ \"end\":\"", to_list(EndKey), "\"
+ },
+ \"query\":[
+ {
+ \"reduce\":{
+ \"language\":\"erlang\",
+ \"module\":\"riak_kv_mapreduce\",
+ \"function\":\"reduce_identity\",
+ \"keep\":true
+ }
+ }
+ ]
+ }
+ "],
+ case json_post(URL, Body) of
+ {ok, Results} when length(Results) == N ->
+ {ok, State};
+ {ok, Results} ->
+ io:format("Not enough results for query_mr: ~p/~p/~p~n", [StartKey, EndKey, Results]),
+ {ok, State};
+ {error, Reason} ->
+ io:format("[~s:~p] ERROR - Reason: ~p~n", [?MODULE, ?LINE, Reason]),
+ {error, Reason, State}
+ end;
+
+run({query_mr2, 1}, KeyGen, _ValueGen, State) ->
+ Host = State#state.http_host,
+ Port = State#state.http_port,
+ Bucket = State#state.bucket,
+ Key = to_integer(KeyGen()),
+ URL = io_lib:format("http://~s:~p/mapred", [Host, Port]),
+ Body = ["
+ {
+ \"inputs\":{
+ \"bucket\":\"", to_list(Bucket), "\",
+ \"index\":\"field1_int\",
+ \"key\":\"", to_list(Key), "\"
+ },
+ \"query\":[]
+ }
+ "],
+ case json_post(URL, Body) of
+ {ok, Results} when length(Results) == 1 ->
+ {ok, State};
+ {ok, Results} ->
+ io:format("Not enough results for query_mr: ~p/~p~n", [Key, Results]),
{ok, State};
{error, Reason} ->
+ io:format("[~s:~p] ERROR - Reason: ~p~n", [?MODULE, ?LINE, Reason]),
{error, Reason, State}
end;
-run(int_range_query, KeyGen, _ValueGen, State) ->
- Key = KeyGen(),
- [{Field, StartTerm},{_, EndTerm}|_] =
- generate_integer_indexes(Key,
- State#state.num_integer_indexes),
- case riakc_pb_socket:get_index(State#state.pid,
- State#state.bucket,
- Field,
- lists:min([StartTerm, EndTerm]),
- lists:max([StartTerm, EndTerm])) of
- {ok, _Results} ->
+run({query_mr2, N}, KeyGen, _ValueGen, State) ->
+ Host = State#state.http_host,
+ Port = State#state.http_port,
+ Bucket = State#state.bucket,
+ StartKey = to_integer(KeyGen()),
+ EndKey = StartKey + N - 1,
+ URL = io_lib:format("http://~s:~p/mapred", [Host, Port]),
+ Body = ["
+ {
+ \"inputs\":{
+ \"bucket\":\"", to_list(Bucket), "\",
+ \"index\":\"field1_int\",
+ \"start\":\"",to_list(StartKey), "\",
+ \"end\":\"", to_list(EndKey), "\"
+ },
+ \"query\":[]
+ }
+ "],
+ case json_post(URL, Body) of
+ {ok, Results} when length(Results) == N ->
+ {ok, State};
+ {ok, Results} ->
+ io:format("Not enough results for query_mr: ~p/~p/~p~n", [StartKey, EndKey, Results]),
{ok, State};
{error, Reason} ->
+ io:format("[~s:~p] ERROR - Reason: ~p~n", [?MODULE, ?LINE, Reason]),
{error, Reason, State}
end;
-run(bin_eq_query, KeyGen, ValueGen, State) ->
- Key = KeyGen(),
- _Value = ValueGen(),
- [{Field, Term}|_] =
- generate_binary_indexes(Key,
- State#state.num_binary_indexes,
- State#state.binary_index_size),
- case riakc_pb_socket:get_index(State#state.pid,
- State#state.bucket,
- Field,
- Term) of
- {ok, _Results} ->
+
+%% Query results via the PB interface.
+run({query_pb, 1}, KeyGen, _ValueGen, State) ->
+ Pid = State#state.pb_pid,
+ Bucket = State#state.bucket,
+ Key = to_integer(KeyGen()),
+ case riakc_pb_socket:get_index(Pid, Bucket, <<"field1_int">>, to_binary(Key)) of
+ {ok, Results} when length(Results) == 1 ->
+ {ok, State};
+ {ok, Results} ->
+ io:format("Not enough results for query_pb: ~p/~p~n", [Key, Results]),
{ok, State};
{error, Reason} ->
+ io:format("[~s:~p] ERROR - Reason: ~p~n", [?MODULE, ?LINE, Reason]),
{error, Reason, State}
end;
-run(bin_range_query, KeyGen, ValueGen, State) ->
- Key = KeyGen(),
- _Value = ValueGen(),
- [{Field, StartTerm},{_, EndTerm}|_] =
- generate_binary_indexes(Key,
- State#state.num_binary_indexes,
- State#state.binary_index_size),
- case riakc_pb_socket:get_index(State#state.pid,
- State#state.bucket,
- Field,
- lists:min([StartTerm, EndTerm]),
- lists:max([StartTerm, EndTerm])) of
- {ok, _Results} ->
+run({query_pb, N}, KeyGen, _ValueGen, State) ->
+ Pid = State#state.pb_pid,
+ Bucket = State#state.bucket,
+ StartKey = to_integer(KeyGen()),
+ EndKey = StartKey + N - 1,
+ case riakc_pb_socket:get_index(Pid, Bucket, <<"field1_int">>,
+ to_binary(StartKey), to_binary(EndKey)) of
+ {ok, Results} when length(Results) == N ->
+ {ok, State};
+ {ok, Results} ->
+ io:format("Not enough results for query_pb: ~p/~p/~p~n", [StartKey, EndKey, Results]),
{ok, State};
{error, Reason} ->
+ io:format("[~s:~p] ERROR - Reason: ~p~n", [?MODULE, ?LINE, Reason]),
{error, Reason, State}
end;
+
run(Other, _, _, _) ->
throw({unknown_operation, Other}).
@@ -192,58 +293,60 @@ run(Other, _, _, _) ->
%% Internal functions
%% ====================================================================
-generate_integer_indexes(_, 0) ->
- [];
-generate_integer_indexes(Seed, N) when is_binary(Seed) ->
- %% Pull a field value out of the binary. In this case, a 32-bit integer...
- <<V:32/integer, _/binary>> = Seed,
-
- %% Create the field name...
- K = list_to_binary("field" ++ integer_to_list(N) ++ "_int"),
-
- %% Loop.
- [{K,V}|generate_integer_indexes(erlang:md5(Seed), N - 1)];
-generate_integer_indexes(Seed, _) when is_binary(Seed) ->
- throw({invalid_value, "Seed value must be a binary."}).
-
-
-generate_binary_indexes(_, 0, _) ->
- [];
-generate_binary_indexes(Seed, N, Size) when is_binary(Seed) ->
- %% Pull a field value out of the binary...
- V1 = generate_binary_index(Seed, Size),
-
- %% Create the field name and normalize the value...
- K = list_to_binary("field" ++ integer_to_list(N) ++ "_bin"),
- V2 = normalize_binary_index(V1),
-
- %% Loop.
- [{K,V2}|generate_binary_indexes(erlang:md5(Seed), N - 1, Size)];
-generate_binary_indexes(Seed, _N, _Size) when is_binary(Seed) ->
- throw({invalid_value, "Seed value must be a binary."}).
-
-generate_binary_index(Seed, Size) ->
- iolist_to_binary(generate_binary_index_1(Seed, Size)).
-generate_binary_index_1(_Seed, 0) ->
- [];
-generate_binary_index_1(Seed, Size) when Size >= 16 ->
- NewSeed = erlang:md5(Seed),
- [NewSeed|generate_binary_index_1(NewSeed, Size - 16)];
-generate_binary_index_1(Seed, Size) ->
- NewSeed = erlang:md5(Seed),
- <<V:Size/binary, _/binary>> = NewSeed,
- [V].
-
-
-
-normalize_binary_index(Value) ->
- normalize_binary_index(Value, <<>>).
-normalize_binary_index(<<C, Rest/binary>>, Acc)
- when C >= $a andalso C =< $z;
- C >= $A andalso C =< $Z;
- C >= $0 andalso C =< $9 ->
- normalize_binary_index(Rest, <<Acc/binary, <<C>>/binary>>);
-normalize_binary_index(<<_, Rest/binary>>, Acc) ->
- normalize_binary_index(Rest, <<Acc/binary, <<$_>>/binary>>);
-normalize_binary_index(<<>>, Acc) ->
- Acc.
+generate_integer_indexes_for_key(Key, N) ->
+ F = fun(X) ->
+ {"field" ++ to_list(X) ++ "_int", Key}
+ end,
+ [F(X) || X <- lists:seq(1, N)].
+
+to_binary(B) when is_binary(B) ->
+ B;
+to_binary(I) when is_integer(I) ->
+ list_to_binary(integer_to_list(I));
+to_binary(L) when is_list(L) ->
+ list_to_binary(L).
+
+to_integer(I) when is_integer(I) ->
+ I;
+to_integer(B) when is_binary(B) ->
+ list_to_integer(binary_to_list(B));
+to_integer(L) when is_list(L) ->
+ list_to_integer(L).
+
+to_list(L) when is_list(L) ->
+ L;
+to_list(B) when is_binary(B) ->
+ binary_to_list(B);
+to_list(I) when is_integer(I) ->
+ integer_to_list(I).
+
+choose(N, L) ->
+ lists:nth((N rem length(L) + 1), L).
+
+json_get(Url) ->
+ Response = ibrowse:send_req(lists:flatten(Url), [], get),
+ case Response of
+ {ok, "200", _, Body} ->
+ {ok, mochijson2:decode(Body)};
+ Other ->
+ {error, Other}
+ end.
+
+json_post(Url, Payload) ->
+ Headers = [{"Content-Type", "application/json"}],
+ Response = ibrowse:send_req(lists:flatten(Url), Headers,
+ post, lists:flatten(Payload)),
+ case Response of
+ {ok, "200", _, Body} ->
+ {ok, mochijson2:decode(Body)};
+ Other ->
+ {error, Other}
+ end.
+
+ensure_module(Module) ->
+ case code:which(Module) of
+ non_existing ->
+ ?FAIL_MSG("~s requires " ++ atom_to_list(Module) ++ " module to be available on code path.\n", [?MODULE]);
+ _ ->
+ ok
+ end.
View
23 src/basho_bench_driver_bitcask.erl
@@ -27,6 +27,8 @@
-include("basho_bench.hrl").
-record(state, { file,
+ filename,
+ flags,
sync_interval,
last_sync }).
@@ -62,7 +64,12 @@ new(_Id) ->
{error, Reason} ->
?FAIL_MSG("Failed to open bitcask in ~s: ~p\n", [Filename, Reason]);
File ->
- {ok, #state { file = File, sync_interval = SyncInterval,
+ %% Try to start the merge worker
+ bitcask_merge_worker:start_link(),
+ {ok, #state { file = File,
+ filename = Filename,
+ flags = Flags,
+ sync_interval = SyncInterval,
last_sync = os:timestamp() }}
end.
@@ -85,6 +92,19 @@ run(put, KeyGen, ValueGen, State) ->
{ok, State1};
{error, Reason} ->
{error, Reason}
+ end;
+run(merge, _KeyGen, _ValueGen, State) ->
+ case bitcask:needs_merge(State#state.file) of
+ {true, Files} ->
+ case bitcask_merge_worker:merge(State#state.filename, State#state.flags,
+ Files) of
+ ok ->
+ {ok, State};
+ Other ->
+ {error, {merge_failed, Other}}
+ end;
+ false ->
+ {ok, State}
end.
@@ -100,3 +120,4 @@ maybe_sync(#state { sync_interval = SyncInterval } = State) ->
_ ->
State
end.
+
View
11 src/basho_bench_driver_http_raw.erl
@@ -26,7 +26,7 @@
-include("basho_bench.hrl").
--record(url, {abspath, host, port, username, password, path, protocol}).
+-record(url, {abspath, host, port, username, password, path, protocol, host_type}).
-record(state, { client_id, % Tuple client ID for HTTP requests
base_urls, % Tuple of #url -- one for each IP
@@ -182,6 +182,15 @@ run(insert, KeyGen, ValueGen, State) ->
{error, Reason} ->
{error, Reason, S2}
end;
+run(put, KeyGen, ValueGen, State) ->
+ {NextUrl, S2} = next_url(State),
+ Url = url(NextUrl, KeyGen, State#state.path_params),
+ case do_put(Url, [State#state.client_id], ValueGen) of
+ ok ->
+ {ok, S2};
+ {error, Reason} ->
+ {error, Reason, S2}
+ end;
run(search, _KeyGen, _ValueGen, State) when State#state.searchgen == undefined ->
{_NextUrl, S2} = next_url(State),
View
2  src/basho_bench_driver_riakc_pb.erl
@@ -62,7 +62,7 @@ new(Id) ->
Port = basho_bench_config:get(riakc_pb_port, 8087),
%% riakc_pb_replies sets defaults for R, W, DW and RW.
%% Each can be overridden separately
- Replies = basho_bench_config:get(riakc_pb_replies, 2),
+ Replies = basho_bench_config:get(riakc_pb_replies, quorum),
R = basho_bench_config:get(riakc_pb_r, Replies),
W = basho_bench_config:get(riakc_pb_w, Replies),
DW = basho_bench_config:get(riakc_pb_dw, Replies),
View
11 src/basho_bench_java_client.erl
@@ -23,16 +23,19 @@
-export([new/5, get/4, put/6, create_update/7, update/7, delete/4]).
+-define(TIMEOUT, 60*1000).
+
%%% Ask the java node to create a new process, and link to it
new(Node, Ip, Port, PBBuffer, Transport) ->
erlang:send({factory, Node}, {self(), {Ip, Port, PBBuffer, Transport}}),
receive
Pid when is_pid(Pid) ->
- ok
- end,
- link(Pid),
- {ok, Pid}.
+ link(Pid),
+ {ok, Pid}
+ after ?TIMEOUT ->
+ {error, timeout}
+ end.
get(Pid, Bucket, Key, R) ->
Pid ! {self(), {get, [{bucket, Bucket}, {key, Key}, {r, R}]}},
View
2  src/basho_bench_keygen.erl
@@ -62,6 +62,8 @@ new({pareto_int, MaxKey}, _Id) ->
new({truncated_pareto_int, MaxKey}, Id) ->
Pareto = new({pareto_int, MaxKey}, Id),
fun() -> erlang:min(MaxKey, Pareto()) end;
+new(uuid_v4, _Id) ->
+ fun() -> uuid:v4() end;
new({function, Module, Function, Args}, Id) ->
case code:ensure_loaded(Module) of
{module, Module} ->
View
139 src/basho_bench_measurement.erl
@@ -0,0 +1,139 @@
+%% -------------------------------------------------------------------
+%%
+%% basho_bench: Benchmarking Suite
+%%
+%% Copyright (c) 2009-2010 Basho Technologies
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(basho_bench_measurement).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0,
+ run/0,
+ take_measurement/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, { driver,
+ driver_state,
+ timer_refs = []
+ }).
+
+-include("basho_bench.hrl").
+
+%% ====================================================================
+%% API
+%% ====================================================================
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+run() ->
+ gen_server:cast(?MODULE, run).
+
+take_measurement(Measurement) ->
+ gen_server:call(?MODULE, {take_measurement, Measurement}).
+
+%% ====================================================================
+%% gen_server callbacks
+%% ====================================================================
+
+init([]) ->
+ %% Pull all config settings from environment
+ Driver = basho_bench_config:get(measurement_driver),
+ case catch(Driver:new()) of
+ {ok, DriverState} ->
+ State = #state {
+ driver = Driver,
+ driver_state = DriverState },
+ {ok, State};
+ Error ->
+ ?FAIL_MSG("Failed to initialize driver ~p: ~p\n", [Driver, Error]),
+ {stop, Error}
+ end.
+
+handle_call(run, _From, State) ->
+ NewState = restart_measurements(State),
+ {reply, ok, NewState};
+handle_call({take_measurement, Measurement}, _From, State) ->
+ Driver = State#state.driver,
+ DriverState = State#state.driver_state,
+ {_Label, MeasurementTag} = Measurement,
+ Result = (catch Driver:run(MeasurementTag, DriverState)),
+ case Result of
+ {ok, Value, NewDriverState} ->
+ basho_bench_stats:op_complete(Measurement, ok, Value),
+ {reply, ok, State#state { driver_state = NewDriverState}};
+
+ {error, Reason, NewDriverState} ->
+ %% Driver encountered a recoverable error
+ basho_bench_stats:op_complete(Measurement, {error, Reason}, 0),
+ {reply, ok, State#state { driver_state = NewDriverState}};
+
+ {'EXIT', Reason} ->
+ %% Driver crashed, generate a crash error and terminate. This will take down
+ %% the corresponding measurement which will get restarted by the appropriate supervisor.
+ basho_bench_stats:op_complete(Measurement, {error, crash}, 0),
+
+ %% Give the driver a chance to clean up
+ (catch Driver:terminate({'EXIT', Reason}, DriverState)),
+
+ ?DEBUG("Driver ~p crashed: ~p\n", [Driver, Reason]),
+ {stop, crash, State};
+
+ {stop, Reason} ->
+ %% Driver (or something within it) has requested that this measurement
+ %% terminate cleanly.
+ ?INFO("Driver ~p (~p) has requested stop: ~p\n", [Driver, self(), Reason]),
+
+ %% Give the driver a chance to clean up
+ (catch Driver:terminate(normal, DriverState)),
+
+ {stop, normal, State}
+ end.
+
+
+handle_cast(run, State) ->
+ NewState = restart_measurements(State),
+ {noreply, NewState}.
+
+handle_info(Msg, State) ->
+ io:format("[~s:~p] DEBUG - Msg: ~p~n", [?MODULE, ?LINE, Msg]),
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+restart_measurements(State) ->
+ [timer:cancel(X) || X <- State#state.timer_refs],
+ F =
+ fun({MeasurementTag, IntervalMS}) ->
+ {ok, TRef} = timer:apply_interval(IntervalMS, ?MODULE, take_measurement, [{MeasurementTag, MeasurementTag}]),
+ TRef;
+ ({Label, MeasurementTag, IntervalMS}) ->
+ {ok, TRef} = timer:apply_interval(IntervalMS, ?MODULE, take_measurement, [{Label, MeasurementTag}]),
+ TRef
+ end,
+ TRefs = [F(X) || X <- basho_bench_config:get(measurements, [])],
+ State#state { timer_refs = TRefs }.
View
113 src/basho_bench_measurement_erlangvm.erl
@@ -0,0 +1,113 @@
+%% -------------------------------------------------------------------
+%%
+%% basho_bench_measurement_erlangvm: Measurement Driver for Erlang VMs.
+%%
+%% Copyright (c) 2009 Basho Technologies
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(basho_bench_measurement_erlangvm).
+
+-export([new/0,
+ run/2]).
+
+-include("basho_bench.hrl").
+
+-record(state, {
+ nodes
+ }).
+
+%% ====================================================================
+%% API
+%% ====================================================================
+
+%% Prepare the measurement driver.
+%%
+%% Starts the net kernel for this VM (see start_net_kernel/0), reads the
+%% `nodes` and `cookie` config values, sets that cookie for every node,
+%% pings each node, and returns {ok, State} with the node list stored in
+%% the state.
+new() ->
+    start_net_kernel(),
+
+    Nodes = basho_bench_config:get(nodes),
+    Cookie = basho_bench_config:get(cookie),
+
+    %% Every set_cookie call must return true, otherwise we crash here
+    lists:foreach(fun(Node) -> true = erlang:set_cookie(Node, Cookie) end, Nodes),
+
+    %% Check that each node answers before any measurement is taken
+    ping_each(Nodes),
+
+    {ok, #state{ nodes = Nodes }}.
+
+%% Start the net kernel using the `mynode` config value (defaulting to
+%% [basho_bench, longnames]). An already running net kernel is accepted
+%% silently; any other error is reported through ?FAIL_MSG.
+start_net_kernel() ->
+    NodeSpec = basho_bench_config:get(mynode, [basho_bench, longnames]),
+    case net_kernel:start(NodeSpec) of
+        {error, {already_started, _}} ->
+            ok;
+        {error, Reason} ->
+            ?FAIL_MSG("Failed to start net_kernel for ~p: ~p\n", [?MODULE, Reason]);
+        {ok, _} ->
+            ?INFO("Net kernel started as ~p\n", [node()])
+    end.
+
+%% Take one measurement across every node held in the driver state.
+%%
+%% Supported tags:
+%%   memory      - sum of erlang:memory(total) over all nodes
+%%   cpu         - mean over all nodes of cpu_sup:avg1() / 256 * 100, truncated
+%%   processes   - sum of length(erlang:processes()) over all nodes
+%%   filehandles - sum over all nodes of the line count printed by
+%%                 `lsof -n -p <os pid> | wc -l`
+%%
+%% Each supported tag returns {ok, Value, State}; the state is never changed.
+run(memory, State) ->
+    Totals = [rpc:call(Node, erlang, memory, [total]) || Node <- State#state.nodes],
+    {ok, lists:sum(Totals), State};
+
+run(cpu, State) ->
+    Loads = [rpc:call(Node, cpu_sup, avg1, []) / 256 * 100 || Node <- State#state.nodes],
+    Mean = lists:sum(Loads) / length(State#state.nodes),
+    {ok, trunc(Mean), State};
+
+run(processes, State) ->
+    %% The whole process list travels back over rpc just to be counted.
+    Counts = [length(rpc:call(Node, erlang, processes, [])) || Node <- State#state.nodes],
+    {ok, lists:sum(Counts), State};
+
+run(filehandles, State) ->
+    Counts = [count_lsof_lines(Node) || Node <- State#state.nodes],
+    {ok, lists:sum(Counts), State};
+
+run(Measurement, _State) ->
+    %% NOTE(review): unlike the clauses above this is a 2-tuple that does not
+    %% carry the state - confirm the caller copes with this shape.
+    {unknown_measurement, Measurement}.
+
+%% Ask Node for its OS pid, run `lsof -n -p <pid> | wc -l` on that node, and
+%% return the printed count as an integer (surrounding blanks and newlines
+%% are stripped before conversion).
+count_lsof_lines(Node) ->
+    OsPid = rpc:call(Node, os, getpid, []),
+    Command = io_lib:format("lsof -n -p ~p | wc -l", [OsPid]),
+    Output = rpc:call(Node, os, cmd, [Command]),
+    Trimmed = string:strip(string:strip(Output, both), both, $\n),
+    list_to_integer(Trimmed).
+
+%% Ping the nodes in list order, stopping at the first one that does not
+%% answer pong: that node is reported through ?FAIL_MSG and the remaining
+%% nodes are not tried. Returns ok when every node answered.
+ping_each(Nodes) ->
+    Answers = fun(Node) -> net_adm:ping(Node) =:= pong end,
+    case lists:dropwhile(Answers, Nodes) of
+        [] ->
+            ok;
+        [Unreachable | _] ->
+            ?FAIL_MSG("Failed to ping node ~p\n", [Unreachable])
+    end.
View
119 src/basho_bench_stats.erl
@@ -35,15 +35,15 @@
-include("basho_bench.hrl").
-record(state, { ops,
- start_time,
- last_write_time,
+ start_time = now(),
+ last_write_time = now(),
report_interval,
errors_since_last_report = false,
summary_file,
errors_file}).
%% Tracks latencies up to 5 secs w/ 250 us resolution
--define(NEW_HIST, stats_histogram:new(0, 5000000, 20000)).
+-define(NEW_HIST, basho_stats_histogram:new(0, 5000000, 20000)).
%% ====================================================================
%% API
@@ -65,7 +65,7 @@ op_complete(Op, Result, ElapsedUs) ->
init([]) ->
%% Trap exits so we have a chance to flush data
process_flag(trap_exit, true),
-
+
%% Initialize an ETS table to track error and crash counters during
%% reporting interval
ets:new(basho_bench_errors, [protected, named_table]),
@@ -74,18 +74,35 @@ init([]) ->
%% the start of the run
ets:new(basho_bench_total_errors, [protected, named_table]),
+ %% Initialize an ETS table to track custom units
+ ets:new(basho_bench_units, [protected, named_table]),
+
%% Get the list of operations we'll be using for this test
- Ops = [Op || {Op, _} <- basho_bench_config:get(operations)],
+ F1 =
+ fun({OpTag, _Count}) -> {OpTag, OpTag};
+ ({Label, OpTag, _Count}) -> {Label, OpTag}
+ end,
+ Ops = [F1(X) || X <- basho_bench_config:get(operations, [])],
+
+ %% Get the list of measurements we'll be using for this test
+ F2 =
+ fun({MeasurementTag, _IntervalMS}) -> {MeasurementTag, MeasurementTag};
+ ({Label, MeasurementTag, _IntervalMS}) -> {Label, MeasurementTag}
+ end,
+ Measurements = [F2(X) || X <- basho_bench_config:get(measurements, [])],
%% Setup stats instance for each operation -- we only track latencies on
%% successful operations
%%
%% NOTE: Store the histograms in the process dictionary to avoid painful
%% copying on state updates.
- [erlang:put({latencies, Op}, ?NEW_HIST) || Op <- Ops],
+ [erlang:put({latencies, Op}, ?NEW_HIST) || Op <- Ops ++ Measurements],
+
+ %% Setup output file handles for dumping periodic CSV of histogram results.
+ [erlang:put({csv_file, X}, op_csv_file(X)) || X <- Ops],
%% Setup output file handles for dumping periodic CSV of histogram results.
- [erlang:put({csv_file, Op}, op_csv_file(Op)) || Op <- Ops],
+ [erlang:put({csv_file, X}, measurement_csv_file(X)) || X <- Measurements],
%% Setup output file w/ counters for total requests, errors, etc.
{ok, SummaryFile} = file:open("summary.csv", [raw, binary, write]),
@@ -99,7 +116,7 @@ init([]) ->
%% Schedule next write/reset of data
ReportInterval = timer:seconds(basho_bench_config:get(report_interval)),
- {ok, #state{ ops = Ops,
+ {ok, #state{ ops = Ops ++ Measurements,
report_interval = ReportInterval,
summary_file = SummaryFile,
errors_file = ErrorsFile}}.
@@ -110,10 +127,13 @@ handle_call(run, _From, State) ->
erlang:send_after(State#state.report_interval, self(), report),
{reply, ok, State#state { start_time = Now, last_write_time = Now}};
-handle_call({op, Op, ok, ElapsedUs}, _From, State) ->
+handle_call({op, Op, ok, ElapsedUs}, From, State) ->
+ handle_call({op, Op, {ok, 1}, ElapsedUs}, From, State);
+handle_call({op, Op, {ok, Units}, ElapsedUs}, _From, State) ->
%% Update the histogram for the op in question
- Hist = stats_histogram:update(ElapsedUs, erlang:get({latencies, Op})),
+ Hist = basho_stats_histogram:update(ElapsedUs, erlang:get({latencies, Op})),
erlang:put({latencies, Op}, Hist),
+ ets_increment(basho_bench_units, Op, Units),
{reply, ok, State};
handle_call({op, Op, {error, Reason}, _ElapsedUs}, _From, State) ->
increment_error_counter(Op),
@@ -150,35 +170,79 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%% ====================================================================
-op_csv_file(Op) ->
- Fname = lists:concat([Op, "_latencies.csv"]),
+op_csv_file({Label, _Op}) ->
+ Fname = normalize_label(Label) ++ "_latencies.csv",
+ {ok, F} = file:open(Fname, [raw, binary, write]),
+ ok = file:write(F, <<"elapsed, window, n, min, mean, median, 95th, 99th, 99_9th, max, errors\n">>),
+ F.
+
+measurement_csv_file({Label, _Op}) ->
+ Fname = normalize_label(Label) ++ "_measurements.csv",
{ok, F} = file:open(Fname, [raw, binary, write]),
ok = file:write(F, <<"elapsed, window, n, min, mean, median, 95th, 99th, 99_9th, max, errors\n">>),
F.
+%% Turn a label into a string that is safe to use in a file name.
+%% Lists, binaries, integers and atoms are converted to a character list and
+%% passed through replace_special_chars/1. A tuple is normalized element by
+%% element and the pieces are joined with "-".
+normalize_label(Label) when is_tuple(Label) ->
+    string:join([normalize_label(Part) || Part <- tuple_to_list(Label)], "-");
+normalize_label(Label) when is_list(Label); is_binary(Label);
+                            is_integer(Label); is_atom(Label) ->
+    Chars =
+        if
+            is_list(Label)    -> Label;
+            is_binary(Label)  -> binary_to_list(Label);
+            is_integer(Label) -> integer_to_list(Label);
+            is_atom(Label)    -> atom_to_list(Label)
+        end,
+    replace_special_chars(Chars).
+
+
+%% Keep ASCII digits and letters as they are and replace every other
+%% element of the list with a dash.
+replace_special_chars(Chars) ->
+    Sanitize =
+        fun(C) when C >= $0, C =< $9;
+                    C >= $A, C =< $Z;
+                    C >= $a, C =< $z ->
+                C;
+           (_) ->
+                $-
+        end,
+    lists:map(Sanitize, Chars).
+
+
increment_error_counter(Key) ->
ets_increment(basho_bench_errors, Key, 1).
-ets_increment(Tab, Key, Incr) ->
+ets_increment(Tab, Key, Incr) when is_integer(Incr) ->
%% Increment the counter for this specific key. We have to deal with
%% missing keys, so catch the update if it fails and init as necessary
case catch(ets:update_counter(Tab, Key, Incr)) of
Value when is_integer(Value) ->
ok;
{'EXIT', _} ->
- true = ets:insert_new(Tab, {Key, Incr}),
- ok
- end.
+ case ets:insert_new(Tab, {Key, Incr}) of
+ true ->
+ ok;
+ _ ->
+ %% Race with another load gen proc, so retry
+ ets_increment(Tab, Key, Incr)
+ end
+ end;
+ets_increment(Tab, Key, Incr) when is_float(Incr) ->
+ Old = case ets:lookup(Tab, Key) of
+ [{_, Val}] -> Val;
+ [] -> 0
+ end,
+ true = ets:insert(Tab, {Key, Old + Incr}).
error_counter(Key) ->
- case catch(ets:lookup_element(basho_bench_errors, Key, 2)) of
+ lookup_or_zero(basho_bench_errors, Key).
+
+units_counter(Key) ->  % value stored for Key in the basho_bench_units table, as returned by lookup_or_zero/2
+ lookup_or_zero(basho_bench_units, Key).
+
+lookup_or_zero(Tab, Key) ->
+ case catch(ets:lookup_element(Tab, Key, 2)) of
{'EXIT', _} ->
0;
Value ->
Value
end.
-
process_stats(Now, State) ->
%% Determine how much time has elapsed (seconds) since our last report
@@ -195,6 +259,8 @@ process_stats(Now, State) ->
%% Reset latency histograms
[erlang:put({latencies, Op}, ?NEW_HIST) || Op <- State#state.ops],
+ %% Reset units
+ [ets:insert(basho_bench_units, {Op, 0}) || Op <- State#state.ops],
%% Write summary
file:write(State#state.summary_file,
@@ -225,19 +291,20 @@ process_stats(Now, State) ->
report_latency(Elapsed, Window, Op) ->
Hist = erlang:get({latencies, Op}),
Errors = error_counter(Op),
- case stats_histogram:observations(Hist) > 0 of
+ Units = units_counter(Op),
+ case basho_stats_histogram:observations(Hist) > 0 of
true ->
- {Min, Mean, Max, _, _} = stats_histogram:summary_stats(Hist),
+ {Min, Mean, Max, _, _} = basho_stats_histogram:summary_stats(Hist),
Line = io_lib:format("~w, ~w, ~w, ~w, ~.1f, ~.1f, ~.1f, ~.1f, ~.1f, ~w, ~w\n",
[Elapsed,
Window,
- stats_histogram:observations(Hist),
+ Units,
Min,
Mean,
- stats_histogram:quantile(0.500, Hist),
- stats_histogram:quantile(0.950, Hist),
- stats_histogram:quantile(0.990, Hist),
- stats_histogram:quantile(0.999, Hist),
+ basho_stats_histogram:quantile(0.500, Hist),
+ basho_stats_histogram:quantile(0.950, Hist),
+ basho_stats_histogram:quantile(0.990, Hist),
+ basho_stats_histogram:quantile(0.999, Hist),
Max,
Errors]);
false ->
@@ -248,7 +315,7 @@ report_latency(Elapsed, Window, Op) ->
Errors])
end,
ok = file:write(erlang:get({csv_file, Op}), Line),
- {stats_histogram:observations(Hist), Errors}.
+ {Units, Errors}.
report_total_errors(State) ->
case ets:tab2list(basho_bench_total_errors) of
View
15 src/basho_bench_sup.erl
@@ -16,7 +16,7 @@
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
-%% under the License.
+%% under the License.
%%
%% -------------------------------------------------------------------
-module(basho_bench_sup).
@@ -59,9 +59,18 @@ init([]) ->
%% Get the number concurrent workers we're expecting and generate child
%% specs for each
Workers = worker_specs(basho_bench_config:get(concurrent), []),
- {ok, {{one_for_one, 5, 10}, [?CHILD(basho_bench_log, worker),
- ?CHILD(basho_bench_stats, worker)] ++ Workers}}.
+ MeasurementDriver =
+ case basho_bench_config:get(measurement_driver, undefined) of
+ undefined -> [];
+ _Driver -> [?CHILD(basho_bench_measurement, worker)]
+ end,
+ {ok, {{one_for_one, 5, 10},
+ [?CHILD(basho_bench_log, worker)] ++
+ [?CHILD(basho_bench_stats, worker)] ++
+ Workers ++
+ MeasurementDriver
+ }}.
%% ===================================================================
%% Internal functions
View
5 src/basho_bench_valgen.erl
@@ -33,9 +33,12 @@
new({fixed_bin, Size}, _Id) ->
Source = init_source(),
fun() -> data_block(Source, Size) end;
+new({fixed_bin, Size, Val}, _Id) ->
+ Data = list_to_binary(lists:duplicate(Size, Val)),
+ fun() -> Data end;
new({exponential_bin, MinSize, Mean}, _Id) ->
Source = init_source(),
- fun() -> data_block(Source, MinSize + trunc(stats_rv:exponential(1 / Mean))) end;
+ fun() -> data_block(Source, MinSize + trunc(basho_stats_rv:exponential(1 / Mean))) end;
new({uniform_bin, MinSize, MaxSize}, _Id) ->
Source = init_source(),
Diff = MaxSize - MinSize,
View
17 src/basho_bench_worker.erl
@@ -172,7 +172,13 @@ stop_worker(SupChild) ->
%% Expand operations list into tuple suitable for weighted, random draw
%%
ops_tuple() ->
- Ops = [lists:duplicate(Count, Op) || {Op, Count} <- basho_bench_config:get(operations)],
+ F =
+ fun({OpTag, Count}) ->
+ lists:duplicate(Count, {OpTag, OpTag});
+ ({Label, OpTag, Count}) ->
+ lists:duplicate(Count, {Label, OpTag})
+ end,
+ Ops = [F(X) || X <- basho_bench_config:get(operations, [])],
list_to_tuple(lists:flatten(Ops)).
@@ -213,14 +219,15 @@ worker_idle_loop(State) ->
worker_next_op(State) ->
Next = element(random:uniform(State#state.ops_len), State#state.ops),
+ {_Label, OpTag} = Next,
Start = now(),
- Result = (catch (State#state.driver):run(Next, State#state.keygen, State#state.valgen,
+ Result = (catch (State#state.driver):run(OpTag, State#state.keygen, State#state.valgen,
State#state.driver_state)),
ElapsedUs = timer:now_diff(now(), Start),
case Result of
- {ok, DriverState} ->
+ {Res, DriverState} when Res == ok orelse element(1, Res) == ok ->
%% Success
- basho_bench_stats:op_complete(Next, ok, ElapsedUs),
+ basho_bench_stats:op_complete(Next, Res, ElapsedUs),
{ok, State#state { driver_state = DriverState}};
{error, Reason, DriverState} ->
@@ -278,7 +285,7 @@ max_worker_run_loop(State) ->
rate_worker_run_loop(State, Lambda) ->
%% Delay between runs using exponentially distributed delays to mimic
%% queue.
- timer:sleep(trunc(stats_rv:exponential(Lambda))),
+ timer:sleep(trunc(basho_stats_rv:exponential(Lambda))),
case worker_next_op(State) of
{ok, State2} ->
case needs_shutdown(State2) of
View
59 src/uuid.erl
@@ -0,0 +1,59 @@
+% Copyright (c) 2008, Travis Vachon
+% All rights reserved.
+%
+% Redistribution and use in source and binary forms, with or without
+% modification, are permitted provided that the following conditions are
+% met:
+%
+% * Redistributions of source code must retain the above copyright
+% notice, this list of conditions and the following disclaimer.
+%
+% * Redistributions in binary form must reproduce the above copyright
+% notice, this list of conditions and the following disclaimer in the
+% documentation and/or other materials provided with the distribution.
+%
+% * Neither the name of the author nor the names of its contributors
+% may be used to endorse or promote products derived from this
+% software without specific prior written permission.
+%
+% THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+% "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+% LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+% A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+% OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+% SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+% TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+% PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+% LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+% NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+% SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+%
+-module(uuid).
+-export([v4/0, to_string/1, get_parts/1, to_binary/1]).
+-import(random).
+
+% Builds a random (version 4) UUID as a 128-bit binary.
+% Four draws from random:uniform/1 supply the 48-, 12-, 32- and 30-bit
+% random fields; each draw has one subtracted so its range starts at zero.
+v4() ->
+    RandA = random:uniform(1 bsl 48) - 1,
+    RandB = random:uniform(1 bsl 12) - 1,
+    RandC = random:uniform(1 bsl 32) - 1,
+    RandD = random:uniform(1 bsl 30) - 1,
+    v4(RandA, RandB, RandC, RandD).
+
+% Packs the four random fields around the fixed bits: the 4-bit value 4
+% follows the first field and the 2-bit value 2 follows the second.
+v4(RandA, RandB, RandC, RandD) ->
+    <<RandA:48, 4:4, RandB:12, 2:2, RandC:32, RandD:30>>.
+
+% Renders a binary UUID as a flat string of zero-padded lowercase hex
+% groups, 8-4-4-4-12 digits wide, with the groups separated by dashes.
+to_string(U) ->
+    Fields = get_parts(U),
+    Rendered = io_lib:format("~8.16.0b-~4.16.0b-~4.16.0b-~2.16.0b~2.16.0b-~12.16.0b", Fields),
+    lists:flatten(Rendered).
+
+% Splits a 128-bit binary UUID into its six integer fields, in order:
+% 32, 16, 16, 8, 8 and 48 bits wide.
+get_parts(<<TimeLow:32, TimeMid:16, TimeHiVersion:16, ClockHi:8, ClockLow:8, Node:48>>) ->
+    [TimeLow, TimeMid, TimeHiVersion, ClockHi, ClockLow, Node].
+
+% Parses a UUID string such as 550e8400-e29b-41d4-a716-446655440000 into a
+% binary. Dashes are dropped first, so the undashed form works as well.
+to_binary(U)->
+    NotDash = fun(Char) -> Char /= $- end,
+    HexDigits = lists:filter(NotDash, U),
+    convert(HexDigits, []).
+
+% Consumes a string of hex digits two characters at a time, reading each
+% pair (00-ff) as one byte. Bytes are pushed onto Acc in reverse; once the
+% input is used up, Acc is reversed and returned as a binary.
+convert([Hi, Lo | Rest], Acc)->
+    Parsed = io_lib:fread("~16u", [Hi, Lo]),
+    {ok, [Byte], _} = Parsed,
+    convert(Rest, [Byte | Acc]);
+convert([], Acc)->
+    Ordered = lists:reverse(Acc),
+    list_to_binary(Ordered).
Please sign in to comment.
Something went wrong with that request. Please try again.