micro optimization #101

Merged
merged 4 commits

2 participants

@rzezeski

This PR includes two very small micro optimizations. More importantly it cleans up code around extracting scoring properties and exposes preflist calculation which is handy for debugging/profiling at the console.

rzezeski added some commits
@rzezeski rzezeski Remove trailing space 7a7b6e3
@rzezeski rzezeski Add better scoring property extraction
This avoids calling `lists:flatten` on unnecessarily deep lists.  It
also has the added bonus of keeping op logic where it belongs -- in
the ops themselves.

Below are the relevant fprof results.  Basically, in the case tested
(a single term query, which is best case for this function), the new
code cut the number of function calls to a third of the original.

    {ok, Ops} = RC:parse_query(<<"tweets">>, <<"created_at:20100113T032128">>).
    fprof:apply(RC, get_scoring_info, [Ops], [start]).

Before
------

%                                               CNT       ACC       OWN
[{ totals,                                      152,    3.082,    3.082}].  %%%

{[{{riak_search_client,get_scoring_info2,2},      1,    2.599,    0.027}],
 { {riak_search_client,get_scoring_props,2},      1,    2.599,    0.027},     %
 [{{lists,flatten,1},                             1,    1.289,    0.008},
  {{riak_search_client,get_scoring_props_1,2},    1,    1.283,    0.005}]}.

After
-----

%                                               CNT       ACC       OWN
[{ totals,                                       48,    0.694,    0.667}].  %%%

{[{{fprof,apply_start_stop,4},                    1,    0.672,    0.038}],
 { {riak_search_client,get_scoring_info,2},       1,    0.672,    0.038},     %
 [{{riak_search_op,extract_scoring_props,1},      1,    0.522,    0.012},
  {{lists,flatten,1},                             1,    0.064,    0.018},
  {{riak_search_client,'-get_scoring_info/2-lc$^0/1-0-',1},   1,    0.025,    0.011},
  {{lists,foldl,3},                               1,    0.015,    0.004},
  {{lists,sum,1},                                 1,    0.007,    0.002},
  {{math,pow,2},                                  1,    0.001,    0.001}]}.
6a8e211
@rzezeski rzezeski Don't call `ensure_loaded`
Search nodes shouldn't see ops they don't understand.  They could
with a mixed cluster, but this code won't help there.  Its cost,
added latency per request, is not worth what it provides.
6f418c9
@rzezeski rzezeski Centralize and expose preflist calculation
This will help when testing/debugging/benchmarking.  You need this
info if you want to manually target a specific index instance.
7239503
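The newly exposed preflist calculation is what enables the console debugging mentioned in the PR description. A hypothetical console session (the index/field/term values below are illustrative, not from the PR):

```erlang
%% On a running riak_search node's console. The arguments are
%% examples only; use an index/field/term that exist in your data.
Preflist = riak_search_utils:preflist(<<"tweets">>, <<"created_at">>, <<"20100113T032128">>).
%% Returns a list of {Partition, Node} entries for the primary
%% replicas, which can be used to target a specific index instance.
```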
@rzezeski rzezeski was assigned
@russelldb russelldb was assigned
@russelldb russelldb commented on the diff
src/riak_search_op.erl
@@ -42,11 +49,4 @@ chain_op(Op, OutputPid, Ref, SearchState) ->
op_to_module(Op) ->
ModuleString = "riak_search_op_" ++ atom_to_list(element(1, Op)),
@russelldb Owner

You've done away with the code to ensure the Mod is loaded, is this 'cos you consider it defensive programming? Doh! Just read the commit comment. Scratch this, I know why you did away with it.

@russelldb russelldb commented on the diff
src/riak_search_op.erl
@@ -42,11 +49,4 @@ chain_op(Op, OutputPid, Ref, SearchState) ->
op_to_module(Op) ->
ModuleString = "riak_search_op_" ++ atom_to_list(element(1, Op)),
- Module = list_to_atom(ModuleString),
- case code:ensure_loaded(Module) of
- {module, Module} ->
- Module;
- {error, _} ->
- ?PRINT({unknown_op, Op}),
- throw({unknown_op, Op})
- end.
+ list_to_atom(ModuleString).
@russelldb Owner

Won't 'list_to_existing_atom/1' do here? If the module is loaded then it will exist as an atom and any crash/error from a bad Op name will be closer to the cause.

Yea list_to_existing_atom would probably be the right thing. I plan to refactor the query engine to realize this stuff one time up-front so I'll keep this in the back of my head.
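For illustration, the suggested variant would be a one-line change; a sketch, not code from this PR:

```erlang
%% Sketch of op_to_module/1 using list_to_existing_atom/1. A bad op
%% name now fails here with badarg -- close to the cause -- rather
%% than minting a fresh atom that only fails later at the call site.
op_to_module(Op) ->
    ModuleString = "riak_search_op_" ++ atom_to_list(element(1, Op)),
    list_to_existing_atom(ModuleString).
```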

@russelldb

I don't understand why.

op_range_sized is a leaf on the operator tree (like op_term) so it must produce some value for scoring props unlike the inner nodes that just recursively call extract_scoring_props/1 for their nested ops.

The reason for {0, 1} is to use the addition/multiplication identities so that the NumDocs and SumOfSquaredWeights are not affected by a range op in the following code: https://github.com/basho/riak_search/blob/master/src/riak_search_client.erl#L426

However, it looks like my change will produce a different value for NumTerms since the {0, 1} will not be filtered out by the lists:flatten whereas the old code just inserted an empty list. But a quick glance at the code shows no use of num_terms anywhere.

Did I answer your question or confuse matters more?
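The identity argument can be checked against `get_scoring_info/2` as changed in this diff; a simplified sketch (assumed value shapes, not the exact code):

```erlang
%% ScoringProps is a list of {DocFrequency, Boost} pairs. A range op
%% now contributes {0, 1}: the 0 is the additive identity, so NumDocs
%% is unchanged, while NumTerms = length(ScoringProps) does count it.
ScoringProps = [{5, 2.0}, {0, 1}],  % e.g. one term op plus one range op
NumDocs = lists:sum([0] ++ [DF || {DF, _} <- ScoringProps]).
%% NumDocs =:= 5, the same as with the term op alone.
```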

@russelldb
Owner

A fair few whitespace-only changes are worked in with the functional changes. Built, ran, tests pass; +1 to merge.

@rzezeski rzezeski was assigned
@rzezeski rzezeski merged commit 7239503 into basho:master
Commits on Mar 3, 2012
  1. @rzezeski

     Remove trailing space

Commits on Mar 8, 2012
  1. @rzezeski

     Add better scoring property extraction

  2. @rzezeski

     Don't call `ensure_loaded`

  3. @rzezeski

     Centralize and expose preflist calculation
21 src/riak_search_client.erl
@@ -40,7 +40,7 @@
mapred(DefaultIndex, SearchQuery, SearchFilter, MRQuery, ResultTransformer, Timeout) ->
{ok, ReqID} = mapred_stream(DefaultIndex, SearchQuery, SearchFilter, MRQuery, self(), ResultTransformer, Timeout),
luke_flow:collect_output(ReqID, Timeout).
-
+
mapred_stream(DefaultIndex, SearchQuery, SearchFilter, MRQuery, ClientPid,
ResultTransformer, Timeout) ->
InputDef = {modfun, riak_search, mapred_search,
@@ -77,8 +77,8 @@ parse_filter(IndexOrSchema, Filter) ->
riak_search_utils:to_list(Filter)),
{ok, Ops}.
-
-
+
+
%% Run the Query, return the list of keys.
%% Timeout is in milliseconds.
%% Return the {Length, Results}.
@@ -365,7 +365,7 @@ stream_search(IndexOrSchema, QueryOps, FilterOps) ->
%% Get the total number of terms and weight in query...
{NumTerms, NumDocs, QueryNorm} = get_scoring_info(QueryOps),
-
+
%% Create the inline field filter fun...
FilterFun = fun(_Value, Props) ->
riak_search_inlines:passes_inlines(Schema, Props, FilterOps)
@@ -425,7 +425,7 @@ collect_result(#riak_search_ref{id=Id, inputcount=InputCount}=SearchRef, Timeout
%% http://lucene.apache.org/java/2_4_0/api/org/apache/lucene/search/Similarity.html
get_scoring_info(QueryOps) ->
%% Calculate num terms...
- ScoringProps = get_scoring_props(QueryOps),
+ ScoringProps = lists:flatten([riak_search_op:extract_scoring_props(QueryOps)]),
NumTerms = length(ScoringProps),
NumDocs = lists:sum([0] ++ [DocFrequency || {DocFrequency, _} <- ScoringProps]),
@@ -438,17 +438,6 @@ get_scoring_info(QueryOps) ->
QueryNorm = 1 / math:pow(SumOfSquaredWeights + 1, 0.5),
{NumTerms, NumDocs, QueryNorm}.
-get_scoring_props(Ops) ->
- lists:flatten([get_scoring_props_1(Ops)]).
-get_scoring_props_1(Ops) when is_list(Ops) ->
- [get_scoring_props_1(X) || X <- Ops];
-get_scoring_props_1(#term { doc_freq=DocFrequency, boost=Boost }) ->
- {DocFrequency, Boost};
-get_scoring_props_1(Op) when is_tuple(Op) ->
- get_scoring_props_1(tuple_to_list(Op));
-get_scoring_props_1(_) ->
- [].
-
sort_by_key(SearchRef, Results) ->
sort_results(SearchRef, Results, 3).
18 src/riak_search_op.erl
@@ -1,11 +1,12 @@
%% -------------------------------------------------------------------
%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% -------------------------------------------------------------------
-module(riak_search_op).
-export([
+ extract_scoring_props/1,
preplan/1,
preplan/2,
chain_op/4,
@@ -14,6 +15,12 @@
-include("riak_search.hrl").
-include_lib("lucene_parser/include/lucene_parser.hrl").
+extract_scoring_props(Ops) when is_list(Ops) ->
+ [extract_scoring_props(Op) || Op <- Ops];
+extract_scoring_props(Op) when is_tuple(Op) ->
+ Mod = riak_search_op:op_to_module(Op),
+ Mod:extract_scoring_props(Op).
+
preplan(Op) ->
preplan(Op, #search_state {}).
@@ -42,11 +49,4 @@ chain_op(Op, OutputPid, Ref, SearchState) ->
op_to_module(Op) ->
ModuleString = "riak_search_op_" ++ atom_to_list(element(1, Op)),
- Module = list_to_atom(ModuleString),
- case code:ensure_loaded(Module) of
- {module, Module} ->
- Module;
- {error, _} ->
- ?PRINT({unknown_op, Op}),
- throw({unknown_op, Op})
- end.
+ list_to_atom(ModuleString).
6 src/riak_search_op_group.erl
@@ -1,11 +1,12 @@
%% -------------------------------------------------------------------
%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% -------------------------------------------------------------------
-module(riak_search_op_group).
-export([
+ extract_scoring_props/1,
preplan/2,
chain_op/4
]).
@@ -13,6 +14,9 @@
-include("riak_search.hrl").
-include_lib("lucene_parser/include/lucene_parser.hrl").
+extract_scoring_props(Op) ->
+ riak_search_op:extract_scoring_props(Op#group.ops).
+
preplan(Op, State) ->
case Op#group.ops of
SingleOp when is_tuple(SingleOp) ->
14 src/riak_search_op_intersection.erl
@@ -1,11 +1,12 @@
%% -------------------------------------------------------------------
%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% -------------------------------------------------------------------
-module(riak_search_op_intersection).
-export([
+ extract_scoring_props/1,
preplan/2,
chain_op/4,
make_filter_iterator/1
@@ -14,9 +15,12 @@
-include_lib("lucene_parser/include/lucene_parser.hrl").
-define(INDEX_DOCID(Term), ({element(1, Term), element(2, Term)})).
+extract_scoring_props(Op) ->
+ riak_search_op:extract_scoring_props(Op#intersection.ops).
+
preplan(Op, State) ->
case riak_search_op:preplan(Op#intersection.ops, State) of
- [ChildOp] ->
+ [ChildOp] ->
ChildOp;
ChildOps ->
NewOp = Op#intersection { ops=ChildOps },
@@ -31,9 +35,9 @@ chain_op(Op, OutputPid, OutputRef, State) ->
Iterator2 = make_filter_iterator(Iterator1),
%% Spawn up pid to gather and send results...
- F = fun() ->
+ F = fun() ->
erlang:link(State#search_state.parent),
- riak_search_op_utils:gather_iterator_results(OutputPid, OutputRef, Iterator2())
+ riak_search_op_utils:gather_iterator_results(OutputPid, OutputRef, Iterator2())
end,
erlang:spawn_link(F),
@@ -44,7 +48,7 @@ chain_op(Op, OutputPid, OutputRef, State) ->
%% negated results.
make_filter_iterator(Iterator) ->
fun() -> filter_iterator(Iterator()) end.
-filter_iterator({_, Op, Iterator})
+filter_iterator({_, Op, Iterator})
when (is_tuple(Op) andalso is_record(Op, negation)) orelse Op == true ->
%% Term is negated, so skip it.
filter_iterator(Iterator());
4 src/riak_search_op_negation.erl
@@ -6,6 +6,7 @@
-module(riak_search_op_negation).
-export([
+ extract_scoring_props/1,
preplan/2,
chain_op/4
]).
@@ -13,6 +14,9 @@
-include("riak_search.hrl").
-include_lib("lucene_parser/include/lucene_parser.hrl").
+extract_scoring_props(Op) ->
+ riak_search_op:extract_scoring_props(Op#negation.op).
+
preplan(Op, State) ->
ChildOp = riak_search_op:preplan(Op#negation.op, State),
Op#negation { op=ChildOp }.
9 src/riak_search_op_node.erl
@@ -1,16 +1,20 @@
%% -------------------------------------------------------------------
%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% -------------------------------------------------------------------
-module(riak_search_op_node).
-export([
+ extract_scoring_props/1,
preplan/2,
chain_op/4
]).
-include("riak_search.hrl").
+extract_scoring_props(Op) ->
+ riak_search_op:extract_scoring_props(Op#node.ops).
+
preplan(Op, _State) ->
ChildOps = Op#node.ops,
Node = get_target_node(ChildOps),
@@ -40,7 +44,7 @@ get_target_node(Ops) ->
%% Sort in descending order by count...
F1 = fun({_, Weight1}, {_, Weight2}) ->
- Weight1 >= Weight2
+ Weight1 >= Weight2
end,
NodeWeights3 = lists:sort(F1, NodeWeights2),
@@ -58,7 +62,6 @@ get_target_node(Ops) ->
NodeWeights4 = lists:takewhile(F2, NodeWeights3),
{Node, _} = riak_search_utils:choose(NodeWeights4),
Node.
-
get_term_weights(Ops) ->
lists:flatten(get_term_weights_1(Ops)).
4 src/riak_search_op_proximity.erl
@@ -6,6 +6,7 @@
-module(riak_search_op_proximity).
-export([
+ extract_scoring_props/1,
preplan/2,
chain_op/4
]).
@@ -45,6 +46,9 @@
%%% that we find in a sublist, and then check the min and maximum
%%% value across *all* sublists. If max-min < N then we have a match.
+extract_scoring_props(Op) ->
+ riak_search_op:extract_scoring_props(Op#proximity.ops).
+
preplan(Op, State) ->
ChildOps = riak_search_op:preplan(Op#proximity.ops, State),
Op#proximity { ops=ChildOps }.
20 src/riak_search_op_range_sized.erl
@@ -6,6 +6,7 @@
-module(riak_search_op_range_sized).
-export([
+ extract_scoring_props/1,
preplan/2,
chain_op/4,
correct_term_order/2
@@ -16,13 +17,18 @@
-include_lib("lucene_parser/include/lucene_parser.hrl").
-define(INDEX_DOCID(Term), ({element(1, Term), element(2, Term)})).
-preplan(Op, _State) ->
+extract_scoring_props(_Op) ->
+ %% Don't alter the scoring props, return addition/multiplication
+ %% identities.
+ {0,1}.
+
+preplan(Op, _State) ->
Op.
chain_op(Op, OutputPid, OutputRef, State) ->
- F = fun() ->
+ F = fun() ->
erlang:link(State#search_state.parent),
- start_loop(Op, OutputPid, OutputRef, State)
+ start_loop(Op, OutputPid, OutputRef, State)
end,
erlang:spawn_link(F),
{ok, 1}.
@@ -31,7 +37,7 @@ start_loop(Op, OutputPid, OutputRef, State) ->
%% Figure out how many extra nodes to add to make the groups even.
IndexName = State#search_state.index,
{ok, Schema} = riak_search_config:get_schema(IndexName),
- NVal = Schema:n_val(),
+ NVal = Schema:n_val(),
{ok, Preflist} = riak_search_ring_utils:get_covering_preflist(NVal),
%% Create a #range_worker for each entry in the preflist...
@@ -45,9 +51,9 @@ start_loop(Op, OutputPid, OutputRef, State) ->
Iterator2 = make_dedup_iterator(Iterator1),
%% Spawn up pid to gather and send results...
- F = fun() ->
+ F = fun() ->
erlang:link(State#search_state.parent),
- riak_search_op_utils:gather_iterator_results(OutputPid, OutputRef, Iterator2())
+ riak_search_op_utils:gather_iterator_results(OutputPid, OutputRef, Iterator2())
end,
erlang:spawn_link(F),
@@ -57,7 +63,7 @@ start_loop(Op, OutputPid, OutputRef, State) ->
%% Given two range boundaries, make sure to return the smaller one
%% first.
-correct_term_order(From = {_, FromTerm}, To = {_, ToTerm}) ->
+correct_term_order(From = {_, FromTerm}, To = {_, ToTerm}) ->
case FromTerm =< ToTerm of
true -> {From, To};
false -> {To, From}
6 src/riak_search_op_scope.erl
@@ -9,6 +9,7 @@
-module(riak_search_op_scope).
-export([
+ extract_scoring_props/1,
preplan/2,
chain_op/4
]).
@@ -16,7 +17,10 @@
-include("riak_search.hrl").
-include_lib("lucene_parser/include/lucene_parser.hrl").
-preplan(Op, State) ->
+extract_scoring_props(Op) ->
+ riak_search_op:extract_scoring_props(Op#scope.ops).
+
+preplan(Op, State) ->
NewState = update_state(Op, State),
ChildOps = riak_search_op:preplan(#group { ops=Op#scope.ops }, NewState),
Op#scope { ops=ChildOps }.
31 src/riak_search_op_term.erl
@@ -6,6 +6,7 @@
-module(riak_search_op_term).
-export([
+ extract_scoring_props/1,
preplan/2,
chain_op/4,
default_filter/2
@@ -21,6 +22,9 @@
-include("riak_search.hrl").
-include_lib("lucene_parser/include/lucene_parser.hrl").
+extract_scoring_props(#term{doc_freq=Frequency, boost=Boost}) ->
+ {Frequency, Boost}.
+
%% Need term count for node planning. Used in #intersection and
%% #union. Calculate this during preplan based on where the most
%% results come from.
@@ -41,18 +45,18 @@ preplan(Op, State) ->
Weights2 = [{Node, Count} || {_, Node, Count} <- Weights1],
TotalCount = lists:sum([Count || {_, _, Count} <- Weights1]),
case length(Weights1) == 0 of
- true ->
+ true ->
throw({error, data_not_available, {IndexName, FieldName, Term}}),
DocFrequency = undefined; %% Make compiler happy.
- false ->
+ false ->
DocFrequency = TotalCount / length(Weights1)
end,
Op#term { weights=Weights2, doc_freq=DocFrequency }.
chain_op(Op, OutputPid, OutputRef, State) ->
- F = fun() ->
+ F = fun() ->
erlang:link(State#search_state.parent),
- start_loop(Op, OutputPid, OutputRef, State)
+ start_loop(Op, OutputPid, OutputRef, State)
end,
erlang:spawn_link(F),
{ok, 1}.
@@ -77,10 +81,7 @@ stream(Index, Field, Term, FilterFun) ->
%% Get the primary preflist, minus any down nodes. (We don't use
%% secondary nodes since we ultimately read results from one node
%% anyway.)
- DocIdx = riak_search_ring_utils:calc_partition(Index, Field, Term),
- {ok, Schema} = riak_search_config:get_schema(Index),
- NVal = Schema:n_val(),
- Preflist = get_preflist(DocIdx, NVal),
+ Preflist = riak_search_utils:preflist(Index, Field, Term),
%% Try to use the local node if possible. Otherwise choose
%% randomly.
@@ -99,11 +100,8 @@ info(Index, Field, Term) ->
%% Get the primary preflist, minus any down nodes. (We don't use
%% secondary nodes since we ultimately read results from one node
%% anyway.)
- DocIdx = riak_search_ring_utils:calc_partition(Index, Field, Term),
- {ok, Schema} = riak_search_config:get_schema(Index),
- NVal = Schema:n_val(),
- Preflist = get_preflist(DocIdx, NVal),
-
+ Preflist = riak_search_utils:preflist(Index, Field, Term),
+
{ok, Ref} = riak_search_vnode:info(Preflist, Index, Field, Term, self()),
{ok, Results} = riak_search_backend:collect_info_response(length(Preflist), Ref, []),
Results.
@@ -140,7 +138,7 @@ calculate_score(ScoringVars, Props) ->
TF = math:pow(Frequency, 0.5),
IDF = (1 + math:log(NumDocs/DocFrequency)),
Norm = DocFieldBoost,
-
+
Score = TF * math:pow(IDF, 2) * TermBoost * Norm,
ScoreList = case lists:keyfind(score, 1, Props) of
{score, OldScores} ->
@@ -149,8 +147,3 @@ calculate_score(ScoringVars, Props) ->
[Score]
end,
lists:keystore(score, 1, Props, {score, ScoreList}).
-
--spec get_preflist(binary(), pos_integer()) -> list().
-get_preflist(DocIdx, NVal) ->
- lists:map(fun({IdxNode, _}) -> IdxNode end,
- riak_core_apl:get_primary_apl(DocIdx, NVal, riak_search)).
12 src/riak_search_op_union.erl
@@ -1,12 +1,13 @@
%% -------------------------------------------------------------------
%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% -------------------------------------------------------------------
-module(riak_search_op_union).
-export([
+ extract_scoring_props/1,
preplan/2,
chain_op/4
]).
@@ -14,9 +15,12 @@
-include_lib("lucene_parser/include/lucene_parser.hrl").
-define(INDEX_DOCID(Term), ({element(1, Term), element(2, Term)})).
+extract_scoring_props(Op) ->
+ riak_search_op:extract_scoring_props(Op#union.ops).
+
preplan(Op, State) ->
case riak_search_op:preplan(Op#union.ops, State) of
- [ChildOp] ->
+ [ChildOp] ->
ChildOp;
ChildOps ->
NewOp = Op#union { ops=ChildOps },
@@ -31,9 +35,9 @@ chain_op(Op, OutputPid, OutputRef, State) ->
Iterator2 = riak_search_op_intersection:make_filter_iterator(Iterator1),
%% Spawn up pid to gather and send results...
- F = fun() ->
+ F = fun() ->
erlang:link(State#search_state.parent),
- riak_search_op_utils:gather_iterator_results(OutputPid, OutputRef, Iterator2())
+ riak_search_op_utils:gather_iterator_results(OutputPid, OutputRef, Iterator2())
end,
erlang:spawn_link(F),
26 src/riak_search_utils.erl
@@ -1,6 +1,6 @@
%% -------------------------------------------------------------------
%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% -------------------------------------------------------------------
@@ -8,6 +8,7 @@
-export([
combine_terms/2,
+ preflist/3,
to_atom/1,
to_binary/1,
to_utf8/1,
@@ -108,8 +109,8 @@ from_binary(L) ->
%% Return a key clock to use for revisioning IFTVPs
current_key_clock() ->
{MegaSeconds,Seconds,MilliSeconds}=erlang:now(),
- (MegaSeconds * 1000000000000) +
- (Seconds * 1000000) +
+ (MegaSeconds * 1000000000000) +
+ (Seconds * 1000000) +
MilliSeconds.
%% Choose a random element from the List.
@@ -123,7 +124,7 @@ choose(List) ->
coalesce(undefined, B) -> B;
coalesce(A, _) -> A.
-coalesce([undefined|T]) ->
+coalesce([undefined|T]) ->
coalesce(T);
coalesce([H|_]) ->
H;
@@ -157,7 +158,7 @@ ets_keys_1(Table, Key) ->
%% Given a binary, return an Erlang term.
consult(Binary) ->
case erl_scan:string(riak_search_utils:to_list(Binary)) of
- {ok, Tokens, _} ->
+ {ok, Tokens, _} ->
consult_1(Tokens);
Error ->
Error
@@ -177,6 +178,15 @@ consult_2(AST) ->
Error
end.
+%% @doc Get preflist for the given IFT.
+-spec preflist(index(), field(), s_term()) -> list().
+preflist(Index, Field, Term) ->
+ DocIdx = riak_search_ring_utils:calc_partition(Index, Field, Term),
+ {ok, Schema} = riak_search_config:get_schema(Index),
+ NVal = Schema:n_val(),
+ [IdxNode || {IdxNode, _} <- riak_core_apl:get_primary_apl(DocIdx,
+ NVal,
+ riak_search)].
%% Run a transform operation in parallel. Results are returned as a
%% list, ordering is not guaranteed in any way. This was implemented
@@ -202,7 +212,7 @@ ptransform(F, List, NumProcesses) ->
%% the number of processes. Batch size should be at least 1.
ListLength = length(List),
BatchSize = lists:max([1, ListLength div NumProcesses]),
-
+
%% Create a ref, used to prevent later interference.
Ref = make_ref(),
Pids = ptransform_spawn(F, List, ListLength, Ref, BatchSize, []),
@@ -215,7 +225,7 @@ ptransform_spawn(F, List, ListLength, Ref, BatchSize, Pids) when List /= [] ->
true ->
{Pre, Post} = {List, []},
NewListLength = 0;
- false ->
+ false ->
{Pre, Post} = lists:split(BatchSize, List),
NewListLength = ListLength - BatchSize
end,
@@ -234,7 +244,7 @@ ptransform_spawn(_, [], 0, _, _, Pids) ->
ptransform_collect(Ref, Pids, Acc) when Pids /= [] ->
%% Collect a chunk, and concat results.
- receive
+ receive
{results, Results, Pid, Ref} ->
NewPids = Pids -- [Pid],
NewAcc = Results ++ Acc,