From 28528ace698e46996487d174a2017ecf1a4b9082 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Wed, 26 Mar 2014 17:11:26 -0700 Subject: [PATCH 1/3] Switch to using couch_mrview data structures and utility functions This updates the data structures expected for view callbacks, mainly switching total_and_offset to the new meta response, and also slight modifications to the expected row format. Additionally this removes view logic from fabric that is better handled in couch_mrview. --- src/fabric.erl | 42 +++++++++++++++++++++++++----------- src/fabric_group_info.erl | 5 ++--- src/fabric_rpc.erl | 12 ----------- src/fabric_view.erl | 19 +++++++++------- src/fabric_view_all_docs.erl | 9 +++++--- src/fabric_view_map.erl | 11 ++++++---- 6 files changed, 56 insertions(+), 42 deletions(-) diff --git a/src/fabric.erl b/src/fabric.erl index 1f05ed6..5be6f49 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -278,15 +278,31 @@ query_view(DbName, DesignName, ViewName, QueryArgs) -> -spec query_view(dbname(), #doc{} | binary(), iodata(), callback(), any(), #mrargs{}) -> any(). -query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) -> +query_view(DbName, GroupId, ViewName, Callback, Acc0, QueryArgs) + when is_binary(GroupId) -> + {ok, DDoc} = ddoc_cache:open(DbName, <<"_design/", GroupId/binary>>), + query_view(DbName, DDoc, ViewName, Callback, Acc0, QueryArgs); +query_view(DbName, DDoc, ViewName, Callback, Acc0, QueryArgs0) -> Db = dbname(DbName), View = name(ViewName), - case is_reduce_view(Db, Design, View, QueryArgs) of - true -> - Mod = fabric_view_reduce; - false -> - Mod = fabric_view_map - end, - Mod:go(Db, Design, View, QueryArgs, Callback, Acc0). + {ok, #mrst{views=Views, language=Lang}} = + couch_mrview_util:ddoc_to_mrst(Db, DDoc), + QueryArgs1 = couch_mrview_util:set_view_type(QueryArgs0, View, Views), + QueryArgs2 = couch_mrview_util:validate_args(QueryArgs1), + VInfo = couch_mrview_util:extract_view(Lang, QueryArgs2, View, Views), + case is_reduce_view(QueryArgs2) of + true -> + fabric_view_reduce:go( + Db, + DDoc, + View, + QueryArgs2, + Callback, + Acc0, + VInfo + ); + false -> + fabric_view_map:go(Db, DDoc, View, QueryArgs2, Callback, Acc0) + end. %% @doc retrieve info about a view group, disk size, language, whether compaction %% is running and so forth @@ -313,9 +329,9 @@ design_docs(DbName) -> end_key = <<"_design0">>, include_docs=true }, - Callback = fun({total_and_offset, _, _}, []) -> + Callback = fun({meta, _}, []) -> {ok, []}; - ({row, {Props}}, Acc) -> + ({row, Props}, Acc) -> case couch_util:get_value(id, Props) of <<"_design/", _/binary>> -> {ok, [couch_util:get_value(doc, Props) | Acc]}; @@ -437,8 +453,10 @@ default_callback(complete, Acc) -> default_callback(Row, Acc) -> {ok, [Row | Acc]}. -is_reduce_view(_, _, _, #mrargs{view_type=Reduce}) -> - Reduce =:= reduce. +is_reduce_view(#mrargs{view_type=ViewType}) -> + ViewType =:= red; +is_reduce_view({Reduce, _, _}) -> + Reduce =:= red. %% @doc convenience method for use in the shell, converts a keylist %% to a `changes_args' record diff --git a/src/fabric_group_info.erl b/src/fabric_group_info.erl index 135090f..5325f76 100644 --- a/src/fabric_group_info.erl +++ b/src/fabric_group_info.erl @@ -22,10 +22,9 @@ go(DbName, GroupId) when is_binary(GroupId) -> {ok, DDoc} = fabric:open_doc(DbName, GroupId, []), go(DbName, DDoc); -go(DbName, #doc{} = DDoc) -> - Group = couch_view_group:design_doc_to_view_group(DDoc), +go(DbName, #doc{id=DDocId}) -> Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, group_info, [Group]), + Workers = fabric_util:submit_jobs(Shards, group_info, [DDocId]), RexiMon = fabric_util:create_monitors(Shards), Acc0 = {fabric_dict:init(Workers, nil), []}, try diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index 3f4f8d2..cee3c55 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -24,18 +24,6 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). --record (view_acc, { - db, - limit, - include_docs, - conflicts, - doc_info = nil, - offset = nil, - total_rows, - reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, - group_level = 0 -}). - %% rpc endpoints %% call to with_db will supply your M:F with a #db{} and then remaining args diff --git a/src/fabric_view.erl b/src/fabric_view.erl index d0c6e45..31c25d9 100644 --- a/src/fabric_view.erl +++ b/src/fabric_view.erl @@ -224,7 +224,10 @@ get_next_row(State) -> Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), {Row, State#collector{rows = Rest, counters=Counters1}}. +%% TODO: rectify nil <-> undefined discrepancies find_next_key(nil, Dir, RowDict) -> + find_next_key(undefined, Dir, RowDict); +find_next_key(undefined, Dir, RowDict) -> case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of [] -> throw(complete); @@ -237,21 +240,21 @@ find_next_key([Key|Rest], _, _) -> {Key, Rest}. transform_row(#view_row{key=Key, id=reduced, value=Value}) -> - {row, {[{key,Key}, {value,Value}]}}; + {row, [{key,Key}, {value,Value}]}; transform_row(#view_row{key=Key, id=undefined}) -> - {row, {[{key,Key}, {error,not_found}]}}; + {row, [{key,Key}, {id,error}, {value,not_found}]}; transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}}; + {row, [{id,Id}, {key,Key}, {value,Value}]}; +transform_row(#view_row{key=Key, id=_Id, value=_Value, doc={error,Reason}}) -> + {row, [{id,error}, {key,Key}, {value,Reason}]}; transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}. + {row, [{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}. sort_fun(fwd) -> - fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end; + fun(A,A) -> true; (A,B) -> couch_ejson_compare:less_json(A,B) end; sort_fun(rev) -> - fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end. + fun(A,A) -> true; (A,B) -> couch_ejson_compare:less_json(B,A) end. extract_view(Pid, ViewName, [], _ViewType) -> couch_log:error("missing_named_view ~p", [ViewName]), diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl index 1415c82..701afa7 100644 --- a/src/fabric_view_all_docs.erl +++ b/src/fabric_view_all_docs.erl @@ -72,7 +72,7 @@ go(DbName, QueryArgs, Callback, Acc0) -> false -> Keys2 end, receive {'DOWN', Ref0, _, _, {ok, TotalRows}} -> - {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0), + {ok, Acc1} = Callback({meta, [{total, TotalRows}]}, Acc0), {ok, Acc2} = doc_receive_loop( Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1 ), @@ -95,7 +95,9 @@ handle_message({rexi_EXIT, Reason}, Worker, State) -> {error, Resp} end; -handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> +handle_message({meta, Meta0}, {Worker, From}, State) -> + Tot = couch_util:get_value(total, Meta0, 0), + Off = couch_util:get_value(offset, Meta0, 0), #collector{ callback = Callback, counters = Counters0, @@ -123,7 +125,8 @@ handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> }}; false -> FinalOffset = erlang:min(Total, Offset+State#collector.skip), - {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), + Meta = [{total, Total}, {offset, FinalOffset}], + {Go, Acc} = Callback({meta, Meta}, AccIn), {Go, State#collector{ counters = fabric_dict:decrement_all(Counters2), total_rows = Total, diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl index 9e41c11..eb30179 100644 --- a/src/fabric_view_map.erl +++ b/src/fabric_view_map.erl @@ -66,7 +66,9 @@ handle_message({rexi_EXIT, Reason}, Worker, State) -> {error, Resp} end; -handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> +handle_message({meta, Meta0}, {Worker, From}, State) -> + Tot = couch_util:get_value(total, Meta0, 0), + Off = couch_util:get_value(offset, Meta0, 0), #collector{ callback = Callback, counters = Counters0, @@ -94,7 +96,8 @@ handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> }}; false -> FinalOffset = erlang:min(Total, Offset+State#collector.skip), - {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), + Meta = [{total, Total}, {offset, FinalOffset}], + {Go, Acc} = Callback({meta, Meta}, AccIn), {Go, State#collector{ counters = fabric_dict:decrement_all(Counters2), total_rows = Total, @@ -133,11 +136,11 @@ handle_message(complete, Worker, State) -> merge_row(fwd, undefined, Row, Rows) -> lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) -> - couch_view:less_json([KeyA, IdA], [KeyB, IdB]) + couch_ejson_compare:less_json_ids({KeyA, IdA}, {KeyB, IdB}) end, [Row], Rows); merge_row(rev, undefined, Row, Rows) -> lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) -> - couch_view:less_json([KeyB, IdB], [KeyA, IdA]) + couch_ejson_compare:less_json_ids({KeyB, IdB}, {KeyA, IdA}) end, [Row], Rows); merge_row(_, KeyDict, Row, Rows) -> lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) -> From efddaf1d14b19187f31a2c5c19a40061b3635e9a Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Wed, 26 Mar 2014 17:25:40 -0700 Subject: [PATCH 2/3] Update fabric_rpc to use couch_mrview This modifies fabric_rpc to use couch_mrview for map views, reduce views, and all docs queries. This removes the majority of view logic from within fabric as it's better handled now in couch_mrview, and actually provides pretty drastic decrease in logic in these functions. The {view,reduce}_cb functions are also updated to use the new line format of couch_mrview, switching total_and_offset to meta and updating the row callbacks as well. --- src/fabric_rpc.erl | 282 ++++++++----------------------------- src/fabric_view_reduce.erl | 24 +--- 2 files changed, 67 insertions(+), 239 deletions(-) diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index cee3c55..d2e6486 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -27,38 +27,6 @@ %% rpc endpoints %% call to with_db will supply your M:F with a #db{} and then remaining args -all_docs(DbName, #mrargs{keys=undefined} = QueryArgs) -> - {ok, Db} = get_or_create_db(DbName, []), - #mrargs{ - start_key = StartKey, - start_key_docid = StartDocId, - end_key = EndKey, - end_key_docid = EndDocId, - limit = Limit, - skip = Skip, - include_docs = IncludeDocs, - direction = Dir, - inclusive_end = Inclusive, - extra = Extra - } = QueryArgs, - set_io_priority(DbName, Extra), - {ok, Total} = couch_db:get_doc_count(Db), - Acc0 = #view_acc{ - db = Db, - include_docs = IncludeDocs, - conflicts = proplists:get_value(conflicts, Extra, false), - limit = Limit+Skip, - total_rows = Total - }, - EndKeyType = if Inclusive -> end_key; true -> end_key_gt end, - Options = [ - {dir, Dir}, - {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end}, - {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end} - ], - {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), - final_response(Total, Acc#view_acc.offset). - changes(DbName, #changes_args{} = Args, StartSeq) -> changes(DbName, [Args], StartSeq); changes(DbName, Options, StartSeq) -> @@ -80,104 +48,20 @@ changes(DbName, Options, StartSeq) -> rexi:reply(Error) end. -map_view(DbName, DDoc, ViewName, QueryArgs) -> +all_docs(DbName, #mrargs{keys=undefined} = Args) -> {ok, Db} = get_or_create_db(DbName, []), - #mrargs{ - limit = Limit, - skip = Skip, - keys = Keys, - include_docs = IncludeDocs, - stale = Stale, - view_type = ViewType, - extra = Extra - } = QueryArgs, - set_io_priority(DbName, Extra), - {LastSeq, MinSeq} = calculate_seqs(Db, Stale), - Group0 = couch_view_group:design_doc_to_view_group(DDoc), - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, Group} = couch_view_group:request_group(Pid, MinSeq), - maybe_update_view_group(Pid, LastSeq, Stale), - erlang:monitor(process, couch_view_group:get_fd(Group)), - Views = couch_view_group:get_views(Group), - View = fabric_view:extract_view(Pid, ViewName, Views, ViewType), - {ok, Total} = couch_view:get_row_count(View), - Acc0 = #view_acc{ - db = Db, - include_docs = IncludeDocs, - conflicts = proplists:get_value(conflicts, Extra, false), - limit = Limit+Skip, - total_rows = Total, - reduce_fun = fun couch_view:reduce_to_count/1 - }, - case Keys of - undefined -> - Options = couch_httpd_view:make_key_options(QueryArgs), - {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options); - _ -> - Acc = lists:foldl(fun(Key, AccIn) -> - KeyArgs = QueryArgs#mrargs{start_key=Key, end_key=Key}, - Options = couch_httpd_view:make_key_options(KeyArgs), - {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn, - Options), - Out - end, Acc0, Keys) - end, - final_response(Total, Acc#view_acc.offset). + VAcc0 = #vacc{db=Db}, + couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0). -reduce_view(DbName, #doc{} = DDoc, ViewName, QueryArgs) -> - Group = couch_view_group:design_doc_to_view_group(DDoc), - reduce_view(DbName, Group, ViewName, QueryArgs); -reduce_view(DbName, Group0, ViewName, QueryArgs) -> - erlang:put(io_priority, {interactive, DbName}), +map_view(DbName, DDoc, ViewName, Args) -> {ok, Db} = get_or_create_db(DbName, []), - #mrargs{ - group_level = GroupLevel, - limit = Limit, - skip = Skip, - keys = Keys, - stale = Stale, - extra = Extra - } = QueryArgs, - set_io_priority(DbName, Extra), - GroupFun = group_rows_fun(GroupLevel), - {LastSeq, MinSeq} = calculate_seqs(Db, Stale), - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, Group} = couch_view_group:request_group(Pid, MinSeq), - maybe_update_view_group(Pid, LastSeq, Stale), - Lang = couch_view_group:get_language(Group), - Views = couch_view_group:get_views(Group), - erlang:monitor(process, couch_view_group:get_fd(Group)), - {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), - ReduceView = {reduce, NthRed, Lang, View}, - Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, - case Keys of - undefined -> - Options0 = couch_httpd_view:make_key_options(QueryArgs), - Options = [{key_group_fun, GroupFun} | Options0], - couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options); - _ -> - lists:map(fun(Key) -> - KeyArgs = QueryArgs#mrargs{start_key=Key, end_key=Key}, - Options0 = couch_httpd_view:make_key_options(KeyArgs), - Options = [{key_group_fun, GroupFun} | Options0], - couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options) - end, Keys) - end, - rexi:reply(complete). + VAcc0 = #vacc{db=Db}, + couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0). -calculate_seqs(Db, Stale) -> - LastSeq = couch_db:get_update_seq(Db), - if - Stale == ok orelse Stale == update_after -> - {LastSeq, 0}; - true -> - {LastSeq, LastSeq} - end. - -maybe_update_view_group(GroupPid, LastSeq, update_after) -> - couch_view_group:trigger_group_update(GroupPid, LastSeq); -maybe_update_view_group(_, _, _) -> - ok. +reduce_view(DbName, DDoc, ViewName, Args) -> + {ok, Db} = get_or_create_db(DbName, []), + VAcc0 = #vacc{db=Db}, + couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0). create_db(DbName) -> rexi:reply(case couch_server:create(DbName, []) of @@ -252,9 +136,8 @@ update_docs(DbName, Docs0, Options) -> Docs = make_att_readers(Docs0), with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). -group_info(DbName, Group0) -> - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - rexi:reply(couch_view_group:request_group_info(Pid)). +group_info(DbName, DDocId) -> + with_db(DbName, [], {couch_mrview, get_info, [DDocId]}). reset_validation_funs(DbName) -> case get_or_create_db(DbName, []) of @@ -294,109 +177,64 @@ get_or_create_db(DbName, Options) -> Else end. -view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> - % matches for _all_docs and translates #full_doc_info{} -> KV pair - case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI -> - Value = {[{rev,couch_doc:rev_to_str(Rev)}]}, - view_fold({{Id,Id}, Value}, OffsetReds, Acc#view_acc{doc_info=DI}); - #doc_info{revs=[#rev_info{deleted=true}|_]} -> - {ok, Acc} - end; -view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) -> - % calculates the offset for this shard - #view_acc{reduce_fun=Reduce} = Acc, - Offset = Reduce(OffsetReds), - case rexi:sync_reply({total_and_offset, Total, Offset}) of - ok -> - view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); - stop -> - exit(normal); - timeout -> - exit(timeout) + +view_cb({meta, Meta}, Acc) -> + % Map function starting + case rexi:sync_reply({meta, Meta}) of + ok -> + {ok, Acc}; + stop -> + exit(normal); + timeout -> + exit(timeout) end; -view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) -> - % we scanned through limit+skip local rows - {stop, Acc}; -view_fold({{Key,Id}, Value}, _Offset, Acc) -> - % the normal case - #view_acc{ - db = Db, - doc_info = DocInfo, - limit = Limit, - conflicts = Conflicts, - include_docs = IncludeDocs - } = Acc, - case Value of {Props} -> - LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined); - _ -> - LinkedDocs = false - end, - if LinkedDocs -> - % we'll embed this at a higher level b/c the doc may be non-local - Doc = undefined; - IncludeDocs -> - IdOrInfo = if DocInfo =/= nil -> DocInfo; true -> Id end, - Options = if Conflicts -> [conflicts]; true -> [] end, - case couch_db:open_doc(Db, IdOrInfo, Options) of - {not_found, deleted} -> - Doc = null; - {not_found, missing} -> - Doc = undefined; - {ok, Doc0} -> - Doc = couch_doc:to_json_obj(Doc0, []) - end; - true -> - Doc = undefined - end, - case rexi:stream(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of +view_cb({row, Row}, Acc) -> + % Adding another row + ViewRow = #view_row{ + id = couch_util:get_value(id, Row), + key = couch_util:get_value(key, Row), + value = couch_util:get_value(value, Row), + doc = couch_util:get_value(doc, Row) + }, + case rexi:stream(ViewRow) of ok -> - {ok, Acc#view_acc{limit=Limit-1}}; + {ok, Acc}; timeout -> exit(timeout) - end. - -final_response(Total, nil) -> - case rexi:sync_reply({total_and_offset, Total, Total}) of ok -> - rexi:reply(complete); - stop -> - ok; - timeout -> - exit(timeout) end; -final_response(_Total, _Offset) -> - rexi:reply(complete). - -%% TODO: handle case of bogus group level -group_rows_fun(exact) -> - fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end; -group_rows_fun(0) -> - fun(_A, _B) -> true end; -group_rows_fun(GroupLevel) when is_integer(GroupLevel) -> - fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) -> - lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel); - ({Key1,_}, {Key2,_}) -> - Key1 == Key2 - end. +view_cb(complete, Acc) -> + % Finish view output + rexi:reply(complete), + {ok, Acc}. -reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) -> - {stop, Acc}; -reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) -> - send(null, Red, Acc); -reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) -> - send(Key, Red, Acc); -reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) -> - send(lists:sublist(K, I), Red, Acc); -reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 -> - send(K, Red, Acc). - -send(Key, Value, #view_acc{limit=Limit} = Acc) -> +reduce_cb({meta, Meta}, Acc) -> + % Map function starting + case rexi:sync_reply({meta, Meta}) of + ok -> + {ok, Acc}; + stop -> + exit(normal); + timeout -> + exit(timeout) + end; +reduce_cb({row, Row}, Acc) -> + % Adding another row + Key = couch_util:get_value(key, Row), + Value = couch_util:get_value(value, Row), + send(Key, Value, Acc); +reduce_cb(complete, Acc) -> + % Finish view output + rexi:reply(complete), + {ok, Acc}. + + +send(Key, Value, Acc) -> case put(fabric_sent_first_row, true) of undefined -> case rexi:sync_reply(#view_row{key=Key, value=Value}) of ok -> - {ok, Acc#view_acc{limit=Limit-1}}; + {ok, Acc}; stop -> exit(normal); timeout -> @@ -405,7 +243,7 @@ send(Key, Value, #view_acc{limit=Limit} = Acc) -> true -> case rexi:stream(#view_row{key=Key, value=Value}) of ok -> - {ok, Acc#view_acc{limit=Limit-1}}; + {ok, Acc}; timeout -> exit(timeout) end diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl index c922a7f..d2ea464 100644 --- a/src/fabric_view_reduce.erl +++ b/src/fabric_view_reduce.erl @@ -12,29 +12,19 @@ -module(fabric_view_reduce). --export([go/6]). +-export([go/7]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). -go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> - {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), - go(DbName, DDoc, View, Args, Callback, Acc0); - -go(DbName, DDoc, VName, Args, Callback, Acc0) -> - Group = couch_view_group:design_doc_to_view_group(DDoc), - Lang = couch_view_group:get_language(Group), - Views = couch_view_group:get_views(Group), - {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce), - {VName, RedSrc} = lists:nth(NthRed, View#mrview.reduce_funs), - Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) -> - Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}), - Shard#shard{ref = Ref} - end, fabric_view:get_shards(DbName, Args)), +go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) -> + Shards = fabric_view:get_shards(DbName, Args), + Workers = fabric_util:submit_jobs(Shards, reduce_view, [DDoc, VName, Args]), + RedSrc = couch_mrview_util:extract_view_reduce(VInfo), RexiMon = fabric_util:create_monitors(Workers), - #mrargs{limit = Limit, skip = Skip} = Args, + #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args, OsProc = case os_proc_needed(RedSrc) of true -> couch_query_servers:get_os_process(Lang); _ -> nil @@ -44,7 +34,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) -> query_args = Args, callback = Callback, counters = fabric_dict:init(Workers, 0), - keys = Args#mrargs.keys, + keys = Keys, skip = Skip, limit = Limit, lang = Lang, From aeac2dfe5014dc9fe1f10300669ea9244cae3a67 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Thu, 27 Mar 2014 15:48:16 -0700 Subject: [PATCH 3/3] Call fabric view reduce callback with meta information This is a temporary hack to allow passing meta through fabric_view_reduce:handle_message. The meta rows were not present on reduce functions in the previous views implementation, but they provide an excellent place to determine which a view is about to start streaming, which is essential for the multi query view implementation. The hack is that we don't do the proper row collection worker dance like we do in the handle_message callback for #view_row. For now this sets a process dict flag to indicate meta has been sent so that we only send it once. The proper fix is to send meta through fabric_view:maybe_send_row, but that gets ugly in a hurry and can be addressed separately. COUCHDB-523 COUCHDB-1993 --- src/fabric_view.erl | 2 ++ src/fabric_view_reduce.erl | 24 ++++++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/src/fabric_view.erl b/src/fabric_view.erl index 31c25d9..09e13f2 100644 --- a/src/fabric_view.erl +++ b/src/fabric_view.erl @@ -98,6 +98,7 @@ maybe_send_row(#collector{limit=0} = State) -> % we still need to send the total/offset header {ok, State}; false -> + erase(meta_sent), {_, Acc} = Callback(complete, AccIn), {stop, State#collector{user_acc=Acc}} end; @@ -124,6 +125,7 @@ maybe_send_row(State) -> maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) end catch complete -> + erase(meta_sent), {_, Acc} = Callback(complete, AccIn), {stop, State#collector{user_acc=Acc}} end diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl index d2ea464..6b40a6a 100644 --- a/src/fabric_view_reduce.erl +++ b/src/fabric_view_reduce.erl @@ -74,6 +74,30 @@ handle_message({rexi_EXIT, Reason}, Worker, State) -> {error, Resp} end; +%% HACK: this just sends meta once. Instead we should move the counter logic +%% from the #view_row handle_message below into this function and and pass the +%% meta call through maybe_send_row. This will also be more efficient doing it +%% here as it's one less worker round trip reply. +%% Prior to switching to couch_mrview, the fabric_view_reduce implementation +%% did not get a total_and_offset call, whereas now we do. We now use this +%% message as a clean way to indicate to couch_mrview_http:view_cb that the +%% reduce response is starting. +handle_message({meta, Meta}, {_Worker, From}, State) -> + gen_server:reply(From, ok), + #collector{ + callback = Callback, + user_acc = AccIn + } = State, + + {Go, Acc} = case get(meta_sent) of + undefined -> + put(meta_sent, true), + Callback({meta, Meta}, AccIn); + _ -> + {ok, AccIn} + end, + {Go, State#collector{user_acc = Acc}}; + handle_message(#view_row{key=Key} = Row, {Worker, From}, State) -> #collector{counters = Counters0, rows = Rows0} = State, case fabric_dict:lookup_element(Worker, Counters0) of