-
Notifications
You must be signed in to change notification settings - Fork 51
/
riak_search_op_range_worker.erl
66 lines (57 loc) · 2.31 KB
/
riak_search_op_range_worker.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% -------------------------------------------------------------------
-module(riak_search_op_range_worker).
-export([
chain_op/4,
chain_op/5
]).
-include("riak_search.hrl").
-include_lib("lucene_parser/include/lucene_parser.hrl").
chain_op(Op, OutputPid, OutputRef, State) ->
F = fun() ->
erlang:link(State#search_state.parent),
start_loop(Op, OutputPid, OutputRef, none, State)
end,
erlang:spawn_link(F),
{ok, 1}.
chain_op(Op, OutputPid, OutputRef, CandidateSet, State) ->
F = fun() ->
erlang:link(State#search_state.parent),
start_loop(Op, OutputPid, OutputRef, CandidateSet, State)
end,
erlang:spawn_link(F),
{ok, 1}.
start_loop(Op, OutputPid, OutputRef, CandidateSet, State) ->
%% Start streaming the results...
IndexName = State#search_state.index,
FieldName = State#search_state.field,
%% Create the start term and end term...
case Op#range_worker.from of
{inclusive, OldStartTerm} ->
StartTerm = OldStartTerm;
{exclusive, OldStartTerm} ->
StartTerm = riak_search_utils:binary_inc(OldStartTerm, +1)
end,
case Op#range_worker.to of
{inclusive, OldEndTerm} ->
EndTerm = OldEndTerm;
{exclusive, OldEndTerm} ->
EndTerm = riak_search_utils:binary_inc(OldEndTerm, -1)
end,
Size = Op#range_worker.size,
VNode = Op#range_worker.vnode,
Filter = riak_search_op_utils:wrap_filter(CandidateSet,
State#search_state.filter),
TransformFun = fun({DocID, Props}) ->
{IndexName, DocID, Props}
end,
{ok, Ref} = range(VNode, IndexName, FieldName, StartTerm, EndTerm, Size, Filter),
riak_search_op_utils:gather_stream_results(Ref, OutputPid, OutputRef, TransformFun).
range(VNode, Index, Field, StartTerm, EndTerm, Size, Filter) ->
riak_search_vnode:range(VNode, Index, Field,
riak_search_utils:to_binary(StartTerm),
riak_search_utils:to_binary(EndTerm),
Size, Filter, self()).