Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'pipe'

  • Loading branch information...
commit 36c5dbb24da06f5d57cc0ab75c1f0f1f00469cc3 2 parents e91319c + 88b9abe
@dizzyd dizzyd authored
Showing with 78 additions and 5 deletions.
  1. +33 −0 examples/riakc_mr.config
  2. +45 −5 src/basho_bench_driver_riakc_pb.erl
View
33 examples/riakc_mr.config
@@ -0,0 +1,33 @@
+% -*- mode: erlang -*-
+{driver, basho_bench_driver_riakc_pb}.
+
+{code_paths, ["deps/stats",
+ "deps/riakc",
+ "deps/protobuffs"]}.
+
+{riakc_pb_ips, [{127,0,0,1}]}.
+
+{riakc_pb_replies, 1}.
+
+{riakc_pb_bucket, <<"bryanitbs">>}.
+
+%% load
+
+%% {mode, max}.
+%% {duration, 10000}.
+%% {concurrent, 1}.
+%% {operations, [{put, 1}]}.
+%% {key_generator, {int_to_str, {sequential_int, 10000}}}.
+%% {value_generator,
+%% {function, basho_bench_driver_riakc_pb, mapred_valgen, [10000]}}.
+
+%% test
+
+{mode, max}.
+{duration, 1}.
+{concurrent, 1}.
+{operations, [{mr_keylist_js, 1}]}.
+{key_generator, {int_to_str, {uniform_int, 9999}}}.
+{value_generator, {fixed_bin, 1}}.
+{riakc_pb_keylist_length, 1000}.
+
View
50 src/basho_bench_driver_riakc_pb.erl
@@ -22,7 +22,8 @@
-module(basho_bench_driver_riakc_pb).
-export([new/1,
- run/4]).
+ run/4,
+ mapred_valgen/2]).
-include("basho_bench.hrl").
@@ -31,7 +32,15 @@
r,
w,
dw,
- rw}).
+ rw,
+ keylist_length}).
+
+-define(ERLANG_MR,
+ [{map, {modfun, riak_kv_mapreduce, map_object_value}, none, false},
+ {reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, none, true}]).
+-define(JS_MR,
+ [{map, {jsfun, <<"Riak.mapValuesJson">>}, none, false},
+ {reduce, {jsfun, <<"Riak.reduceSum">>}, none, true}]).
%% ====================================================================
%% API
@@ -57,6 +66,7 @@ new(Id) ->
DW = basho_bench_config:get(riakc_pb_dw, Replies),
RW = basho_bench_config:get(riakc_pb_rw, Replies),
Bucket = basho_bench_config:get(riakc_pb_bucket, <<"test">>),
+ KeylistLength = basho_bench_config:get(riakc_pb_keylist_length, 1000),
%% Choose the node using our ID as a modulus
TargetIp = lists:nth((Id rem length(Ips)+1), Ips),
@@ -69,7 +79,8 @@ new(Id) ->
r = R,
w = W,
dw = DW,
- rw = RW
+ rw = RW,
+ keylist_length = KeylistLength
}};
{error, Reason2} ->
?FAIL_MSG("Failed to connect riakc_pb_socket to ~p port ~p: ~p\n",
@@ -166,10 +177,39 @@ run(listkeys, _KeyGen, _ValueGen, State) ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
- end.
-
+ end;
+run(mr_bucket_erlang, _KeyGen, _ValueGen, State) ->
+ mapred(State, State#state.bucket, ?ERLANG_MR);
+run(mr_bucket_js, _KeyGen, _ValueGen, State) ->
+ mapred(State, State#state.bucket, ?JS_MR);
+run(mr_keylist_erlang, KeyGen, _ValueGen, State) ->
+ Keylist = make_keylist(State#state.bucket, KeyGen,
+ State#state.keylist_length),
+ mapred(State, Keylist, ?ERLANG_MR);
+run(mr_keylist_js, KeyGen, _ValueGen, State) ->
+ Keylist = make_keylist(State#state.bucket, KeyGen,
+ State#state.keylist_length),
+ mapred(State, Keylist, ?JS_MR).
%% ====================================================================
%% Internal functions
%% ====================================================================
+mapred(State, Input, Query) ->
+ case riakc_pb_socket:mapred(State#state.pid, Input, Query) of
+ {ok, _Result} ->
+ {ok, State};
+ {error, Reason} ->
+ {error, Reason, State}
+ end.
+
+make_keylist(_Bucket, _KeyGen, 0) ->
+ [];
+make_keylist(Bucket, KeyGen, Count) ->
+ [{Bucket, list_to_binary(KeyGen())}
+ |make_keylist(Bucket, KeyGen, Count-1)].
+
+mapred_valgen(_Id, MaxRand) ->
+ fun() ->
+ list_to_binary(integer_to_list(random:uniform(MaxRand)))
+ end.
Please sign in to comment.
Something went wrong with that request. Please try again.