Skip to content
Browse files

First pass at sorted 2i results (PROOF OF CONCEPT)

  • Loading branch information...
1 parent 0ce405d commit d375a23982fbe3d9f3cc868642564d67a62150c6 @reiddraper reiddraper committed
Showing with 77 additions and 18 deletions.
  1. +1 −1 rebar.config
  2. +25 −17 src/riak_kv_index_fsm.erl
  3. +51 −0 src/sms.erl
View
2 rebar.config
@@ -9,7 +9,7 @@
]}.
{deps, [
- {riak_core, ".*", {git, "git://github.com/basho/riak_core", "master"}},
+ {riak_core, ".*", {git, "git://github.com/basho/riak_core", "wrd-sorted-2i"}},
{riakc, ".*", {git, "git://github.com/basho/riak-erlang-client",
"master"}},
{luke, ".*", {git, "git://github.com/basho/luke", "master"}},
View
42 src/riak_kv_index_fsm.erl
@@ -40,6 +40,7 @@
-include_lib("riak_kv_vnode.hrl").
-export([init/2,
+ plan/2,
process_results/2,
finish/2]).
@@ -47,6 +48,7 @@
-type req_id() :: non_neg_integer().
-record(state, {client_type :: plain | mapred,
+ merge_sort_buffer :: term(),
from :: from()}).
%% @doc Return a tuple containing the ModFun to call per vnode,
@@ -71,13 +73,29 @@ init(From={_, _, ClientPid}, [Bucket, ItemFilter, Query, Timeout, ClientType]) -
{Req, all, NVal, 1, riak_kv, riak_kv_vnode_master, Timeout,
#state{client_type=ClientType, from=From}}.
+plan(CoverageVnodes, State) ->
+ State2 = State#state{merge_sort_buffer=sms:new(CoverageVnodes)},
+ {ok, State2}.
+
process_results({error, Reason}, _State) ->
{error, Reason};
-process_results({Bucket, Results},
- StateData=#state{client_type=ClientType,
+process_results({Vnode, {_Bucket, Results}},
+ StateData=#state{client_type=_ClientType,
+ merge_sort_buffer=MergeSortBuffer,
from={raw, ReqId, ClientPid}}) ->
- process_query_results(ClientType, Bucket, Results, ReqId, ClientPid),
- {ok, StateData};
+ %% TODO: this isn't compatible with mapreduce
+
+ %% add new results to buffer
+ BufferWithNewResults = sms:add_results(Vnode, Results, MergeSortBuffer),
+ ProcessBuffer = sms:sms(BufferWithNewResults),
+ NewBuffer = case ProcessBuffer of
+ {[], BufferWithNewResults} ->
+ BufferWithNewResults;
+ {ToSend, NewBuff} ->
+ ClientPid ! {ReqId, {results, ToSend}},
+ NewBuff
+ end,
+ {ok, StateData#state{merge_sort_buffer=NewBuffer}};
process_results(done, StateData) ->
{done, StateData}.
@@ -98,24 +116,14 @@ finish({error, Error},
{stop, normal, StateData};
finish(clean,
StateData=#state{from={raw, ReqId, ClientPid},
+ merge_sort_buffer=MergeSortBuffer,
client_type=ClientType}) ->
case ClientType of
mapred ->
luke_flow:finish_inputs(ClientPid);
plain ->
+ LastResults = sms:done(MergeSortBuffer),
+ ClientPid ! {ReqId, {results, LastResults}},
ClientPid ! {ReqId, done}
end,
{stop, normal, StateData}.
-
-%% ===================================================================
-%% Internal functions
-%% ===================================================================
-
-process_query_results(plain, _Bucket, Results, ReqId, ClientPid) ->
- ClientPid ! {ReqId, {results, Results}};
-process_query_results(mapred, Bucket, Results, _ReqId, ClientPid) ->
- try
- luke_flow:add_inputs(ClientPid, [{Bucket, Result} || Result <- Results])
- catch _:_ ->
- exit(self(), normal)
- end.
View
51 src/sms.erl
@@ -0,0 +1,51 @@
+-module(sms).
+
+-define(DICTMODULE, orddict).
+
+-export([new/1,
+ add_results/3,
+ done/1,
+ sms/1]).
+
+new(Vnodes) ->
+ DictList = [{VnodeID, []} || VnodeID <- Vnodes],
+ ?DICTMODULE:from_list(DictList).
+
+add_results(VnodeID, Results, Data) ->
+ UpdateFun = fun (Prev) -> Prev ++ Results end,
+ update(VnodeID, UpdateFun, Data).
+
+update(VnodeID, UpdateFun, Data) ->
+ ?DICTMODULE:update(VnodeID, UpdateFun, Data).
+
+done(Data) ->
+ lists:merge(values(Data)).
+
+sms(Data) ->
+ case any_empty(values(Data)) of
+ true ->
+ {[], Data};
+ false ->
+ unsafe_sms(Data)
+ end.
+
+unsafe_sms(Data) ->
+ MinOfLastsOfLists = lists:min([lists:last(List) || List <- values(Data)]),
+ SplitFun = fun (Elem) -> Elem =< MinOfLastsOfLists end,
+ Split = ?DICTMODULE:map(fun (_Key, V) -> lists:splitwith(SplitFun, V) end, Data),
+ %%SplitLists = [lists:splitwith(SplitFun, List) || List <- ListOfLists],
+ LessThan = ?DICTMODULE:map(fun (_Key, V) -> element(1, V) end, Split),
+ %%LessThan = [element(1, Tuple) || Tuple <- SplitLists],
+ GreaterThan = ?DICTMODULE:map(fun (_Key, V) -> element(2, V) end, Split),
+ %%GreaterThan = [element(2, Tuple) || Tuple <- SplitLists],
+ Merged = lists:merge(values(LessThan)),
+ {Merged, GreaterThan}.
+
+values(Data) ->
+ [V || {_Key, V} <- ?DICTMODULE:to_list(Data)].
+
+empty([]) -> true;
+empty(_) -> false.
+
+any_empty(Lists) ->
+ lists:any(fun empty/1, Lists).

0 comments on commit d375a23

Please sign in to comment.
Something went wrong with that request. Please try again.