Skip to content

Commit

Permalink
Added Riak PBC driver.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon Meredith committed May 14, 2010
1 parent b9f74c7 commit 0f0a670
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 1 deletion.
1 change: 1 addition & 0 deletions ebin/basho_bench.app
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
basho_bench_driver_dets,
basho_bench_driver_http_raw,
basho_bench_driver_innostore,
basho_bench_driver_riakc_pb,
basho_bench_driver_riakclient,
basho_bench_driver_cassandra,
basho_bench_driver_bitcask,
Expand Down
22 changes: 22 additions & 0 deletions examples/riakc_pb.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{mode, max}.

{duration, 10}.

{concurrent, 1}.

{driver, basho_bench_driver_riakc_pb}.

{code_paths, ["deps/stats",
"deps/riakc",
"deps/protobuffs"]}.

{key_generator, {uniform_int_str, 10000}}.

{value_generator, {fixed_bin, 10000}}.

{riakc_pb_ips, [{127,0,0,1}]}.

{riakc_pb_replies, 1}.

{operations, [{get, 1}, {update, 1}]}.

3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{deps, [
{stats, "1", {hg, "http://bitbucket.org/dizzyd/stats", "tip"}},
{ibrowse, "1.5", {git, "http://github.com/dizzyd/ibrowse.git", "HEAD"}},
{casbench, "0.1", {hg, "ssh://hg@bitbucket.org/basho/casbench", "tip"}}
{casbench, "0.1", {hg, "ssh://hg@bitbucket.org/basho/casbench", "tip"}},
{riakc, ".*", {hg, "ssh://hg@bitbucket.org/basho/riak-erlang-client riakc", "tip"}}
]}.
140 changes: 140 additions & 0 deletions src/basho_bench_driver_riakc_pb.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
%% -------------------------------------------------------------------
%%
%% basho_bench_driver_riakc_pb: Driver for riak protocol buffers client
%%
%% Copyright (c) 2009 Basho Techonologies
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(basho_bench_driver_riakc_pb).

-export([new/1,
run/4]).

-include("basho_bench.hrl").

-record(state, { pid,
bucket,
r,
w,
dw,
rw}).

%% ====================================================================
%% API
%% ====================================================================

new(Id) ->
%% Make sure the path is setup such that we can get at riak_client
case code:which(riakc_pb_socket) of
non_existing ->
?FAIL_MSG("~s requires riakc_pb_socket module to be available on code path.\n",
[?MODULE]);
_ ->
ok
end,

Ips = basho_bench_config:get(riakc_pb_ips, [{127,0,0,1}]),
Port = basho_bench_config:get(riakc_pb_port, 8087),
%% riakc_pb_replies sets defaults for R, W, DW and RW.
%% Each can be overridden separately
Replies = basho_bench_config:get(riakc_pb_replies, 2),
R = basho_bench_config:get(riakc_pb_r, Replies),
W = basho_bench_config:get(riakc_pb_w, Replies),
DW = basho_bench_config:get(riakc_pb_dw, Replies),
RW = basho_bench_config:get(riakc_pb_dw, Replies),
Bucket = basho_bench_config:get(riakc_pb_bucket, <<"test">>),

%% Choose the node using our ID as a modulus
TargetIp = lists:nth((Id rem length(Ips)+1), Ips),
?INFO("Using target ip ~p for worker ~p\n", [TargetIp, Id]),

case riakc_pb_socket:start_link(TargetIp, Port) of
{ok, Pid} ->
{ok, #state { pid = Pid,
bucket = Bucket,
r = R,
w = W,
dw = DW,
rw = RW
}};
{error, Reason2} ->
?FAIL_MSG("Failed to connect riakc_pb_socket to ~p port ~p: ~p\n",
[TargetIp, Port, Reason2])
end.

run(get, KeyGen, _ValueGen, State) ->
Key = KeyGen(),
case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key,
[{r, State#state.r}]) of
{ok, _} ->
{ok, State};
{error, notfound} ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
run(put, KeyGen, ValueGen, State) ->
Robj0 = riakc_obj:new(State#state.bucket, KeyGen()),
Robj = riakc_obj:update_value(Robj0, ValueGen()),
case riakc_pb_socket:put(State#state.pid, Robj, [{w, State#state.w},
{dw, State#state.dw}]) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
run(update, KeyGen, ValueGen, State) ->
Key = KeyGen(),
case riakc_pb_socket:get(State#state.pid, State#state.bucket,
Key, [{r, State#state.r}]) of
{ok, Robj} ->
Robj2 = riakc_obj:update_value(Robj, ValueGen()),
case riakc_pb_socket:put(State#state.pid, Robj2, [{w, State#state.w},
{dw, State#state.dw}]) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
{error, notfound} ->
Robj0 = riakc_obj:new(State#state.bucket, KeyGen()),
Robj = riakc_obj:update_value(Robj0, ValueGen()),
case riakc_pb_socket:put(State#state.pid, Robj, [{w, State#state.w},
{dw, State#state.dw}]) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end
end;
run(delete, KeyGen, _ValueGen, State) ->
%% Pass on rw
case riakc_pb_socket:delete(State#state.pid, State#state.bucket, KeyGen(),
[{rw, State#state.rw}]) of
ok ->
{ok, State};
{error, notfound} ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end.


%% ====================================================================
%% Internal functions
%% ====================================================================

0 comments on commit 0f0a670

Please sign in to comment.