Skip to content

Commit

Permalink
Remove legacy_keylisting references and deprecated key listing functi…
Browse files Browse the repository at this point in the history
…ons from riak_kv_app and riak_client.
  • Loading branch information
kellymclaughlin committed Nov 14, 2012
1 parent 28c6bdf commit d0bd3f9
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 127 deletions.
160 changes: 41 additions & 119 deletions src/riak_client.erl
Expand Up @@ -31,7 +31,7 @@
-export([delete_vclock/3,delete_vclock/4,delete_vclock/5]).
-export([list_keys/1,list_keys/2,list_keys/3]).
-export([stream_list_keys/1,stream_list_keys/2,stream_list_keys/3,
stream_list_keys/4,stream_list_keys/5]).
stream_list_keys/4]).
-export([filter_buckets/1]).
-export([filter_keys/2,filter_keys/3]).
-export([list_buckets/0,list_buckets/2]).
Expand Down Expand Up @@ -60,7 +60,7 @@
%% @doc Fetch the object at Bucket/Key. Return a value as soon as the default
%% R-value for the nodes have responded with a value or error.
%% @equiv get(Bucket, Key, R, default_timeout())
get(Bucket, Key) ->
get(Bucket, Key) ->
get(Bucket, Key, []).

%% @spec get(riak_object:bucket(), riak_object:key(), options()) ->
Expand Down Expand Up @@ -91,7 +91,7 @@ get(Bucket, Key, Options) when is_list(Options) ->
%% @doc Fetch the object at Bucket/Key. Return a value as soon as R
%% nodes have responded with a value or error.
%% @equiv get(Bucket, Key, R, default_timeout())
get(Bucket, Key, R) ->
get(Bucket, Key, R) ->
get(Bucket, Key, [{r, R}]).

%% @spec get(riak_object:bucket(), riak_object:key(), R :: integer(),
Expand Down Expand Up @@ -304,34 +304,18 @@ list_keys(Bucket) ->
list_keys(Bucket, Timeout) ->
list_keys(Bucket, none, Timeout).

%% @deprecated Only in place for backwards compatibility.
list_keys(Bucket, Timeout, ErrorTolerance) when is_integer(Timeout) ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
Me = self(),
ReqId = mk_reqid(),
FSM_Timeout = trunc(Timeout / 8),
riak_kv_keys_fsm_legacy_sup:start_keys_fsm(Node, [ReqId, Bucket, FSM_Timeout, ErrorTolerance, Me]),
wait_for_listkeys(ReqId, Timeout);
%% @spec list_keys(riak_object:bucket(), TimeoutMillisecs :: integer()) ->
%% {ok, [Key :: riak_object:key()]} |
%% {error, timeout} |
%% {error, Err :: term()}
%% @doc List the keys known to be present in Bucket.
%% Key lists are updated asynchronously, so this may be slightly
%% out of date if called immediately after a put or delete.
list_keys(Bucket, Filter, Timeout) ->
case riak_core_capability:get({riak_kv, legacy_keylisting}, true) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
list_keys(Bucket, Timeout, ?DEFAULT_ERRTOL);
_ ->
Me = self(),
ReqId = mk_reqid(),
riak_kv_keys_fsm_sup:start_keys_fsm(Node, [{raw, ReqId, Me}, [Bucket, Filter, Timeout]]),
wait_for_listkeys(ReqId, Timeout)
end.
list_keys(Bucket, Filter, Timeout) ->
Me = self(),
ReqId = mk_reqid(),
riak_kv_keys_fsm_sup:start_keys_fsm(Node, [{raw, ReqId, Me}, [Bucket, Filter, Timeout, plain]]),
wait_for_listkeys(ReqId, Timeout).

stream_list_keys(Bucket) ->
stream_list_keys(Bucket, ?DEFAULT_TIMEOUT).
Expand All @@ -341,21 +325,7 @@ stream_list_keys(Bucket, Timeout) ->
stream_list_keys(Bucket, Timeout, Me).

stream_list_keys(Bucket, Timeout, Client) when is_pid(Client) ->
stream_list_keys(Bucket, Timeout, Client, plain);
%% @deprecated Only in place for backwards compatibility.
stream_list_keys(Bucket, Timeout, _) ->
stream_list_keys(Bucket, Timeout).

%% @deprecated Only in place for backwards compatibility.
stream_list_keys(Bucket0, Timeout, ErrorTolerance, Client, _ClientType) ->
ReqId = mk_reqid(),
case build_filter(Bucket0) of
{ok, Filter} ->
riak_kv_keys_fsm_legacy_sup:start_keys_fsm(Node, [ReqId, Filter, Timeout, ErrorTolerance, Client]),
{ok, ReqId};
Error ->
Error
end.
stream_list_keys(Bucket, Timeout, Client, plain).

%% @spec stream_list_keys(riak_object:bucket(),
%% TimeoutMillisecs :: integer(),
Expand All @@ -373,40 +343,32 @@ stream_list_keys(Bucket0, Timeout, ErrorTolerance, Client, _ClientType) ->
%% If ClientType is set to 'mapred' instead of 'plain', then the
%% messages will be sent in the form of a MR input stream.
stream_list_keys(Input, Timeout, Client, ClientType) when is_pid(Client) ->
case riak_core_capability:get({riak_kv, legacy_keylisting}, true) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
stream_list_keys(Input, Timeout, ?DEFAULT_ERRTOL, Client, ClientType);
_ ->
ReqId = mk_reqid(),
case Input of
{Bucket, FilterInput} ->
case riak_kv_mapred_filters:build_filter(FilterInput) of
{error, _Error} ->
{error, _Error};
{ok, FilterExprs} ->
riak_kv_keys_fsm_sup:start_keys_fsm(Node,
[{raw,
ReqId,
Client},
[Bucket,
FilterExprs,
Timeout]]),
{ok, ReqId}
end;
Bucket ->
riak_kv_keys_fsm_sup:start_keys_fsm(Node,
[{raw, ReqId, Client},
ReqId = mk_reqid(),
case Input of
{Bucket, FilterInput} ->
case riak_kv_mapred_filters:build_filter(FilterInput) of
{error, _Error} ->
{error, _Error};
{ok, FilterExprs} ->
riak_kv_keys_fsm_sup:start_keys_fsm(Node,
[{raw,
ReqId,
Client},
[Bucket,
none,
Timeout]]),
FilterExprs,
Timeout,
ClientType]]),
{ok, ReqId}
end
end;
%% @deprecated Only in place for backwards compatibility.
stream_list_keys(Bucket, Timeout, ErrorTolerance, Client) ->
stream_list_keys(Bucket, Timeout, ErrorTolerance, Client, plain).
end;
Bucket ->
riak_kv_keys_fsm_sup:start_keys_fsm(Node,
[{raw, ReqId, Client},
[Bucket,
none,
Timeout,
ClientType]]),
{ok, ReqId}
end.

%% @spec filter_keys(riak_object:bucket(), Fun :: function()) ->
%% {ok, [Key :: riak_object:key()]} |
Expand All @@ -418,14 +380,7 @@ stream_list_keys(Bucket, Timeout, ErrorTolerance, Client) ->
%% out of date if called immediately after a put or delete.
%% @equiv filter_keys(Bucket, Fun, default_timeout())
filter_keys(Bucket, Fun) ->
case riak_core_capability:get({riak_kv, legacy_keylisting}, true) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
list_keys({filter, Bucket, Fun}, ?DEFAULT_TIMEOUT*8);
_ ->
list_keys(Bucket, Fun, ?DEFAULT_TIMEOUT)
end.
list_keys(Bucket, Fun, ?DEFAULT_TIMEOUT).

%% @spec filter_keys(riak_object:bucket(), Fun :: function(), TimeoutMillisecs :: integer()) ->
%% {ok, [Key :: riak_object:key()]} |
Expand All @@ -436,14 +391,7 @@ filter_keys(Bucket, Fun) ->
%% Key lists are updated asynchronously, so this may be slightly
%% out of date if called immediately after a put or delete.
filter_keys(Bucket, Fun, Timeout) ->
case riak_core_capability:get({riak_kv, legacy_keylisting}, true) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
list_keys({filter, Bucket, Fun}, Timeout);
_ ->
list_keys(Bucket, Fun, Timeout)
end.
list_keys(Bucket, Fun, Timeout).

%% @spec list_buckets() ->
%% {ok, [Bucket :: riak_object:bucket()]} |
Expand All @@ -468,32 +416,18 @@ list_buckets() ->
%% either adds the first key or removes the last remaining key from
%% a bucket.
list_buckets(Filter, Timeout) ->
case riak_core_capability:get({riak_kv, legacy_keylisting}, true) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
list_keys('_', Timeout);
_ ->
Me = self(),
ReqId = mk_reqid(),
riak_kv_buckets_fsm_sup:start_buckets_fsm(Node, [{raw, ReqId, Me}, [Filter, Timeout]]),
wait_for_listbuckets(ReqId, Timeout)
end.
Me = self(),
ReqId = mk_reqid(),
riak_kv_buckets_fsm_sup:start_buckets_fsm(Node, [{raw, ReqId, Me}, [Filter, Timeout, plain]]),
wait_for_listbuckets(ReqId, Timeout).

%% @spec filter_buckets(Fun :: function()) ->
%% {ok, [Bucket :: riak_object:bucket()]} |
%% {error, timeout} |
%% {error, Err :: term()}
%% @doc Return a list of filtered buckets.
filter_buckets(Fun) ->
case riak_core_capability:get({riak_kv, legacy_keylisting}, true) of
true ->
%% @TODO This code is only here to support
%% rolling upgrades and will be removed.
list_keys('_', ?DEFAULT_TIMEOUT);
_ ->
list_buckets(Fun, ?DEFAULT_TIMEOUT)
end.
list_buckets(Fun, ?DEFAULT_TIMEOUT).

%% @spec get_index(Bucket :: binary(),
%% Query :: riak_index:query_def()) ->
Expand Down Expand Up @@ -615,7 +549,7 @@ wait_for_listkeys(ReqId,Timeout,Acc) ->

%% @private
wait_for_listbuckets(ReqId, Timeout) ->
receive
receive
{ReqId,{buckets, Buckets}} -> {ok, Buckets};
{ReqId, Error} -> {error, Error}
after Timeout ->
Expand All @@ -635,18 +569,6 @@ wait_for_query_results(ReqId, Timeout, Acc) ->
{error, timeout}
end.

%% @deprecated This function is only here to support
%% rolling upgrades and will be removed.
build_filter({Bucket, Exprs}) ->
case riak_kv_mapred_filters:build_filter(Exprs) of
{ok, Filters} ->
{ok, {Bucket, Filters}};
Error ->
Error
end;
build_filter(Bucket) when is_binary(Bucket) ->
{ok, {Bucket, []}}.

recv_timeout(Options) ->
case proplists:get_value(recv_timeout, Options) of
undefined ->
Expand Down
8 changes: 0 additions & 8 deletions src/riak_kv_app.erl
Expand Up @@ -97,13 +97,6 @@ start(_Type, _StartArgs) ->
vnode_vclocks,
[{true, true}, {false, false}]}),

riak_core_capability:register({riak_kv, legacy_keylisting},
[false, true],
true,
{riak_kv,
legacy_keylisting,
[{true, true}, {false, false}]}),

riak_core_capability:register({riak_kv, listkeys_backpressure},
[true, false],
false,
Expand Down Expand Up @@ -178,4 +171,3 @@ check_epoch() ->
"but your system says the epoch is ~p", [Epoch]),
ok
end.

0 comments on commit d0bd3f9

Please sign in to comment.