Skip to content

Commit

Permalink
Update secondary indexes to use the core coverage code.
Browse files Browse the repository at this point in the history
Fixes: az528

Add an index fsm to handle secondary index queries directly instead
of riding on key listing and update riak_client:get_index to call
the index fsm. Also add necessary plumbing to the index backend
modules and riak_kv_vnode.
  • Loading branch information
kellymclaughlin committed Aug 5, 2011
1 parent 29bf149 commit 84b15e6
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 59 deletions.
2 changes: 2 additions & 0 deletions ebin/riak_kv.app
Expand Up @@ -33,6 +33,8 @@
riak_kv_get_fsm,
riak_kv_get_fsm_sup,
riak_kv_index_backend,
riak_kv_index_fsm,
riak_kv_index_fsm_sup,
riak_kv_js_manager,
riak_kv_js_sup,
riak_kv_js_vm,
Expand Down
6 changes: 6 additions & 0 deletions include/riak_kv_vnode.hrl
Expand Up @@ -32,6 +32,11 @@
-record(riak_kv_listbuckets_req_v1, {
item_filter :: function()}).

-record(riak_kv_index_req_v1, {
bucket :: binary() | tuple(),
item_filter :: function(),
qry :: riak_index:query_def()}).

-record(riak_kv_delete_req_v1, {
bkey :: {binary(), binary()},
req_id :: non_neg_integer()}).
Expand All @@ -51,6 +56,7 @@
-define(KV_MGET_REQ, #riak_kv_mget_req_v1).
-define(KV_LISTBUCKETS_REQ, #riak_kv_listbuckets_req_v1).
-define(KV_LISTKEYS_REQ, #riak_kv_listkeys_req_v3).
-define(KV_INDEX_REQ, #riak_kv_index_req_v1).
-define(KV_DELETE_REQ, #riak_kv_delete_req_v1).
-define(KV_MAP_REQ, #riak_kv_map_req_v1).
-define(KV_VCLOCK_REQ, #riak_kv_vclock_req_v1).
35 changes: 31 additions & 4 deletions src/riak_client.erl
Expand Up @@ -41,7 +41,7 @@
-export([filter_buckets/1]).
-export([filter_keys/2,filter_keys/3]).
-export([list_buckets/0,list_buckets/2]).
-export([get_index/2]).
-export([get_index/3,get_index/2]).
-export([set_bucket/2,get_bucket/1]).
-export([reload_all/1]).
-export([remove_from_cluster/1]).
Expand Down Expand Up @@ -649,15 +649,29 @@ filter_buckets(Fun) ->
list_buckets(Fun, ?DEFAULT_TIMEOUT)
end.

%% @spec get_index(Bucket :: binary(), Query :: riak_index:query_def()) ->
%% @spec get_index(Bucket :: binary(),
%% Query :: riak_index:query_def()) ->
%% {ok, [Key :: riak_object:key()]} |
%% {error, timeout} |
%% {error, Err :: term()}.
%%
%% @doc Run the provided index query.
get_index(Bucket, Query) ->
FakeBucket = {index_query, Bucket, Query},
list_keys({FakeBucket, []}).
get_index(Bucket, Query, ?DEFAULT_TIMEOUT).

%% @spec get_index(Bucket :: binary(),
%% Query :: riak_index:query_def(),
%% TimeoutMillisecs :: integer()) ->
%% {ok, [Key :: riak_object:key()]} |
%% {error, timeout} |
%% {error, Err :: term()}.
%%
%% @doc Run the provided index query.
get_index(Bucket, Query, Timeout) ->
Me = self(),
ReqId = mk_reqid(),
riak_kv_index_fsm_sup:start_index_fsm(Node, [{raw, ReqId, Me}, [Bucket, none, Query, Timeout, plain]]),
wait_for_query_results(ReqId, Timeout).

%% @spec set_bucket(riak_object:bucket(), [BucketProp :: {atom(),term()}]) -> ok
%% @doc Set the given properties for Bucket.
Expand Down Expand Up @@ -734,6 +748,19 @@ wait_for_listbuckets(ReqId, Timeout) ->
{error, timeout}
end.

%% @private
wait_for_query_results(ReqId, Timeout) ->
wait_for_query_results(ReqId, Timeout, []).
%% @private
wait_for_query_results(ReqId, Timeout, Acc) ->
receive
{ReqId, done} -> {ok, lists:flatten(Acc)};
{ReqId,{results, Res}} -> wait_for_query_results(ReqId, Timeout, [Res | Acc]);
{ReqId, Error} -> {error, Error}
after Timeout ->
{error, timeout, Acc}
end.

add_inputs(_FlowPid, []) ->
ok;
add_inputs(FlowPid, Inputs) when length(Inputs) < 100 ->
Expand Down
10 changes: 5 additions & 5 deletions src/riak_index_backend.erl
@@ -1,6 +1,6 @@
%% -------------------------------------------------------------------
%%
%% riak_index_backend: Riak Index backend behaviour
%% riak_index_backend: Riak Index backend behaviour
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%%
Expand Down Expand Up @@ -32,10 +32,10 @@
-spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}].
behaviour_info(callbacks) ->
[{start,2}, % (Partition, Config)
{stop,1}, % (State)
{stop,1}, % (State)
{index, 2}, % (State, Postings)
{delete, 2}, % (State, Postings)
{lookup_sync, 4}, % (State, Index, Field, Term)
{lookup_sync, 4}, % (State, Index, Field, Term)
{fold_index, 6}, % (State, Index, Query, SKFun, Acc, FinalFun)
{drop,1}, % (State)
{callback,3}]; % (State, Ref, Msg) ->
Expand Down Expand Up @@ -76,15 +76,15 @@ standard_test(BackendMod, Config) ->
{<<"bucket">>, "field", "term2", "value_f", [], 1}
],
BackendMod:index(State, Postings2),

%% Retrieve what we just indexed...
Results2 = [
{"value_d", [{"field", "term2"}]},
{"value_e", [{"field", "term2"}]},
{"value_f", [{"field", "term2"}]}
],
?assertEqual(Results2, BackendMod:lookup_sync(State, <<"bucket">>, "field", "term2")),

%% Delete some postings...
Postings3 = [
{<<"bucket">>, "field", "term1", "value_a", [], 2},
Expand Down
1 change: 0 additions & 1 deletion src/riak_index_mi_backend.erl
Expand Up @@ -106,7 +106,6 @@ lookup_sync(State, Index, Field, Term) ->
FilterFun = fun(_Value, _Props) -> true end,
merge_index:lookup_sync(Pid, Index, Field, Term, FilterFun).


%% @spec fold_index(State :: state(), Bucket :: binary(), Query :: riak_index:query_elements(),
%% SKFun :: function(), Acc :: term(), FinalFun :: function()) -> term().
fold_index(State, Index, Query, SKFun, Acc, FinalFun) ->
Expand Down
36 changes: 26 additions & 10 deletions src/riak_kv_coverage_filter.erl
Expand Up @@ -34,14 +34,24 @@
-export([build_filter/3]).

-type bucket() :: binary().
-type filter() :: none | fun().
-type filter() :: none | fun((any()) -> boolean()) | [{atom(), atom(), [any()]}].
-type index() :: non_neg_integer().

%% ===================================================================
%% Public API
%% ===================================================================

%% @doc Build the list of filter functions for any required VNode indexes.
%%
%% The ItemFilterInput parameter can be the atom `none' to indicate
%% no filtering based on the request items, a function that returns
%% a boolean indicating whether or not the item should be included
%% in the final results, or a list of tuples of the form
%% {Module, Function, Args}. The latter is the form used by
%% MapReduce filters such as those in the {@link riak_kv_mapred_filters}
%% module. The list of tuples is composed into a function that is
%% used to determine if an item should be included in the final
%% result set.
-spec build_filter(bucket(), filter(), [index()]) -> filter().
build_filter(Bucket, ItemFilterInput, FilterVNode) ->
ItemFilter = build_item_filter(ItemFilterInput),
Expand Down Expand Up @@ -119,7 +129,7 @@ build_item_filter(none) ->
build_item_filter(FilterInput) when is_function(FilterInput) ->
FilterInput;
build_item_filter(FilterInput) ->
%% FilterInput is a list of MFA tuples
%% FilterInput is a list of {Module, Fun, Args} tuples
compose(FilterInput).


Expand All @@ -133,12 +143,18 @@ build_preflist_fun(Bucket, Ring) ->
compose([]) ->
none;
compose(Filters) ->
compose(Filters, fun(V) -> V end).

compose([], F0) -> F0;
compose([Filter1|Filters], F0) ->
{FilterMod, FilterFun, Args} = Filter1,
Fun1 = FilterMod:FilterFun(Args),
F1 = fun(CArgs) -> Fun1(F0(CArgs)) end,
compose(Filters, F1).
compose(Filters, []).

compose([], FilterFuns) ->
TruthFun =
fun(X) ->
X =:= true
end,
fun(Val) ->
lists:all(TruthFun, [FilterFun(Val) || FilterFun <- FilterFuns])
end;
compose([Filter | RestFilters], FilterFuns) ->
{FilterMod, FilterFun, Args} = Filter,
Fun = FilterMod:FilterFun(Args),
compose(RestFilters, [Fun | FilterFuns]).

0 comments on commit 84b15e6

Please sign in to comment.