From 0671c12fadbd235cdb5d50e5d541723040d002c9 Mon Sep 17 00:00:00 2001 From: Christopher Meiklejohn Date: Wed, 16 Jul 2014 15:46:29 -0400 Subject: [PATCH] Add a benchmark for concurrent map updates. Update a series of maps representing teams which are computed from cumulative updates to a series of individual player scores. Add and remove members of the team, while concurrent score changes are occurring. --- examples/teams-crdt-map.config | 26 +++++ src/basho_bench_driver_riakc_pb.erl | 144 ++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 examples/teams-crdt-map.config diff --git a/examples/teams-crdt-map.config b/examples/teams-crdt-map.config new file mode 100644 index 000000000..3c7d50732 --- /dev/null +++ b/examples/teams-crdt-map.config @@ -0,0 +1,26 @@ +{mode,{rate,max}}. +{duration,10}. +{concurrent,150}. +{rng_seed,now}. + +%% This bucket type must be created and set to be datatype, maps. +{riakc_pb_bucket,{<<"maps">>,<<"testbucket">>}}. + +{key_generator, {uniform_int, 100}}. +{value_generator, {uniform_int, 1000}}. + +{operations,[{{game,completed},10}, + {{team,player,addition},3}, + {{team,player,removal},3}, + {{team,read},100}, + {{team,write},1}]}. + +{riakc_pb_ips,[{"riak101.aws",10017}, + {"riak102.aws",10017}, + {"riak103.aws",10017}, + {"riak104.aws",10017}, + {"riak105.aws",10017}]}. + +{riakc_pb_replies,default}. + +{driver,basho_bench_driver_riakc_pb}. diff --git a/src/basho_bench_driver_riakc_pb.erl b/src/basho_bench_driver_riakc_pb.erl index 1f9533056..f8f1c38f9 100644 --- a/src/basho_bench_driver_riakc_pb.erl +++ b/src/basho_bench_driver_riakc_pb.erl @@ -146,6 +146,150 @@ warn_bucket_mr_correctness(_) -> %% preload is specified, so no warning necessary ok. +%% Write information about the team. +run({team, write}, KeyGen, _ValueGen, State) -> + Key = integer_to_list(KeyGen()), + Result = riakc_pb_socket:modify_type(State#state.pid, + fun(M) -> + riakc_map:update( + {<<"name">>, register}, + fun(R) -> + riakc_register:set( + list_to_binary("Team " ++ Key), R) + end, M) + end, + State#state.bucket, Key, [create]), + case Result of + ok -> + lager:info("Team write succeeded."), + {ok, State}; + {ok, _} -> + lager:info("Team write succeeded."), + {ok, State}; + {error, Reason} -> + lager:info("Team write failed, error: ~p", [Reason]), + {error, Reason, State} + end; + +%% Read information about the team. +run({team, read}, KeyGen, ValueGen, State) -> + Key = integer_to_list(KeyGen()), + Options = [{r,2}, {notfound_ok, true}, {timeout, 5000}], + Result = riakc_pb_socket:fetch_type(State#state.pid, + State#state.bucket, + Key, + Options), + case Result of + {ok, _} -> + lager:info("Team read succeeded."), + {ok, State}; + {error, {notfound, _}} -> + lager:info("Team does not exist yet."), + run({team, write}, KeyGen, ValueGen, State); + {error, Reason} -> + lager:info("Team read failed, error: ~p", [Reason]), + {error, Reason, State} + end; + +%% Remove a player from the team. +run({team, player, removal}, KeyGen, ValueGen, State) -> + Key = integer_to_list(KeyGen()), + Options = [{r,2}, {notfound_ok, true}, {timeout, 5000}], + Result = riakc_pb_socket:fetch_type(State#state.pid, + State#state.bucket, + Key, + Options), + case Result of + {ok, M0} -> + M = riakc_map:value(M0), + Members = proplists:get_value({<<"members">>, set}, M, []), + case length(Members) > 0 of + true -> + Value = hd(Members), + lager:info("Team read succeeded"), + Result2 = riakc_pb_socket:modify_type(State#state.pid, + fun(M2) -> + riakc_map:update( + {<<"members">>, set}, + fun(R) -> + riakc_set:del_element( + Value, R) + end, M2) + end, + State#state.bucket, Key, [create]), + case Result2 of + ok -> + lager:info("Team player removal succeeded."), + {ok, State}; + {ok, _} -> + lager:info("Team player removal succeeded."), + {ok, State}; + {error, Reason} -> + lager:info("Team player removal failed, error: ~p", [Reason]), + {error, Reason, State} + end; + false -> + lager:info("Team player removal success, no members."), + {ok, State} + end; + {error, {notfound, _}} -> + lager:info("Team does not exist yet."), + run({team, write}, KeyGen, ValueGen, State); + {error, Reason} -> + lager:info("Team read failed, error: ~p", [Reason]), + {error, Reason, State} + end; + +%% Add a player to the team. +run({team, player, addition}, KeyGen, ValueGen, State) -> + Key = integer_to_list(KeyGen()), + Value = "Team member " ++ integer_to_list(ValueGen()), + Result = riakc_pb_socket:modify_type(State#state.pid, + fun(M) -> + riakc_map:update( + {<<"members">>, set}, + fun(S) -> + riakc_set:add_element(list_to_binary(Value), S) + end, M) + end, + State#state.bucket, Key, [create]), + case Result of + ok -> + lager:info("Team player addition succeeded."), + {ok, State}; + {ok, _} -> + lager:info("Team player addition succeeded."), + {ok, State}; + {error, Reason} -> + lager:info("Team player addition failed, error: ~p", [Reason]), + {error, Reason, State} + end; + +%% Mark a game as completed. +run({game, completed}, KeyGen, ValueGen, State) -> + Key = integer_to_list(KeyGen()), + Value = ValueGen(), + Result = riakc_pb_socket:modify_type(State#state.pid, + fun(M) -> + riakc_map:update( + {<<"score">>, counter}, + fun(C) -> + riakc_counter:increment(Value, C) + end, M) + end, + State#state.bucket, Key, [create]), + case Result of + ok -> + lager:info("Score change succeeded."), + {ok, State}; + {ok, _} -> + lager:info("Score change succeeded."), + {ok, State}; + {error, Reason} -> + lager:info("Score change failed, error: ~p", [Reason]), + {error, Reason, State} + end; + run(get, KeyGen, _ValueGen, State) -> Key = KeyGen(), case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key,