Skip to content

Commit

Permalink
Fix _local_docs end-point
Browse files Browse the repository at this point in the history
This is a second attempt to fix _local_docs end-point. The previous one didn't
work on big enough btree_local, because local btree doesn't have reduction fun,
so reuse of couch_db_updater:btree_by_id_reduce/2 was crashing on a bad match
as soon as btree_local was getting kp_node. Also using full fold to calculate
total_rows value turned out to be resource expensive when a database have
significant number of local documents.

This fix avoids calculating of total_rows and offset instead always setting
them to null and also setting to null update_seq when requested, since it
doesn't have meaning in context of local documents.

A fabric module fabric_view_all_docs.erl was copied and modified as
fabric_view_local_docs.erl, because re-using it for processing of both types of
the documents was getting rather convoluted.

Jira: COUCHDB-3337
  • Loading branch information
eiri authored and nickva committed Jul 17, 2017
1 parent 7bee63f commit a29df57
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 47 deletions.
34 changes: 26 additions & 8 deletions src/couch_mrview/src/couch_mrview.erl
Original file line number Diff line number Diff line change
Expand Up @@ -403,33 +403,35 @@ cleanup(Db) ->


all_docs_fold(Db, #mrargs{keys=undefined}=Args, Callback, UAcc) ->
ReduceFun = get_reduce_fun(Args),
Total = get_total_rows(Db, Args),
UpdateSeq = couch_db:get_update_seq(Db),
UpdateSeq = get_update_seq(Db, Args),
Acc = #mracc{
db=Db,
total_rows=Total,
limit=Args#mrargs.limit,
skip=Args#mrargs.skip,
callback=Callback,
user_acc=UAcc,
reduce_fun=fun couch_mrview_util:all_docs_reduce_to_count/1,
reduce_fun=ReduceFun,
update_seq=UpdateSeq,
args=Args
},
[Opts] = couch_mrview_util:all_docs_key_opts(Args),
{ok, Offset, FinalAcc} = couch_db:enum_docs(Db, fun map_fold/3, Acc, Opts),
finish_fold(FinalAcc, [{total, Total}, {offset, Offset}]);
all_docs_fold(Db, #mrargs{direction=Dir, keys=Keys0}=Args, Callback, UAcc) ->
ReduceFun = get_reduce_fun(Args),
Total = get_total_rows(Db, Args),
UpdateSeq = couch_db:get_update_seq(Db),
UpdateSeq = get_update_seq(Db, Args),
Acc = #mracc{
db=Db,
total_rows=Total,
limit=Args#mrargs.limit,
skip=Args#mrargs.skip,
callback=Callback,
user_acc=UAcc,
reduce_fun=fun couch_mrview_util:all_docs_reduce_to_count/1,
reduce_fun=ReduceFun,
update_seq=UpdateSeq,
args=Args
},
Expand Down Expand Up @@ -653,18 +655,34 @@ make_meta(Args, UpdateSeq, Base) ->
end.


get_total_rows(#db{local_tree = LocalTree} = Db, #mrargs{extra = Extra}) ->
get_reduce_fun(#mrargs{extra = Extra}) ->
case couch_util:get_value(namespace, Extra) of
<<"_local">> ->
FoldFun = fun(_, _, Acc) -> {ok, Acc + 1} end,
{ok, _, Total} = couch_btree:foldl(LocalTree, FoldFun, 0),
Total;
fun(_) -> null end;
_ ->
fun couch_mrview_util:all_docs_reduce_to_count/1
end.


get_total_rows(#db{} = Db, #mrargs{extra = Extra}) ->
case couch_util:get_value(namespace, Extra) of
<<"_local">> ->
null;
_ ->
{ok, Info} = couch_db:get_db_info(Db),
couch_util:get_value(doc_count, Info)
end.


get_update_seq(#db{} = Db, #mrargs{extra = Extra}) ->
case couch_util:get_value(namespace, Extra) of
<<"_local">> ->
null;
_ ->
couch_db:get_update_seq(Db)
end.


default_cb(complete, Acc) ->
{ok, lists:reverse(Acc)};
default_cb({final, Info}, []) ->
Expand Down
2 changes: 2 additions & 0 deletions src/couch_mrview/src/couch_mrview_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ view_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
Offset -> [io_lib:format("\"offset\":~p", [Offset])]
end ++ case couch_util:get_value(update_seq, Meta) of
undefined -> [];
null ->
["\"update_seq\":null"];
UpdateSeq when is_integer(UpdateSeq) ->
[io_lib:format("\"update_seq\":~B", [UpdateSeq])];
UpdateSeq when is_binary(UpdateSeq) ->
Expand Down
13 changes: 1 addition & 12 deletions src/couch_mrview/src/couch_mrview_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,7 @@ get_row_count(#mrview{btree=Bt}) ->
{ok, Count}.


all_docs_reduce_to_count(Reductions0) ->
Reductions = maybe_convert_reductions(Reductions0),
all_docs_reduce_to_count(Reductions) ->
Reduce = fun couch_db_updater:btree_by_id_reduce/2,
{Count, _, _} = couch_btree:final_reduce(Reduce, Reductions),
Count.
Expand Down Expand Up @@ -866,16 +865,6 @@ maybe_load_doc(Db, Id, Val, #mrargs{doc_options=Opts}) ->
doc_row(couch_index_util:load_doc(Db, docid_rev(Id, Val), []), Opts).


maybe_convert_reductions({KVs0, UserReductions}) ->
KVs = lists:map(fun maybe_convert_kv/1, KVs0),
{KVs, UserReductions}.

maybe_convert_kv({<<"_local/", _/binary>> = DocId, _}) ->
#full_doc_info{id = DocId};
maybe_convert_kv(DocInfo) ->
DocInfo.


doc_row(null, _Opts) ->
[{doc, null}];
doc_row(Doc, Opts) ->
Expand Down
24 changes: 18 additions & 6 deletions src/couch_mrview/test/couch_mrview_local_docs_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ all_docs_test_() ->
fun should_query_with_range/1,
fun should_query_with_range_rev/1,
fun should_query_with_limit_and_skip/1,
fun should_query_with_include_docs/1
fun should_query_with_include_docs/1,
fun should_query_with_update_seq/1
]
}
}
Expand All @@ -53,7 +54,7 @@ all_docs_test_() ->
should_query(Db) ->
Result = run_query(Db, []),
Expect = {ok, [
{meta, [{total, 10}, {offset, 0}]},
{meta, [{total, null}, {offset, null}]},
mk_row(1),
mk_row(10),
mk_row(2),
Expand All @@ -73,7 +74,7 @@ should_query_with_range(Db) ->
{end_key, <<"_local/5">>}
]),
Expect = {ok, [
{meta, [{total, 10}, {offset, 3}]},
{meta, [{total, null}, {offset, null}]},
mk_row(3),
mk_row(4),
mk_row(5)
Expand All @@ -87,7 +88,7 @@ should_query_with_range_rev(Db) ->
{inclusive_end, true}
]),
Expect = {ok, [
{meta, [{total, 10}, {offset, 4}]},
{meta, [{total, null}, {offset, null}]},
mk_row(5),
mk_row(4),
mk_row(3)
Expand All @@ -101,7 +102,7 @@ should_query_with_limit_and_skip(Db) ->
{skip, 3}
]),
Expect = {ok, [
{meta, [{total, 10}, {offset, 5}]},
{meta, [{total, null}, {offset, null}]},
mk_row(5),
mk_row(6),
mk_row(7)
Expand All @@ -117,11 +118,22 @@ should_query_with_include_docs(Db) ->
{row, Doc0} = mk_row(8),
Doc = Doc0 ++ [{doc, {[{<<"val">>, 8}]}}],
Expect = {ok, [
{meta, [{total, 10}, {offset, 8}]},
{meta, [{total, null}, {offset, null}]},
{row, Doc}
]},
?_assertEqual(Expect, Result).

should_query_with_update_seq(Db) ->
Result = run_query(Db, [
{start_key, <<"_local/2">>},
{limit, 1},
{update_seq, true}
]),
Expect = {ok, [
{meta, [{total, null}, {offset, null}, {update_seq, null}]},
mk_row(2)
]},
?_assertEqual(Expect, Result).

mk_row(IntId) ->
Id = list_to_binary(io_lib:format("_local/~b", [IntId])),
Expand Down
59 changes: 38 additions & 21 deletions src/fabric/src/fabric_view_all_docs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ go(DbName, Options, QueryArgs, Callback, Acc0) ->
limit = Limit,
conflicts = Conflicts,
skip = Skip,
keys = Keys0
keys = Keys0,
extra = Extra
} = QueryArgs,
{_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end),
DocOptions1 = case Conflicts of
true -> [conflicts|DocOptions0];
_ -> DocOptions0
Expand All @@ -81,20 +81,31 @@ go(DbName, Options, QueryArgs, Callback, Acc0) ->
true -> lists:sublist(Keys2, Limit);
false -> Keys2
end,
Timeout = fabric_util:all_docs_timeout(),
receive {'DOWN', Ref0, _, _, Result} ->
case Result of
{ok, TotalRows} ->
{ok, Acc1} = Callback({meta, [{total, TotalRows}]}, Acc0),
{ok, Acc2} = doc_receive_loop(
Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
),
Callback(complete, Acc2);
Error ->
Callback({error, Error}, Acc0)
end
after Timeout ->
Callback(timeout, Acc0)
Resp = case couch_util:get_value(namespace, Extra, <<"_all_docs">>) of
<<"_local">> ->
{ok, null};
_ ->
Timeout = fabric_util:all_docs_timeout(),
{_, Ref0} = spawn_monitor(fun() ->
exit(fabric:get_doc_count(DbName))
end),
receive {'DOWN', Ref0, _, _, Result} ->
Result
after Timeout ->
timeout
end
end,
case Resp of
{ok, TotalRows} ->
{ok, Acc1} = Callback({meta, [{total, TotalRows}]}, Acc0),
{ok, Acc2} = doc_receive_loop(
Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
),
Callback(complete, Acc2);
timeout ->
Callback(timeout, Acc0);
Error ->
Callback({error, Error}, Acc0)
end.

go(DbName, _Options, Workers, QueryArgs, Callback, Acc0) ->
Expand Down Expand Up @@ -142,10 +153,11 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
0 = fabric_dict:lookup_element(Worker, Counters0),
rexi:stream_ack(From),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
Total = Total0 + Tot,
Offset = Offset0 + Off,
UpdateSeq = case UpdateSeq0 of
nil -> nil;
Total = if Tot == null -> null; true -> Total0 + Tot end,
Offset = if Off == null -> null; true -> Offset0 + Off end,
UpdateSeq = case {UpdateSeq0, Seq} of
{nil, _} -> nil;
{_, null} -> null;
_ -> [{Worker, Seq} | UpdateSeq0]
end,
case fabric_dict:any(0, Counters1) of
Expand All @@ -157,11 +169,16 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
offset = Offset
}};
false ->
FinalOffset = erlang:min(Total, Offset+State#collector.skip),
FinalOffset = case Offset of
null -> null;
_ -> erlang:min(Total, Offset+State#collector.skip)
end,
Meta = [{total, Total}, {offset, FinalOffset}] ++
case UpdateSeq of
nil ->
[];
null ->
[{update_seq, null}];
_ ->
[{update_seq, fabric_view_changes:pack_seqs(UpdateSeq)}]
end,
Expand Down

0 comments on commit a29df57

Please sign in to comment.