Permalink
Browse files

Update to use new riak_core capability system

  • Loading branch information...
1 parent 0ce405d commit 07a8c0a6be399746241eaa7bcf707b43936bcb61 @jtuple jtuple committed Apr 27, 2012
Showing with 59 additions and 26 deletions.
  1. +11 −2 src/riak.erl
  2. +6 −6 src/riak_client.erl
  3. +36 −0 src/riak_kv_app.erl
  4. +1 −1 src/riak_kv_keys_fsm.erl
  5. +2 −2 src/riak_kv_mrc_pipe.erl
  6. +2 −2 src/riak_kv_pb_socket.erl
  7. +1 −13 src/riak_kv_util.erl
View
13 src/riak.erl
@@ -107,8 +107,7 @@ client_connect(Node, ClientId= <<_:32>>) ->
%% or the new vnode based vclocks should be used.
%% N.B. all nodes must be upgraded to 1.0 before
%% this can be enabled.
- case rpc:call(Node, app_helper, get_env,
- [riak_kv, vnode_vclocks, false]) of
+ case vnode_vclocks(Node) of
{badrpc, _Reason} ->
{error, {could_not_reach_node, Node}};
true ->
@@ -122,6 +121,16 @@ client_connect(Node, undefined) ->
client_connect(Node, Other) ->
client_connect(Node, <<(erlang:phash2(Other)):32>>).
+vnode_vclocks(Node) ->
+ case rpc:call(Node, riak_core_capability, get,
+ [{riak_kv, vnode_vclocks}]) of
+ {badrpc, {'EXIT', {undef, _}}} ->
+ rpc:call(Node, app_helper, get_env,
+ [riak_kv, vnode_vclocks, false]);
+ Result ->
+ Result
+ end.
+
%%
%% @doc Validate that a specified node is accessible and functional.
%%
View
12 src/riak_client.erl
@@ -479,7 +479,7 @@ list_keys(Bucket, Timeout, ErrorTolerance) when is_integer(Timeout) ->
%% Key lists are updated asynchronously, so this may be slightly
%% out of date if called immediately after a put or delete.
list_keys(Bucket, Filter, Timeout) ->
- case app_helper:get_env(riak_kv, legacy_keylisting) of
+ case riak_core_capability:get({riak_kv, legacy_keylisting}) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
@@ -531,7 +531,7 @@ stream_list_keys(Bucket0, Timeout, ErrorTolerance, Client, ClientType) ->
%% If ClientType is set to 'mapred' instead of 'plain', then the
%% messages will be sent in the form of a MR input stream.
stream_list_keys(Input, Timeout, Client, ClientType) when is_pid(Client) ->
- case app_helper:get_env(riak_kv, legacy_keylisting) of
+ case riak_core_capability:get({riak_kv, legacy_keylisting}) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
@@ -578,7 +578,7 @@ stream_list_keys(Bucket, Timeout, ErrorTolerance, Client) ->
%% out of date if called immediately after a put or delete.
%% @equiv filter_keys(Bucket, Fun, default_timeout())
filter_keys(Bucket, Fun) ->
- case app_helper:get_env(riak_kv, legacy_keylisting) of
+ case riak_core_capability:get({riak_kv, legacy_keylisting}) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
@@ -596,7 +596,7 @@ filter_keys(Bucket, Fun) ->
%% Key lists are updated asynchronously, so this may be slightly
%% out of date if called immediately after a put or delete.
filter_keys(Bucket, Fun, Timeout) ->
- case app_helper:get_env(riak_kv, legacy_keylisting) of
+ case riak_core_capability:get({riak_kv, legacy_keylisting}) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
@@ -628,7 +628,7 @@ list_buckets() ->
%% either adds the first key or removes the last remaining key from
%% a bucket.
list_buckets(Filter, Timeout) ->
- case app_helper:get_env(riak_kv, legacy_keylisting) of
+ case riak_core_capability:get({riak_kv, legacy_keylisting}) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
@@ -646,7 +646,7 @@ list_buckets(Filter, Timeout) ->
%% {error, Err :: term()}
%% @doc Return a list of filtered buckets.
filter_buckets(Fun) ->
- case app_helper:get_env(riak_kv, legacy_keylisting) of
+ case riak_core_capability:get({riak_kv, legacy_keylisting}) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
View
36 src/riak_kv_app.erl
@@ -82,6 +82,42 @@ start(_Type, _StartArgs) ->
%% Spin up supervisor
case riak_kv_sup:start_link() of
{ok, Pid} ->
+ %% Register capabilities
+ riak_core_capability:register({riak_kv, vnode_vclocks},
+ [true, false],
+ false,
+ {riak_kv,
+ vnode_vclocks,
+ [{true, true}, {false, false}]}),
+
+ riak_core_capability:register({riak_kv, legacy_keylisting},
+ [false, true],
+ true,
+ {riak_kv,
+ legacy_keylisting,
+ [{true, true}, {false, false}]}),
+
+ riak_core_capability:register({riak_kv, listkeys_backpressure},
+ [true, false],
+ false,
+ {riak_kv,
+ listkeys_backpressure,
+ [{true, true}, {false, false}]}),
+
+ riak_core_capability:register({riak_kv, mapred_system},
+ [pipe, legacy],
+ legacy,
+ {riak_kv,
+ mapred_system,
+ [{pipe, pipe}, {legacy, legacy}]}),
+
+ riak_core_capability:register({riak_kv, mapred_2i_pipe},
+ [true, false],
+ false,
+ {riak_kv,
+ mapred_2i_pipe,
+ [{true, true}, {false, false}]}),
+
%% Go ahead and mark the riak_kv service as up in the node watcher.
%% The riak_core_ring_handler blocks until all vnodes have been started
%% synchronously.
View
2 src/riak_kv_keys_fsm.erl
@@ -58,7 +58,7 @@
%% environment.
-spec use_ack_backpressure() -> boolean().
use_ack_backpressure() ->
- app_helper:get_env(riak_kv, listkeys_backpressure) == true.
+ riak_core_capability:get({riak_kv, listkeys_backpressure}) == true.
%% @doc Construct the correct listkeys command record.
-spec req(binary(), term()) -> term().
View
4 src/riak_kv_mrc_pipe.erl
@@ -528,7 +528,7 @@ send_inputs(Pipe, {Bucket, FilterExprs}, Timeout) ->
end;
send_inputs(Pipe, {index, Bucket, Index, Key}, Timeout) ->
Query = {eq, Index, Key},
- case app_helper:get_env(riak_kv, mapred_2i_pipe, false) of
+ case riak_core_capability:get({riak_kv, mapred_2i_pipe}) of
true ->
riak_kv_pipe_index:queue_existing_pipe(
Pipe, Bucket, Query, Timeout);
@@ -540,7 +540,7 @@ send_inputs(Pipe, {index, Bucket, Index, Key}, Timeout) ->
end;
send_inputs(Pipe, {index, Bucket, Index, StartKey, EndKey}, Timeout) ->
Query = {range, Index, StartKey, EndKey},
- case app_helper:get_env(riak_kv, mapred_2i_pipe, false) of
+ case riak_core_capability:get({riak_kv, mapred_2i_pipe}) of
true ->
riak_kv_pipe_index:queue_existing_pipe(
Pipe, Bucket, Query, Timeout);
View
4 src/riak_kv_pb_socket.erl
@@ -243,15 +243,15 @@ process_message(rpbpingreq, State) ->
send_msg(rpbpingresp, State);
process_message(rpbgetclientidreq, #state{client=C, client_id=CID} = State) ->
- ClientId = case app_helper:get_env(riak_kv, vnode_vclocks, false) of
+ ClientId = case riak_core_capability:get({riak_kv, vnode_vclocks}) of
true -> CID;
false -> C:get_client_id()
end,
Resp = #rpbgetclientidresp{client_id = ClientId},
send_msg(Resp, State);
process_message(#rpbsetclientidreq{client_id = ClientId}, State) ->
- NewState = case app_helper:get_env(riak_kv, vnode_vclocks, false) of
+ NewState = case riak_core_capability:get({riak_kv, vnode_vclocks}) of
true -> State#state{client_id=ClientId};
false ->
{ok, C} = riak:local_client(ClientId),
View
14 src/riak_kv_util.erl
@@ -153,19 +153,7 @@ normalize_rw_value(_, _) -> error.
%% application's environment.
-spec mapred_system() -> pipe | legacy.
mapred_system() ->
- case app_helper:get_env(riak_kv, mapred_system, legacy) of
- pipe -> pipe;
- legacy -> legacy;
- Other ->
- error_logger:warning_msg(
- "Unknown value for riak_kv:mapred_system:~n ~p~n"
- "Defaulting to 'legacy'.",
- [Other]),
- %% override user's choice here so that warning doesn't
- %% print repeatedly in the log
- application:set_env(riak_kv, mapred_system, legacy),
- legacy
- end.
+ riak_core_capability:get({riak_kv, mapred_system}).
%% ===================================================================

0 comments on commit 07a8c0a

Please sign in to comment.