From fcb2882dfb12c36b1be3e9cda6b1328c324e667e Mon Sep 17 00:00:00 2001 From: benoitc Date: Sun, 2 Feb 2014 19:54:01 +0100 Subject: [PATCH 01/11] extract couch_httpd changes API in its own module --- src/couch_httpd_changes.erl | 174 ++++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 src/couch_httpd_changes.erl diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl new file mode 100644 index 00000000..1e431e9c --- /dev/null +++ b/src/couch_httpd_changes.erl @@ -0,0 +1,174 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_httpd_changes). + +-export([handle_changes_req/2]). + +-include_lib("couch/include/couch_db.hrl"). + +handle_changes_req(#httpd{method='POST'}=Req, Db) -> + couch_httpd:validate_ctype(Req, "application/json"), + handle_changes_req1(Req, Db); +handle_changes_req(#httpd{method='GET'}=Req, Db) -> + handle_changes_req1(Req, Db); +handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> + couch_httpd:send_method_not_allowed(Req, "GET,HEAD,POST"). + +handle_changes_req1(Req, #db{name=DbName}=Db) -> + AuthDbName = ?l2b(couch_config:get("couch_httpd_auth", "authentication_db")), + case AuthDbName of + DbName -> + % in the authentication database, _changes is admin-only. + ok = couch_db:check_is_admin(Db); + _Else -> + % on other databases, _changes is free for all. + ok + end, + handle_changes_req2(Req, Db). + +handle_changes_req2(Req, Db) -> + MakeCallback = fun(Resp) -> + fun({change, {ChangeProp}=Change, _}, "eventsource") -> + Seq = proplists:get_value(<<"seq">>, ChangeProp), + couch_httpd:send_chunk(Resp, ["data: ", ?JSON_ENCODE(Change), + "\n", "id: ", ?JSON_ENCODE(Seq), + "\n\n"]); + ({change, Change, _}, "continuous") -> + couch_httpd:send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); + ({change, Change, Prepend}, _) -> + couch_httpd:send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]); + (start, "eventsource") -> + ok; + (start, "continuous") -> + ok; + (start, _) -> + couch_httpd:send_chunk(Resp, "{\"results\":[\n"); + ({stop, _EndSeq}, "eventsource") -> + couch_httpd:end_json_response(Resp); + ({stop, EndSeq}, "continuous") -> + couch_httpd:send_chunk( + Resp, + [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"] + ), + couch_httpd:end_json_response(Resp); + ({stop, EndSeq}, _) -> + couch_httpd:send_chunk( + Resp, + io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq]) + ), + couch_httpd:end_json_response(Resp); + (timeout, _) -> + couch_httpd:send_chunk(Resp, "\n") + end + end, + ChangesArgs = parse_changes_query(Req, Db), + ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db), + WrapperFun = case ChangesArgs#changes_args.feed of + "normal" -> + {ok, Info} = couch_db:get_db_info(Db), + CurrentEtag = couch_httpd:make_etag(Info), + fun(FeedChangesFun) -> + couch_httpd:etag_respond( + Req, + CurrentEtag, + fun() -> + {ok, Resp} = couch_httpd:start_json_response( + Req, 200, [{"ETag", CurrentEtag}] + ), + FeedChangesFun(MakeCallback(Resp)) + end + ) + end; + "eventsource" -> + Headers = [ + {"Content-Type", "text/event-stream"}, + {"Cache-Control", "no-cache"} + ], + {ok, Resp} = couch_httpd:start_chunked_response(Req, 200, Headers), + fun(FeedChangesFun) -> + FeedChangesFun(MakeCallback(Resp)) + end; + _ -> + % "longpoll" or "continuous" + {ok, Resp} = couch_httpd:start_json_response(Req, 200), + fun(FeedChangesFun) -> + FeedChangesFun(MakeCallback(Resp)) + end + end, + couch_stats_collector:increment( + {httpd, clients_requesting_changes} + ), + try + WrapperFun(ChangesFun) + after + couch_stats_collector:decrement( + {httpd, clients_requesting_changes} + ) + end. + +parse_changes_query(Req, Db) -> + ChangesArgs = lists:foldl(fun({Key, Value}, Args) -> + case {string:to_lower(Key), Value} of + {"feed", _} -> + Args#changes_args{feed=Value}; + {"descending", "true"} -> + Args#changes_args{dir=rev}; + {"since", "now"} -> + UpdateSeq = couch_util:with_db(Db#db.name, fun(WDb) -> + couch_db:get_update_seq(WDb) + end), + Args#changes_args{since=UpdateSeq}; + {"since", _} -> + Args#changes_args{since=list_to_integer(Value)}; + {"last-event-id", _} -> + Args#changes_args{since=list_to_integer(Value)}; + {"limit", _} -> + Args#changes_args{limit=list_to_integer(Value)}; + {"style", _} -> + Args#changes_args{style=list_to_existing_atom(Value)}; + {"heartbeat", "true"} -> + Args#changes_args{heartbeat=true}; + {"heartbeat", _} -> + Args#changes_args{heartbeat=list_to_integer(Value)}; + {"timeout", _} -> + Args#changes_args{timeout=list_to_integer(Value)}; + {"include_docs", "true"} -> + Args#changes_args{include_docs=true}; + {"attachments", "true"} -> + Opts = Args#changes_args.doc_options, + Args#changes_args{doc_options=[attachments|Opts]}; + {"att_encoding_info", "true"} -> + Opts = Args#changes_args.doc_options, + Args#changes_args{doc_options=[att_encoding_info|Opts]}; + {"conflicts", "true"} -> + Args#changes_args{conflicts=true}; + {"filter", _} -> + Args#changes_args{filter=Value}; + _Else -> % unknown key value pair, ignore. + Args + end + end, #changes_args{}, couch_httpd:qs(Req)), + %% if it's an EventSource request with a Last-event-ID header + %% that should override the `since` query string, since it's + %% probably the browser reconnecting. + case ChangesArgs#changes_args.feed of + "eventsource" -> + case couch_httpd:header_value(Req, "last-event-id") of + undefined -> + ChangesArgs; + Value -> + ChangesArgs#changes_args{since=list_to_integer(Value)} + end; + _ -> + ChangesArgs + end. From 7f2af21e573abcd80b8c8412332d6439b4a777b3 Mon Sep 17 00:00:00 2001 From: benoitc Date: Fri, 7 Feb 2014 15:38:34 +0100 Subject: [PATCH 02/11] add supports of view changes in the _changes API Now when the option `seq_indexed=true` is set in the design doc, the view filter in _changes will use it to retrieve the results. Compared to the current way, using a view index will be faster to retrieve changes. It also gives the possibility to filter changes by key or get changes in a key range. All the view options can be used. Note 1: if someone is trying to filter a changes with view options when the views are not indexed by sequence, a 400 error will be returned. Note 2: The changes will only be returned when the view is updated if seq_indexed=true --- src/couch_httpd_changes.erl | 250 +++++++++++++++++++++++++++++++++++- 1 file changed, 246 insertions(+), 4 deletions(-) diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl index 1e431e9c..56ce5597 100644 --- a/src/couch_httpd_changes.erl +++ b/src/couch_httpd_changes.erl @@ -12,7 +12,9 @@ -module(couch_httpd_changes). --export([handle_changes_req/2]). +-export([handle_changes_req/2, + handle_changes/3, + handle_view_changes/3]). -include_lib("couch/include/couch_db.hrl"). @@ -34,9 +36,7 @@ handle_changes_req1(Req, #db{name=DbName}=Db) -> % on other databases, _changes is free for all. ok end, - handle_changes_req2(Req, Db). -handle_changes_req2(Req, Db) -> MakeCallback = fun(Resp) -> fun({change, {ChangeProp}=Change, _}, "eventsource") -> Seq = proplists:get_value(<<"seq">>, ChangeProp), @@ -72,7 +72,7 @@ handle_changes_req2(Req, Db) -> end end, ChangesArgs = parse_changes_query(Req, Db), - ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db), + ChangesFun = handle_changes(ChangesArgs, Req, Db), WrapperFun = case ChangesArgs#changes_args.feed of "normal" -> {ok, Info} = couch_db:get_db_info(Db), @@ -116,6 +116,164 @@ handle_changes_req2(Req, Db) -> ) end. + +handle_changes(ChangesArgs, Req, Db) -> + case ChangesArgs#changes_args.filter of + "_view" -> + handle_view_changes(ChangesArgs, Req, Db); + _ -> + couch_changes:handle_changes(ChangesArgs, Req, Db) + end. + +%% wrapper around couch_mrview_changes. +%% This wrapper mimic couch_changes:handle_changes/3 and return a +%% Changefun that can be used by the handle_changes_req function. Also +%% while couch_mrview_changes:handle_changes/6 is returning tha view +%% changes this function return docs corresponding to the changes +%% instead so it can be used to replace the _view filter. +handle_view_changes(ChangesArgs, Req, Db) -> + %% parse view parameter + {DDocId, VName} = parse_view_param(Req), + + %% get view options + Query = case Req of + {json_req, {Props}} -> + {Q} = couch_util:get_value(<<"query">>, Props, {[]}), + Q; + _ -> + couch_httpd:qs(Req) + end, + ViewOptions = parse_view_options(Query, []), + + {ok, Infos} = couch_mrview:get_info(Db, DDocId), + case lists:member(<<"seq_indexed">>, + proplists:get_value(update_options, Infos, [])) of + true -> + handle_view_changes(Db, DDocId, VName, ViewOptions, ChangesArgs, + Req); + false when ViewOptions /= [] -> + ?LOG_ERROR("Tried to filter a non sequence indexed view~n",[]), + throw({bad_request, seqs_not_indexed}); + false -> + %% old method we are getting changes using the btree instead + %% which is not efficient, log it + ?LOG_WARN("Get view changes with seq_indexed=false.~n", []), + couch_changes:handle_changes(ChangesArgs, Req, Db) + end. + +handle_view_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions, + ChangesArgs, Req) -> + #changes_args{ + feed = ResponseType, + since = Since, + db_open_options = DbOptions} = ChangesArgs, + + Options0 = [{since, Since}, + {view_options, ViewOptions}], + Options = case ResponseType of + "continuous" -> [stream | Options0]; + "eventsource" -> [stream | Options0]; + "longpoll" -> [{stream, once} | Options0]; + _ -> Options0 + end, + + %% reopen the db with the db options given to the changes args + couch_db:close(Db0), + DbOptions1 = [{user_ctx, Db0#db.user_ctx} | DbOptions], + {ok, Db} = couch_db:open(DbName, DbOptions1), + + + %% initialise the changes fun + ChangesFun = fun(Callback) -> + Callback(start, ResponseType), + + Acc0 = {"", 0, Db, Callback, ChangesArgs}, + couch_mrview_changes:handle_changes(DbName, DDocId, VName, + fun view_changes_cb/2, + Acc0, Options) + end, + ChangesFun. + + +view_changes_cb(stop, {LastSeq, {_, _, _, Callback, Args}}) -> + Callback({stop, LastSeq}, Args#changes_args.feed); + +view_changes_cb(heartbeat, {_, _, _, Callback, Args}=Acc) -> + Callback(timeout, Args#changes_args.feed), + {ok, Acc}; +view_changes_cb({{Seq, _Key, DocId}, _VAl}, + {Prepend, OldLimit, Db0, Callback, Args}=Acc) -> + + #changes_args{ + feed = ResponseType, + limit = Limit} = Args, + + %% if the doc sequence is > to the one in the db record, reopen the + %% database since it means we don't have the latest db value. + Db = case Db0#db.update_seq >= Seq of + true -> Db0; + false -> + {ok, Db1} = couch_db:reopen_db(Db0), + Db1 + end, + + case couch_db:get_doc_info(Db, DocId) of + {ok, DocInfo} -> + %% get change row + ChangeRow = view_change_row(Db, DocInfo, Args), + %% emit change row + Callback({change, ChangeRow, Prepend}, ResponseType), + + %% if we achieved the limit, stop here, else continue. + NewLimit = OldLimit + 1, + if Limit > NewLimit -> + {ok, {<<",\n">>, Db, NewLimit, Callback, Args}}; + true -> + {stop, {<<"">>, Db, NewLimit, Callback, Args}} + end; + {error, not_found} -> + %% doc not found, continue + {ok, Acc}; + Error -> + throw(Error) + end. + + +view_change_row(Db, DocInfo, Args) -> + #doc_info{id = Id, high_seq = Seq, revs = Revs} = DocInfo, + [#rev_info{rev=Rev, deleted=Del} | _] = Revs, + + #changes_args{style=Style, + include_docs=InDoc, + doc_options = DocOpts, + conflicts=Conflicts}=Args, + + Changes = case Style of + main_only -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; + all_docs -> + [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} + || #rev_info{rev=R} <- Revs] + end, + + {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}] ++ + deleted_item(Del) ++ case InDoc of + true -> + Opts = case Conflicts of + true -> [deleted, conflicts]; + false -> [deleted] + end, + Doc = couch_index_util:load_doc(Db, DocInfo, Opts), + case Doc of + null -> + [{doc, null}]; + _ -> + [{doc, couch_doc:to_json_obj(Doc, DocOpts)}] + end; + false -> + [] + end}. + parse_changes_query(Req, Db) -> ChangesArgs = lists:foldl(fun({Key, Value}, Args) -> case {string:to_lower(Key), Value} of @@ -172,3 +330,87 @@ parse_changes_query(Req, Db) -> _ -> ChangesArgs end. + +parse_view_param({json_req, {Props}}) -> + {Query} = couch_util:get_value(<<"query">>, Props), + parse_view_param1(couch_util:get_value(<<"view">>, Query, <<"">>)); +parse_view_param(Req) -> + parse_view_param1(list_to_binary(couch_httpd:qs_value(Req, "view", ""))). + +parse_view_param1(ViewParam) -> + case re:split(ViewParam, <<"/">>) of + [DName, ViewName] -> + {<< "_design/", DName/binary >>, ViewName}; + _ -> + throw({bad_request, "Invalid `view` parameter."}) + end. + +parse_view_options([], Acc) -> + Acc; +parse_view_options([{K, V} | Rest], Acc) -> + Acc1 = case couch_util:to_binary(K) of + <<"reduce">> -> + [{reduce, couch_mrview_http:parse_boolean(V)}]; + <<"key">> -> + V1 = parse_json(V), + [{start_key, V1}, {end_key, V1} | Acc]; + <<"keys">> -> + [{keys, parse_json(V)} | Acc]; + <<"startkey">> -> + [{start_key, parse_json(V)} | Acc]; + <<"start_key">> -> + [{start_key, parse_json(V)} | Acc]; + <<"startkey_docid">> -> + [{start_key_docid, couch_util:to_binary(V)} | Acc]; + <<"start_key_docid">> -> + [{start_key_docid, couch_util:to_binary(V)} | Acc]; + <<"endkey">> -> + [{end_key, parse_json(V)} | Acc]; + <<"end_key">> -> + [{end_key, parse_json(V)} | Acc]; + <<"endkey_docid">> -> + [{start_key_docid, couch_util:to_binary(V)} | Acc]; + <<"end_key_docid">> -> + [{start_key_docid, couch_util:to_binary(V)} | Acc]; + <<"limit">> -> + [{limit, couch_mrview_http:parse_pos_int(V)} | Acc]; + <<"count">> -> + throw({query_parse_error, <<"QS param `count` is not `limit`">>}); + <<"stale">> when V =:= <<"ok">> orelse V =:= "ok" -> + [{stale, ok} | Acc]; + <<"stale">> when V =:= <<"update_after">> orelse V =:= "update_after" -> + [{stale, update_after} | Acc]; + <<"stale">> -> + throw({query_parse_error, <<"Invalid value for `stale`.">>}); + <<"descending">> -> + case couch_mrview_http:parse_boolean(V) of + true -> + [{direction, rev} | Acc]; + _ -> + [{direction, fwd} | Acc] + end; + <<"skip">> -> + [{skip, couch_mrview_http:parse_pos_int(V)} | Acc]; + <<"group">> -> + case couch_mrview_http:parse_booolean(V) of + true -> + [{group_level, exact} | Acc]; + _ -> + [{group_level, 0} | Acc] + end; + <<"group_level">> -> + [{group_level, couch_mrview_http:parse_pos_int(V)} | Acc]; + <<"inclusive_end">> -> + [{inclusive_end, couch_mrview_http:parse_boolean(V)}]; + _ -> + Acc + end, + parse_view_options(Rest, Acc1). + +parse_json(V) when is_list(V) -> + ?JSON_DECODE(V); +parse_json(V) -> + V. + +deleted_item(true) -> [{<<"deleted">>, true}]; +deleted_item(_) -> []. From 071dedf1d35b6cd5f5bb0e553de7c81acc43954a Mon Sep 17 00:00:00 2001 From: benoitc Date: Sat, 8 Feb 2014 21:27:58 +0100 Subject: [PATCH 03/11] couch_httpd_changes: check removed keys from the view filter Make sure to only emit deleted document when a deleted key is passed to the view filter. Conflicts: src/couch_httpd_changes.erl --- src/couch_httpd_changes.erl | 40 +++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl index 56ce5597..b494b0dd 100644 --- a/src/couch_httpd_changes.erl +++ b/src/couch_httpd_changes.erl @@ -201,9 +201,15 @@ view_changes_cb(stop, {LastSeq, {_, _, _, Callback, Args}}) -> view_changes_cb(heartbeat, {_, _, _, Callback, Args}=Acc) -> Callback(timeout, Args#changes_args.feed), {ok, Acc}; -view_changes_cb({{Seq, _Key, DocId}, _VAl}, +view_changes_cb({{Seq, _Key, DocId}, Val}, {Prepend, OldLimit, Db0, Callback, Args}=Acc) -> + %% is the key removed from the index? + Removed = case Val of + {[{<<"_removed">>, true}]} -> true; + _ -> false + end, + #changes_args{ feed = ResponseType, limit = Limit} = Args, @@ -220,16 +226,24 @@ view_changes_cb({{Seq, _Key, DocId}, _VAl}, case couch_db:get_doc_info(Db, DocId) of {ok, DocInfo} -> %% get change row - ChangeRow = view_change_row(Db, DocInfo, Args), - %% emit change row - Callback({change, ChangeRow, Prepend}, ResponseType), - - %% if we achieved the limit, stop here, else continue. - NewLimit = OldLimit + 1, - if Limit > NewLimit -> - {ok, {<<",\n">>, Db, NewLimit, Callback, Args}}; - true -> - {stop, {<<"">>, Db, NewLimit, Callback, Args}} + {Deleted, ChangeRow} = view_change_row(Db, DocInfo, Args), + + case Removed of + true when Deleted /= true -> + %% the key has been removed from the view but the + %% document hasn't been deleted so ignore it. + {ok, Acc}; + _ -> + %% emit change row + Callback({change, ChangeRow, Prepend}, ResponseType), + + %% if we achieved the limit, stop here, else continue. + NewLimit = OldLimit + 1, + if Limit > NewLimit -> + {ok, {<<",\n">>, NewLimit, Db, Callback, Args}}; + true -> + {stop, {<<"">>, NewLimit, Db, Callback, Args}} + end end; {error, not_found} -> %% doc not found, continue @@ -256,7 +270,7 @@ view_change_row(Db, DocInfo, Args) -> || #rev_info{rev=R} <- Revs] end, - {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}] ++ + {Del, {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}] ++ deleted_item(Del) ++ case InDoc of true -> Opts = case Conflicts of @@ -272,7 +286,7 @@ view_change_row(Db, DocInfo, Args) -> end; false -> [] - end}. + end}}. parse_changes_query(Req, Db) -> ChangesArgs = lists:foldl(fun({Key, Value}, Args) -> From 6d6b801f8440d0fdad1f0c22261d0b64da9362d0 Mon Sep 17 00:00:00 2001 From: Benjamin Bastian Date: Fri, 22 Aug 2014 15:51:47 +0700 Subject: [PATCH 04/11] Add preliminary version of view changes --- src/couch_changes.erl | 177 +++++++++++++++++++++++++----------- src/couch_db.erl | 6 +- src/couch_httpd_changes.erl | 53 +++++------ src/couch_httpd_db.erl | 2 +- 4 files changed, 157 insertions(+), 81 deletions(-) diff --git a/src/couch_changes.erl b/src/couch_changes.erl index b5e8f89b..abe8cf95 100644 --- a/src/couch_changes.erl +++ b/src/couch_changes.erl @@ -12,15 +12,19 @@ -module(couch_changes). -include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). -export([ - handle_changes/3, + handle_db_changes/3, + handle_changes/4, get_changes_timeout/2, - wait_db_updated/3, - get_rest_db_updated/1, + wait_updated/3, + get_rest_updated/1, configure_filter/4, filter/3, - handle_db_event/3 + handle_db_event/3, + handle_view_event/3, + view_filter/3 ]). -export([changes_enumerator/2]). @@ -31,6 +35,9 @@ -record(changes_acc, { db, + view_name, + ddoc_name, + view, seq, prepend, filter, @@ -45,8 +52,10 @@ timeout_fun }). -%% @type Req -> #httpd{} | {json_req, JsonObj()} -handle_changes(Args1, Req, Db0) -> +handle_db_changes(Args, Req, Db) -> + handle_changes(Args, Req, Db, db). + +handle_changes(Args1, Req, Db0, Type) -> #changes_args{ style = Style, filter = FilterName, @@ -54,6 +63,23 @@ handle_changes(Args1, Req, Db0) -> dir = Dir, since = Since } = Args1, + {StartListenerFun, DDocName, ViewName, View} = case Type of + {view, DDocName0, ViewName0} -> + SNFun = fun() -> + couch_event:link_listener( + ?MODULE, handle_view_event, self(), [{dbname, Db0#db.name}] + ) + end, + {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(Db0#db.name, DDocName0, ViewName0, #mrargs{}), + {SNFun, DDocName0, ViewName0, View0}; + db -> + SNFun = fun() -> + couch_event:link_listener( + ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}] + ) + end, + {SNFun, undefined, undefined, undefined} + end, Filter = configure_filter(FilterName, Style, Req, Db0), Args = Args1#changes_args{filter_fun = Filter}, Start = fun() -> @@ -78,14 +104,14 @@ handle_changes(Args1, Req, Db0) -> true -> fun(CallbackAcc) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), - {ok, Listener} = couch_event:link_listener( - ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}] - ), + {ok, Listener} = StartListenerFun(), + {Db, StartSeq} = Start(), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq, - <<"">>, Timeout, TimeoutFun), + <<"">>, Timeout, TimeoutFun, DDocName, ViewName, + View), try keep_sending_changes( Args#changes_args{dir=fwd}, @@ -93,7 +119,7 @@ handle_changes(Args1, Req, Db0) -> true) after couch_event:stop_listener(Listener), - get_rest_db_updated(ok) % clean out any remaining update messages + get_rest_updated(ok) % clean out any remaining update messages end end; false -> @@ -103,11 +129,12 @@ handle_changes(Args1, Req, Db0) -> {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), {Db, StartSeq} = Start(), Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback, - UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun), + UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun, + DDocName, ViewName, View), {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} = send_changes( - Args#changes_args{feed="normal"}, Acc0, + Dir, true), end_sending_changes(Callback, UserAcc3, LastSeq, Feed) end @@ -115,13 +142,24 @@ handle_changes(Args1, Req, Db0) -> handle_db_event(_DbName, updated, Parent) -> - Parent ! db_updated, + Parent ! updated, {ok, Parent}; handle_db_event(_DbName, _Event, Parent) -> {ok, Parent}. +handle_view_event(_DbName, Msg, {Parent, DDocId}) -> + case Msg of + {index_commit, DDocId} -> + Parent ! updated; + {index_delete, DDocId} -> + Parent ! deleted; + _ -> + ok + end, + {ok, {Parent, DDocId}}. + get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) -> Pair; get_callback_acc(Callback) when is_function(Callback, 2) -> @@ -133,7 +171,7 @@ configure_filter("_doc_ids", Style, Req, _Db) -> configure_filter("_design", Style, _Req, _Db) -> {design_docs, Style}; configure_filter("_view", Style, Req, Db) -> - ViewName = couch_httpd:qs_value(Req, "view", ""), + ViewName = get_view_qs(Req), if ViewName /= "" -> ok; true -> throw({bad_request, "`view` filter parameter is not provided."}) end, @@ -199,6 +237,11 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) -> {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), filter_revs(Passes, Docs). +get_view_qs({json_req, {Props}}) -> + {Query} = couch_util:get_value(<<"query">>, Props, {[]}), + binary_to_list(couch_util:get_value(<<"view">>, Query, "")); +get_view_qs(Req) -> + couch_httpd:qs_value(Req, "view", ""). get_doc_ids({json_req, {Props}}) -> check_docids(couch_util:get_value(<<"doc_ids">>, Props)); @@ -310,7 +353,7 @@ start_sending_changes(_Callback, UserAcc, ResponseType) start_sending_changes(Callback, UserAcc, ResponseType) -> Callback(start, ResponseType, UserAcc). -build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> +build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) -> #changes_args{ include_docs = IncludeDocs, doc_options = DocOpts, @@ -332,24 +375,30 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) - doc_options = DocOpts, conflicts = Conflicts, timeout = Timeout, - timeout_fun = TimeoutFun + timeout_fun = TimeoutFun, + ddoc_name = DDocName, + view_name = ViewName, + view = View }. -send_changes(Args, Acc0, FirstRound) -> - #changes_args{ - dir = Dir - } = Args, +send_changes(Acc, Dir, FirstRound) -> #changes_acc{ db = Db, seq = StartSeq, - filter = Filter - } = Acc0, - EnumFun = fun ?MODULE:changes_enumerator/2, + filter = Filter, + view = View + } = Acc, + EnumFun = fun changes_enumerator/2, case can_optimize(FirstRound, Filter) of {true, Fun} -> - Fun(Db, StartSeq, Dir, EnumFun, Acc0, Filter); + Fun(Db, StartSeq, Dir, EnumFun, Acc, Filter); _ -> - couch_db:changes_since(Db, StartSeq, EnumFun, [{dir, Dir}], Acc0) + case View of + undefined -> + couch_db:changes_since(Db, StartSeq, EnumFun, [{dir, Dir}], Acc); + #mrview{} -> + couch_mrview:view_changes_since(View, StartSeq, EnumFun, [{dir, Dir}], Acc) + end end. @@ -422,20 +471,20 @@ keep_sending_changes(Args, Acc0, FirstRound) -> db_open_options = DbOptions } = Args, - {ok, ChangesAcc} = send_changes( - Args#changes_args{dir=fwd}, - Acc0, - FirstRound), + {ok, ChangesAcc} = send_changes(Acc0, fwd, FirstRound), + #changes_acc{ - db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun, - seq = EndSeq, prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit + db = Db, callback = Callback, + timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq, + prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit, + ddoc_name = DDocName, view_name = ViewName, view = View } = ChangesAcc, couch_db:close(Db), if Limit > NewLimit, ResponseType == "longpoll" -> end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType); true -> - case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of + case wait_updated(Timeout, TimeoutFun, UserAcc2) of {updated, UserAcc4} -> DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], case couch_db:open(Db#db.name, DbOptions1) of @@ -444,6 +493,7 @@ keep_sending_changes(Args, Acc0, FirstRound) -> Args#changes_args{limit=NewLimit}, ChangesAcc#changes_acc{ db = Db2, + view = maybe_refresh_view(Db2, DDocName, ViewName), user_acc = UserAcc4, seq = EndSeq, prepend = Prepend2, @@ -458,19 +508,31 @@ keep_sending_changes(Args, Acc0, FirstRound) -> end end. +maybe_refresh_view(_, undefined, undefined) -> + undefined; +maybe_refresh_view(Db, DDocName, ViewName) -> + {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(Db#db.name, DDocName, ViewName, #mrargs{}), + View. + end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> Callback({stop, EndSeq}, ResponseType, UserAcc). -changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc) +changes_enumerator(Value, #changes_acc{resp_type = ResponseType} = Acc) when ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" -> #changes_acc{ filter = Filter, callback = Callback, user_acc = UserAcc, limit = Limit, db = Db, - timeout = Timeout, timeout_fun = TimeoutFun + timeout = Timeout, timeout_fun = TimeoutFun, + view = View } = Acc, - #doc_info{high_seq = Seq} = DocInfo, - Results0 = filter(Db, DocInfo, Filter), + {Seq, Results0} = case View of + undefined -> + {Value#doc_info.high_seq, filter(Db, Value, Filter)}; + #mrview{} -> + {{Seq0, _}, _} = Value, + {Seq0, [ok]} % TODO + end, Results = [Result || Result <- Results0, Result /= null], %% TODO: I'm thinking this should be < 1 and not =< 1 Go = if Limit =< 1 -> stop; true -> ok end, @@ -484,19 +546,24 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc) {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}} end; _ -> - ChangesRow = changes_row(Results, DocInfo, Acc), + ChangesRow = changes_row(Results, Value, Acc), UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), reset_heartbeat(), {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}} end; -changes_enumerator(DocInfo, Acc) -> +changes_enumerator(Value, Acc) -> #changes_acc{ filter = Filter, callback = Callback, prepend = Prepend, user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, - timeout = Timeout, timeout_fun = TimeoutFun + timeout = Timeout, timeout_fun = TimeoutFun, view = View } = Acc, - #doc_info{high_seq = Seq} = DocInfo, - Results0 = filter(Db, DocInfo, Filter), + {Seq, Results0} = case View of + undefined -> + {Value#doc_info.high_seq, filter(Db, Value, Filter)}; + #mrview{} -> + {{Seq0,_}, _} = Value, + {Seq0, [ok]} % TODO view filter + end, Results = [Result || Result <- Results0, Result /= null], Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, case Results of @@ -509,7 +576,7 @@ changes_enumerator(DocInfo, Acc) -> {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}} end; _ -> - ChangesRow = changes_row(Results, DocInfo, Acc), + ChangesRow = changes_row(Results, Value, Acc), UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), reset_heartbeat(), {Go, Acc#changes_acc{ @@ -518,7 +585,11 @@ changes_enumerator(DocInfo, Acc) -> end. -changes_row(Results, DocInfo, Acc) -> + +changes_row(Results, SeqStuff, #changes_acc{view=#mrview{}}) -> + {{Seq, Key}, {Id, Value}} = SeqStuff, + {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"key">>, Key}, {<<"value">>, Value}, {<<"changes">>, Results}]}; +changes_row(Results, #doc_info{}=DocInfo, #changes_acc{view=undefined}=Acc) -> #doc_info{ id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _] } = DocInfo, @@ -549,25 +620,27 @@ changes_row(Results, DocInfo, Acc) -> deleted_item(true) -> [{<<"deleted">>, true}]; deleted_item(_) -> []. -% waits for a db_updated msg, if there are multiple msgs, collects them. -wait_db_updated(Timeout, TimeoutFun, UserAcc) -> +% waits for a updated msg, if there are multiple msgs, collects them. +wait_updated(Timeout, TimeoutFun, UserAcc) -> receive - db_updated -> - get_rest_db_updated(UserAcc) + updated -> + get_rest_updated(UserAcc); + deleted -> + {stop, UserAcc} after Timeout -> {Go, UserAcc2} = TimeoutFun(UserAcc), case Go of ok -> - wait_db_updated(Timeout, TimeoutFun, UserAcc2); + wait_updated(Timeout, TimeoutFun, UserAcc2); stop -> {stop, UserAcc2} end end. -get_rest_db_updated(UserAcc) -> +get_rest_updated(UserAcc) -> receive - db_updated -> - get_rest_db_updated(UserAcc) + updated -> + get_rest_updated(UserAcc) after 0 -> {updated, UserAcc} end. diff --git a/src/couch_db.erl b/src/couch_db.erl index 5d4619a5..b5ea64dd 100644 --- a/src/couch_db.erl +++ b/src/couch_db.erl @@ -1173,7 +1173,9 @@ enum_docs_reduce_to_count(Reds) -> changes_since(Db, StartSeq, Fun, Acc) -> changes_since(Db, StartSeq, Fun, [], Acc). -changes_since(Db, StartSeq, Fun, Options, Acc) -> +changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) -> + changes_since(Db#db.seq_tree, StartSeq, Fun, Options, Acc); +changes_since(SeqTree, StartSeq, Fun, Options, Acc) -> Wrapper = fun(FullDocInfo, _Offset, Acc2) -> DocInfo = case FullDocInfo of #full_doc_info{} -> @@ -1183,7 +1185,7 @@ changes_since(Db, StartSeq, Fun, Options, Acc) -> end, Fun(DocInfo, Acc2) end, - {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, + {ok, _LastReduction, AccOut} = couch_btree:fold(SeqTree, Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options), {ok, AccOut}. diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl index b494b0dd..9f141d9f 100644 --- a/src/couch_httpd_changes.erl +++ b/src/couch_httpd_changes.erl @@ -12,22 +12,33 @@ -module(couch_httpd_changes). --export([handle_changes_req/2, - handle_changes/3, - handle_view_changes/3]). +-export([handle_db_changes_req/2, + handle_changes_req/4, + handle_view_filtered_changes/3, + parse_changes_query/3]). -include_lib("couch/include/couch_db.hrl"). -handle_changes_req(#httpd{method='POST'}=Req, Db) -> +handle_db_changes_req(Req, Db) -> + ChangesArgs = parse_changes_query(Req, Db, false), + ChangesFun = case ChangesArgs#changes_args.filter of + "_view" -> + handle_view_filtered_changes(ChangesArgs, Req, Db); + _ -> + couch_changes:handle_db_changes(ChangesArgs, Req, Db) + end, + handle_changes_req(Req, Db, ChangesArgs, ChangesFun). + +handle_changes_req(#httpd{method='POST'}=Req, Db, ChangesArgs, ChangesFun) -> couch_httpd:validate_ctype(Req, "application/json"), - handle_changes_req1(Req, Db); -handle_changes_req(#httpd{method='GET'}=Req, Db) -> - handle_changes_req1(Req, Db); -handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> + handle_changes_req1(Req, Db, ChangesArgs, ChangesFun); +handle_changes_req(#httpd{method='GET'}=Req, Db, ChangesArgs, ChangesFun) -> + handle_changes_req1(Req, Db, ChangesArgs, ChangesFun); +handle_changes_req(#httpd{}=Req, _Db, _ChangesArgs, _ChangesFun) -> couch_httpd:send_method_not_allowed(Req, "GET,HEAD,POST"). -handle_changes_req1(Req, #db{name=DbName}=Db) -> - AuthDbName = ?l2b(couch_config:get("couch_httpd_auth", "authentication_db")), +handle_changes_req1(Req, #db{name=DbName}=Db, ChangesArgs, ChangesFun) -> + AuthDbName = ?l2b(config:get("couch_httpd_auth", "authentication_db")), case AuthDbName of DbName -> % in the authentication database, _changes is admin-only. @@ -71,8 +82,6 @@ handle_changes_req1(Req, #db{name=DbName}=Db) -> couch_httpd:send_chunk(Resp, "\n") end end, - ChangesArgs = parse_changes_query(Req, Db), - ChangesFun = handle_changes(ChangesArgs, Req, Db), WrapperFun = case ChangesArgs#changes_args.feed of "normal" -> {ok, Info} = couch_db:get_db_info(Db), @@ -117,21 +126,13 @@ handle_changes_req1(Req, #db{name=DbName}=Db) -> end. -handle_changes(ChangesArgs, Req, Db) -> - case ChangesArgs#changes_args.filter of - "_view" -> - handle_view_changes(ChangesArgs, Req, Db); - _ -> - couch_changes:handle_changes(ChangesArgs, Req, Db) - end. - %% wrapper around couch_mrview_changes. -%% This wrapper mimic couch_changes:handle_changes/3 and return a +%% This wrapper mimic couch_changes:handle_db_changes/3 and return a %% Changefun that can be used by the handle_changes_req function. Also %% while couch_mrview_changes:handle_changes/6 is returning tha view %% changes this function return docs corresponding to the changes %% instead so it can be used to replace the _view filter. -handle_view_changes(ChangesArgs, Req, Db) -> +handle_view_filtered_changes(ChangesArgs, Req, Db) -> %% parse view parameter {DDocId, VName} = parse_view_param(Req), @@ -149,7 +150,7 @@ handle_view_changes(ChangesArgs, Req, Db) -> case lists:member(<<"seq_indexed">>, proplists:get_value(update_options, Infos, [])) of true -> - handle_view_changes(Db, DDocId, VName, ViewOptions, ChangesArgs, + handle_view_filtered_changes(Db, DDocId, VName, ViewOptions, ChangesArgs, Req); false when ViewOptions /= [] -> ?LOG_ERROR("Tried to filter a non sequence indexed view~n",[]), @@ -158,10 +159,10 @@ handle_view_changes(ChangesArgs, Req, Db) -> %% old method we are getting changes using the btree instead %% which is not efficient, log it ?LOG_WARN("Get view changes with seq_indexed=false.~n", []), - couch_changes:handle_changes(ChangesArgs, Req, Db) + couch_changes:handle_db_changes(ChangesArgs, Req, Db) end. -handle_view_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions, +handle_view_filtered_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions, ChangesArgs, Req) -> #changes_args{ feed = ResponseType, @@ -288,7 +289,7 @@ view_change_row(Db, DocInfo, Args) -> [] end}}. -parse_changes_query(Req, Db) -> +parse_changes_query(Req, Db, IsViewChanges) -> ChangesArgs = lists:foldl(fun({Key, Value}, Args) -> case {string:to_lower(Key), Value} of {"feed", _} -> diff --git a/src/couch_httpd_db.erl b/src/couch_httpd_db.erl index 4ff4d989..442e8725 100644 --- a/src/couch_httpd_db.erl +++ b/src/couch_httpd_db.erl @@ -112,7 +112,7 @@ handle_changes_req2(Req, Db) -> end end, ChangesArgs = parse_changes_query(Req, Db), - ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db), + ChangesFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db), WrapperFun = case ChangesArgs#changes_args.feed of "normal" -> {ok, Info} = couch_db:get_db_info(Db), From 20e585dd07e4da0fe67ace09f0a291e3fe759534 Mon Sep 17 00:00:00 2001 From: Benjamin Bastian Date: Fri, 22 Aug 2014 23:56:56 +0700 Subject: [PATCH 05/11] Add view filtering optimization to changes feeds --- src/couch_changes.erl | 181 ++++++++++++++++++++++-------------- src/couch_httpd_changes.erl | 180 +---------------------------------- 2 files changed, 116 insertions(+), 245 deletions(-) diff --git a/src/couch_changes.erl b/src/couch_changes.erl index abe8cf95..259f83cb 100644 --- a/src/couch_changes.erl +++ b/src/couch_changes.erl @@ -63,25 +63,45 @@ handle_changes(Args1, Req, Db0, Type) -> dir = Dir, since = Since } = Args1, - {StartListenerFun, DDocName, ViewName, View} = case Type of - {view, DDocName0, ViewName0} -> - SNFun = fun() -> - couch_event:link_listener( - ?MODULE, handle_view_event, self(), [{dbname, Db0#db.name}] - ) - end, - {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(Db0#db.name, DDocName0, ViewName0, #mrargs{}), - {SNFun, DDocName0, ViewName0, View0}; - db -> - SNFun = fun() -> - couch_event:link_listener( - ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}] - ) - end, - {SNFun, undefined, undefined, undefined} - end, Filter = configure_filter(FilterName, Style, Req, Db0), Args = Args1#changes_args{filter_fun = Filter}, + UseViewChanges = case {Type, Filter} of + {{view, _, _}, _} -> + true; + {_, {fast_view, _, _, _}} -> + true; + _ -> + false + end, + {StartListenerFun, DDocName, ViewName, View} = if UseViewChanges -> + {DDocName0, ViewName0} = case {Type, Filter} of + {{view, DDocName1, ViewName1}, _} -> + {DDocName1, ViewName1}; + {_, {fast_view, _, DDoc, ViewName1}} -> + {DDoc#doc.id, ViewName1} + end, + {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view( + Db0#db.name, DDocName0, ViewName0, #mrargs{}), + case View0#mrview.seq_btree of + #btree{} -> + ok; + _ -> + throw({bad_request, "view changes not enabled"}) + end, + SNFun = fun() -> + couch_event:link_listener( + ?MODULE, handle_view_event, {self(), DDocName0}, [{dbname, Db0#db.name}] + ) + end, + {SNFun, DDocName0, ViewName0, View0}; + true -> + SNFun = fun() -> + couch_event:link_listener( + ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}] + ) + end, + {SNFun, undefined, undefined, undefined} + end, Start = fun() -> {ok, Db} = couch_db:reopen(Db0), StartSeq = case Dir of @@ -180,7 +200,15 @@ configure_filter("_view", Style, Req, Db) -> [DName, VName] -> {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>), check_member_exists(DDoc, [<<"views">>, VName]), - {view, Style, DDoc, VName}; + try + true = couch_util:get_nested_json_value( + DDoc#doc.body, + [<<"options">>, <<"seq_indexed">>] + ), + {fast_view, Style, DDoc, VName} + catch _:_ -> + {view, Style, DDoc, VName} + end; [] -> Msg = "`view` must be of the form `designname/viewname`", throw({bad_request, Msg}) @@ -237,6 +265,36 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) -> {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), filter_revs(Passes, Docs). +fast_view_filter(Db, {{Seq, _}, {ID, _}}, {fast_view, Style, _, _}) -> + case couch_db:get_doc_info(Db, ID) of + {ok, #doc_info{high_seq=Seq}=DocInfo} -> + Docs = open_revs(Db, DocInfo, Style), + Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) -> + RevStr = couch_doc:rev_to_str({RevPos, RevId}), + {[{<<"rev">>, RevStr}]} + end, Docs), + {DocInfo, Changes}; + {ok, #doc_info{high_seq=HighSeq}} when Seq > HighSeq -> + % If the view seq tree is out of date (or if the view seq tree + % was opened before the db) seqs may come by from the seq tree + % which correspond to the not-most-current revision of a document. + % The proper thing to do is to not send this old revision, but wait + % until we reopen the up-to-date view seq tree and continue the + % fold. + % I left the Seq > HighSeq guard in so if (for some godforsaken + % reason) the seq in the view is more current than the database, + % we'll throw an error. + {ok, []}; + {error, not_found} -> + {ok, []} + end. + + + +view_filter(_Db, _KV, {default, _Style}) -> + [ok]. % TODO: make a real thing + + get_view_qs({json_req, {Props}}) -> {Query} = couch_util:get_value(<<"query">>, Props, {[]}), binary_to_list(couch_util:get_value(<<"view">>, Query, "")); @@ -477,7 +535,7 @@ keep_sending_changes(Args, Acc0, FirstRound) -> db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq, prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit, - ddoc_name = DDocName, view_name = ViewName, view = View + ddoc_name = DDocName, view_name = ViewName } = ChangesAcc, couch_db:close(Db), @@ -517,54 +575,27 @@ maybe_refresh_view(Db, DDocName, ViewName) -> end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> Callback({stop, EndSeq}, ResponseType, UserAcc). -changes_enumerator(Value, #changes_acc{resp_type = ResponseType} = Acc) - when ResponseType =:= "continuous" - orelse ResponseType =:= "eventsource" -> - #changes_acc{ - filter = Filter, callback = Callback, - user_acc = UserAcc, limit = Limit, db = Db, - timeout = Timeout, timeout_fun = TimeoutFun, - view = View - } = Acc, - {Seq, Results0} = case View of - undefined -> - {Value#doc_info.high_seq, filter(Db, Value, Filter)}; - #mrview{} -> - {{Seq0, _}, _} = Value, - {Seq0, [ok]} % TODO - end, - Results = [Result || Result <- Results0, Result /= null], - %% TODO: I'm thinking this should be < 1 and not =< 1 - Go = if Limit =< 1 -> stop; true -> ok end, - case Results of - [] -> - {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), - case Done of - stop -> - {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}; - ok -> - {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}} - end; - _ -> - ChangesRow = changes_row(Results, Value, Acc), - UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), - reset_heartbeat(), - {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}} - end; -changes_enumerator(Value, Acc) -> +changes_enumerator(Value0, Acc) -> #changes_acc{ filter = Filter, callback = Callback, prepend = Prepend, user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, timeout = Timeout, timeout_fun = TimeoutFun, view = View } = Acc, - {Seq, Results0} = case View of - undefined -> - {Value#doc_info.high_seq, filter(Db, Value, Filter)}; - #mrview{} -> - {{Seq0,_}, _} = Value, - {Seq0, [ok]} % TODO view filter + {Value, Results0} = case {View, Filter} of + {_, {fast_view, _, _, _}} -> + fast_view_filter(Db, Value0, Filter); + {#mrview{}, _} -> + {Value0, view_filter(Db, Value0, Filter)}; + {_, _} -> + {Value0, filter(Db, Value0, Filter)} end, Results = [Result || Result <- Results0, Result /= null], + Seq = case Value of + #doc_info{} -> + Value#doc_info.high_seq; + {{Seq0, _}, _} -> + Seq0 + end, Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, case Results of [] -> @@ -576,20 +607,32 @@ changes_enumerator(Value, Acc) -> {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}} end; _ -> - ChangesRow = changes_row(Results, Value, Acc), - UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), - reset_heartbeat(), - {Go, Acc#changes_acc{ - seq = Seq, prepend = <<",\n">>, - user_acc = UserAcc2, limit = Limit - 1}} + if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" -> + ChangesRow = changes_row(Results, Value, Acc), + UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}; + true -> + ChangesRow = changes_row(Results, Value, Acc), + UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc#changes_acc{ + seq = Seq, prepend = <<",\n">>, + user_acc = UserAcc2, limit = Limit - 1}} + end end. -changes_row(Results, SeqStuff, #changes_acc{view=#mrview{}}) -> - {{Seq, Key}, {Id, Value}} = SeqStuff, +changes_row(Results, DocInfo, #changes_acc{filter={fast_view,_,_,_}}=Acc) -> + format_doc_info_change(Results, DocInfo, Acc); +changes_row(Results, KV, #changes_acc{view=#mrview{}}) -> + {{Seq, Key}, {Id, Value}} = KV, {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"key">>, Key}, {<<"value">>, Value}, {<<"changes">>, Results}]}; -changes_row(Results, #doc_info{}=DocInfo, #changes_acc{view=undefined}=Acc) -> +changes_row(Results, #doc_info{}=DocInfo, Acc) -> + format_doc_info_change(Results, DocInfo, Acc). + +format_doc_info_change(Results, #doc_info{}=DocInfo, Acc) -> #doc_info{ id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _] } = DocInfo, diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl index 9f141d9f..4963a5f0 100644 --- a/src/couch_httpd_changes.erl +++ b/src/couch_httpd_changes.erl @@ -14,19 +14,13 @@ -export([handle_db_changes_req/2, handle_changes_req/4, - handle_view_filtered_changes/3, - parse_changes_query/3]). + parse_changes_query/2]). -include_lib("couch/include/couch_db.hrl"). handle_db_changes_req(Req, Db) -> - ChangesArgs = parse_changes_query(Req, Db, false), - ChangesFun = case ChangesArgs#changes_args.filter of - "_view" -> - handle_view_filtered_changes(ChangesArgs, Req, Db); - _ -> - couch_changes:handle_db_changes(ChangesArgs, Req, Db) - end, + ChangesArgs = parse_changes_query(Req, Db), + ChangesFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db), handle_changes_req(Req, Db, ChangesArgs, ChangesFun). handle_changes_req(#httpd{method='POST'}=Req, Db, ChangesArgs, ChangesFun) -> @@ -126,170 +120,7 @@ handle_changes_req1(Req, #db{name=DbName}=Db, ChangesArgs, ChangesFun) -> end. -%% wrapper around couch_mrview_changes. -%% This wrapper mimic couch_changes:handle_db_changes/3 and return a -%% Changefun that can be used by the handle_changes_req function. Also -%% while couch_mrview_changes:handle_changes/6 is returning tha view -%% changes this function return docs corresponding to the changes -%% instead so it can be used to replace the _view filter. -handle_view_filtered_changes(ChangesArgs, Req, Db) -> - %% parse view parameter - {DDocId, VName} = parse_view_param(Req), - - %% get view options - Query = case Req of - {json_req, {Props}} -> - {Q} = couch_util:get_value(<<"query">>, Props, {[]}), - Q; - _ -> - couch_httpd:qs(Req) - end, - ViewOptions = parse_view_options(Query, []), - - {ok, Infos} = couch_mrview:get_info(Db, DDocId), - case lists:member(<<"seq_indexed">>, - proplists:get_value(update_options, Infos, [])) of - true -> - handle_view_filtered_changes(Db, DDocId, VName, ViewOptions, ChangesArgs, - Req); - false when ViewOptions /= [] -> - ?LOG_ERROR("Tried to filter a non sequence indexed view~n",[]), - throw({bad_request, seqs_not_indexed}); - false -> - %% old method we are getting changes using the btree instead - %% which is not efficient, log it - ?LOG_WARN("Get view changes with seq_indexed=false.~n", []), - couch_changes:handle_db_changes(ChangesArgs, Req, Db) - end. - -handle_view_filtered_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions, - ChangesArgs, Req) -> - #changes_args{ - feed = ResponseType, - since = Since, - db_open_options = DbOptions} = ChangesArgs, - - Options0 = [{since, Since}, - {view_options, ViewOptions}], - Options = case ResponseType of - "continuous" -> [stream | Options0]; - "eventsource" -> [stream | Options0]; - "longpoll" -> [{stream, once} | Options0]; - _ -> Options0 - end, - - %% reopen the db with the db options given to the changes args - couch_db:close(Db0), - DbOptions1 = [{user_ctx, Db0#db.user_ctx} | DbOptions], - {ok, Db} = couch_db:open(DbName, DbOptions1), - - - %% initialise the changes fun - ChangesFun = fun(Callback) -> - Callback(start, ResponseType), - - Acc0 = {"", 0, Db, Callback, ChangesArgs}, - couch_mrview_changes:handle_changes(DbName, DDocId, VName, - fun view_changes_cb/2, - Acc0, Options) - end, - ChangesFun. - - -view_changes_cb(stop, {LastSeq, {_, _, _, Callback, Args}}) -> - Callback({stop, LastSeq}, Args#changes_args.feed); - -view_changes_cb(heartbeat, {_, _, _, Callback, Args}=Acc) -> - Callback(timeout, Args#changes_args.feed), - {ok, Acc}; -view_changes_cb({{Seq, _Key, DocId}, Val}, - {Prepend, OldLimit, Db0, Callback, Args}=Acc) -> - - %% is the key removed from the index? - Removed = case Val of - {[{<<"_removed">>, true}]} -> true; - _ -> false - end, - - #changes_args{ - feed = ResponseType, - limit = Limit} = Args, - - %% if the doc sequence is > to the one in the db record, reopen the - %% database since it means we don't have the latest db value. - Db = case Db0#db.update_seq >= Seq of - true -> Db0; - false -> - {ok, Db1} = couch_db:reopen_db(Db0), - Db1 - end, - - case couch_db:get_doc_info(Db, DocId) of - {ok, DocInfo} -> - %% get change row - {Deleted, ChangeRow} = view_change_row(Db, DocInfo, Args), - - case Removed of - true when Deleted /= true -> - %% the key has been removed from the view but the - %% document hasn't been deleted so ignore it. - {ok, Acc}; - _ -> - %% emit change row - Callback({change, ChangeRow, Prepend}, ResponseType), - - %% if we achieved the limit, stop here, else continue. - NewLimit = OldLimit + 1, - if Limit > NewLimit -> - {ok, {<<",\n">>, NewLimit, Db, Callback, Args}}; - true -> - {stop, {<<"">>, NewLimit, Db, Callback, Args}} - end - end; - {error, not_found} -> - %% doc not found, continue - {ok, Acc}; - Error -> - throw(Error) - end. - - -view_change_row(Db, DocInfo, Args) -> - #doc_info{id = Id, high_seq = Seq, revs = Revs} = DocInfo, - [#rev_info{rev=Rev, deleted=Del} | _] = Revs, - - #changes_args{style=Style, - include_docs=InDoc, - doc_options = DocOpts, - conflicts=Conflicts}=Args, - - Changes = case Style of - main_only -> - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; - all_docs -> - [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} - || #rev_info{rev=R} <- Revs] - end, - - {Del, {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}] ++ - deleted_item(Del) ++ case InDoc of - true -> - Opts = case Conflicts of - true -> [deleted, conflicts]; - false -> [deleted] - end, - Doc = couch_index_util:load_doc(Db, DocInfo, Opts), - case Doc of - null -> - [{doc, null}]; - _ -> - [{doc, couch_doc:to_json_obj(Doc, DocOpts)}] - end; - false -> - [] - end}}. - -parse_changes_query(Req, Db, IsViewChanges) -> +parse_changes_query(Req, Db) -> ChangesArgs = lists:foldl(fun({Key, Value}, Args) -> case {string:to_lower(Key), Value} of {"feed", _} -> @@ -426,6 +257,3 @@ parse_json(V) when is_list(V) -> ?JSON_DECODE(V); parse_json(V) -> V. - -deleted_item(true) -> [{<<"deleted">>, true}]; -deleted_item(_) -> []. From 434b5414ed975747e8144f9c5542cefa3f83fe3c Mon Sep 17 00:00:00 2001 From: Benjamin Bastian Date: Mon, 25 Aug 2014 16:50:05 +0700 Subject: [PATCH 06/11] Add rev to view changes respose. See couch_mrview for corresponding changes --- src/couch_changes.erl | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/couch_changes.erl b/src/couch_changes.erl index 259f83cb..50f9d50c 100644 --- a/src/couch_changes.erl +++ b/src/couch_changes.erl @@ -265,7 +265,7 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) -> {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), filter_revs(Passes, Docs). -fast_view_filter(Db, {{Seq, _}, {ID, _}}, {fast_view, Style, _, _}) -> +fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) -> case couch_db:get_doc_info(Db, ID) of {ok, #doc_info{high_seq=Seq}=DocInfo} -> Docs = open_revs(Db, DocInfo, Style), @@ -284,15 +284,15 @@ fast_view_filter(Db, {{Seq, _}, {ID, _}}, {fast_view, Style, _, _}) -> % I left the Seq > HighSeq guard in so if (for some godforsaken % reason) the seq in the view is more current than the database, % we'll throw an error. - {ok, []}; + {undefined, []}; {error, not_found} -> - {ok, []} + {undefined, []} end. -view_filter(_Db, _KV, {default, _Style}) -> - [ok]. % TODO: make a real thing +view_filter(Db, KV, {default, Style}) -> + apply_view_style(Db, KV, Style). get_view_qs({json_req, {Props}}) -> @@ -353,6 +353,16 @@ apply_style(#doc_info{revs=Revs}, main_only) -> apply_style(#doc_info{revs=Revs}, all_docs) -> [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs]. +apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; +apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) -> + case couch_db:get_doc_info(Db, ID) of + {ok, DocInfo} -> + apply_style(DocInfo, all_docs); + {error, not_found} -> + [] + end. + open_revs(Db, DocInfo, Style) -> DocInfos = case Style of @@ -627,7 +637,7 @@ changes_enumerator(Value0, Acc) -> changes_row(Results, DocInfo, #changes_acc{filter={fast_view,_,_,_}}=Acc) -> format_doc_info_change(Results, DocInfo, Acc); changes_row(Results, KV, #changes_acc{view=#mrview{}}) -> - {{Seq, Key}, {Id, Value}} = KV, + {{Seq, Key}, {Id, Value, _Rev}} = KV, {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"key">>, Key}, {<<"value">>, Value}, {<<"changes">>, Results}]}; changes_row(Results, #doc_info{}=DocInfo, Acc) -> format_doc_info_change(Results, DocInfo, Acc). From 8d5c90094214ad186641fe25f1c70d621adda378 Mon Sep 17 00:00:00 2001 From: Benjamin Bastian Date: Tue, 26 Aug 2014 15:34:21 +0700 Subject: [PATCH 07/11] Open view in changes start function rather than outside --- src/couch_changes.erl | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/couch_changes.erl b/src/couch_changes.erl index 50f9d50c..8ff109b5 100644 --- a/src/couch_changes.erl +++ b/src/couch_changes.erl @@ -73,7 +73,7 @@ handle_changes(Args1, Req, Db0, Type) -> _ -> false end, - {StartListenerFun, DDocName, ViewName, View} = if UseViewChanges -> + {StartListenerFun, DDocName, ViewName} = if UseViewChanges -> {DDocName0, ViewName0} = case {Type, Filter} of {{view, DDocName1, ViewName1}, _} -> {DDocName1, ViewName1}; @@ -93,14 +93,14 @@ handle_changes(Args1, Req, Db0, Type) -> ?MODULE, handle_view_event, {self(), DDocName0}, [{dbname, Db0#db.name}] ) end, - {SNFun, DDocName0, ViewName0, View0}; - true -> + {SNFun, DDocName0, ViewName0}; + true -> SNFun = fun() -> couch_event:link_listener( ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}] ) end, - {SNFun, undefined, undefined, undefined} + {SNFun, undefined, undefined} end, Start = fun() -> {ok, Db} = couch_db:reopen(Db0), @@ -110,7 +110,14 @@ handle_changes(Args1, Req, Db0, Type) -> fwd -> Since end, - {Db, StartSeq} + View2 = if UseViewChanges -> + {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view( + Db0#db.name, DDocName, ViewName, #mrargs{}), + View1; + true -> + undefined + end, + {Db, View2, StartSeq} end, % begin timer to deal with heartbeat when filter function fails case Args#changes_args.heartbeat of @@ -126,7 +133,7 @@ handle_changes(Args1, Req, Db0, Type) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), {ok, Listener} = StartListenerFun(), - {Db, StartSeq} = Start(), + {Db, View, StartSeq} = Start(), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq, @@ -147,7 +154,7 @@ handle_changes(Args1, Req, Db0, Type) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), - {Db, StartSeq} = Start(), + {Db, View, StartSeq} = Start(), Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback, UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun, DDocName, ViewName, View), From 20cd47e7164975618b32084a10774e4f8103e608 Mon Sep 17 00:00:00 2001 From: Benjamin Bastian Date: Tue, 26 Aug 2014 15:55:07 +0700 Subject: [PATCH 08/11] Make include_docs=true work for view changes --- src/couch_changes.erl | 46 ++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/src/couch_changes.erl b/src/couch_changes.erl index 8ff109b5..29920daa 100644 --- a/src/couch_changes.erl +++ b/src/couch_changes.erl @@ -643,9 +643,9 @@ changes_enumerator(Value0, Acc) -> changes_row(Results, DocInfo, #changes_acc{filter={fast_view,_,_,_}}=Acc) -> format_doc_info_change(Results, DocInfo, Acc); -changes_row(Results, KV, #changes_acc{view=#mrview{}}) -> - {{Seq, Key}, {Id, Value, _Rev}} = KV, - {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"key">>, Key}, {<<"value">>, Value}, {<<"changes">>, Results}]}; +changes_row(Results, KV, #changes_acc{view=#mrview{}}=Acc) -> + {{Seq, Key}, {Id, Value, Rev}} = KV, + {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"key">>, Key}, {<<"value">>, Value}, {<<"changes">>, Results}] ++ maybe_get_changes_doc({Id, Rev}, Acc)}; changes_row(Results, #doc_info{}=DocInfo, Acc) -> format_doc_info_change(Results, DocInfo, Acc). @@ -653,29 +653,35 @@ format_doc_info_change(Results, #doc_info{}=DocInfo, Acc) -> #doc_info{ id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _] } = DocInfo, + {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++ + deleted_item(Del) ++ maybe_get_changes_doc(DocInfo, Acc)}. + +maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) -> #changes_acc{ db = Db, include_docs = IncDoc, doc_options = DocOpts, conflicts = Conflicts } = Acc, - {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++ - deleted_item(Del) ++ case IncDoc of - true -> - Opts = case Conflicts of - true -> [deleted, conflicts]; - false -> [deleted] - end, - Doc = couch_index_util:load_doc(Db, DocInfo, Opts), - case Doc of - null -> - [{doc, null}]; - _ -> - [{doc, couch_doc:to_json_obj(Doc, DocOpts)}] - end; - false -> - [] - end}. + case IncDoc of + true -> + Opts = case Conflicts of + true -> [deleted, conflicts]; + false -> [deleted] + end, + Doc = couch_index_util:load_doc(Db, Value, Opts), + case Doc of + null -> + [{doc, null}]; + _ -> + [{doc, couch_doc:to_json_obj(Doc, DocOpts)}] + end; + false -> + [] + end; +maybe_get_changes_doc(_Value, _Acc) -> + []. + deleted_item(true) -> [{<<"deleted">>, true}]; deleted_item(_) -> []. From da2836b6afa4fcb96a18bff5f32f5ac3d6376de8 Mon Sep 17 00:00:00 2001 From: Benjamin Bastian Date: Thu, 28 Aug 2014 00:00:50 +0700 Subject: [PATCH 09/11] Refactor code for sanity and so the view changes handler can reuse code better --- src/couch_httpd_changes.erl | 259 ------------------------------------ src/couch_httpd_db.erl | 54 ++++---- 2 files changed, 29 insertions(+), 284 deletions(-) delete mode 100644 src/couch_httpd_changes.erl diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl deleted file mode 100644 index 4963a5f0..00000000 --- a/src/couch_httpd_changes.erl +++ /dev/null @@ -1,259 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_httpd_changes). - --export([handle_db_changes_req/2, - handle_changes_req/4, - parse_changes_query/2]). - --include_lib("couch/include/couch_db.hrl"). - -handle_db_changes_req(Req, Db) -> - ChangesArgs = parse_changes_query(Req, Db), - ChangesFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db), - handle_changes_req(Req, Db, ChangesArgs, ChangesFun). - -handle_changes_req(#httpd{method='POST'}=Req, Db, ChangesArgs, ChangesFun) -> - couch_httpd:validate_ctype(Req, "application/json"), - handle_changes_req1(Req, Db, ChangesArgs, ChangesFun); -handle_changes_req(#httpd{method='GET'}=Req, Db, ChangesArgs, ChangesFun) -> - handle_changes_req1(Req, Db, ChangesArgs, ChangesFun); -handle_changes_req(#httpd{}=Req, _Db, _ChangesArgs, _ChangesFun) -> - couch_httpd:send_method_not_allowed(Req, "GET,HEAD,POST"). - -handle_changes_req1(Req, #db{name=DbName}=Db, ChangesArgs, ChangesFun) -> - AuthDbName = ?l2b(config:get("couch_httpd_auth", "authentication_db")), - case AuthDbName of - DbName -> - % in the authentication database, _changes is admin-only. - ok = couch_db:check_is_admin(Db); - _Else -> - % on other databases, _changes is free for all. - ok - end, - - MakeCallback = fun(Resp) -> - fun({change, {ChangeProp}=Change, _}, "eventsource") -> - Seq = proplists:get_value(<<"seq">>, ChangeProp), - couch_httpd:send_chunk(Resp, ["data: ", ?JSON_ENCODE(Change), - "\n", "id: ", ?JSON_ENCODE(Seq), - "\n\n"]); - ({change, Change, _}, "continuous") -> - couch_httpd:send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); - ({change, Change, Prepend}, _) -> - couch_httpd:send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]); - (start, "eventsource") -> - ok; - (start, "continuous") -> - ok; - (start, _) -> - couch_httpd:send_chunk(Resp, "{\"results\":[\n"); - ({stop, _EndSeq}, "eventsource") -> - couch_httpd:end_json_response(Resp); - ({stop, EndSeq}, "continuous") -> - couch_httpd:send_chunk( - Resp, - [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"] - ), - couch_httpd:end_json_response(Resp); - ({stop, EndSeq}, _) -> - couch_httpd:send_chunk( - Resp, - io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq]) - ), - couch_httpd:end_json_response(Resp); - (timeout, _) -> - couch_httpd:send_chunk(Resp, "\n") - end - end, - WrapperFun = case ChangesArgs#changes_args.feed of - "normal" -> - {ok, Info} = couch_db:get_db_info(Db), - CurrentEtag = couch_httpd:make_etag(Info), - fun(FeedChangesFun) -> - couch_httpd:etag_respond( - Req, - CurrentEtag, - fun() -> - {ok, Resp} = couch_httpd:start_json_response( - Req, 200, [{"ETag", CurrentEtag}] - ), - FeedChangesFun(MakeCallback(Resp)) - end - ) - end; - "eventsource" -> - Headers = [ - {"Content-Type", "text/event-stream"}, - {"Cache-Control", "no-cache"} - ], - {ok, Resp} = couch_httpd:start_chunked_response(Req, 200, Headers), - fun(FeedChangesFun) -> - FeedChangesFun(MakeCallback(Resp)) - end; - _ -> - % "longpoll" or "continuous" - {ok, Resp} = couch_httpd:start_json_response(Req, 200), - fun(FeedChangesFun) -> - FeedChangesFun(MakeCallback(Resp)) - end - end, - couch_stats_collector:increment( - {httpd, clients_requesting_changes} - ), - try - WrapperFun(ChangesFun) - after - couch_stats_collector:decrement( - {httpd, clients_requesting_changes} - ) - end. - - -parse_changes_query(Req, Db) -> - ChangesArgs = lists:foldl(fun({Key, Value}, Args) -> - case {string:to_lower(Key), Value} of - {"feed", _} -> - Args#changes_args{feed=Value}; - {"descending", "true"} -> - Args#changes_args{dir=rev}; - {"since", "now"} -> - UpdateSeq = couch_util:with_db(Db#db.name, fun(WDb) -> - couch_db:get_update_seq(WDb) - end), - Args#changes_args{since=UpdateSeq}; - {"since", _} -> - Args#changes_args{since=list_to_integer(Value)}; - {"last-event-id", _} -> - Args#changes_args{since=list_to_integer(Value)}; - {"limit", _} -> - Args#changes_args{limit=list_to_integer(Value)}; - {"style", _} -> - Args#changes_args{style=list_to_existing_atom(Value)}; - {"heartbeat", "true"} -> - Args#changes_args{heartbeat=true}; - {"heartbeat", _} -> - Args#changes_args{heartbeat=list_to_integer(Value)}; - {"timeout", _} -> - Args#changes_args{timeout=list_to_integer(Value)}; - {"include_docs", "true"} -> - Args#changes_args{include_docs=true}; - {"attachments", "true"} -> - Opts = Args#changes_args.doc_options, - Args#changes_args{doc_options=[attachments|Opts]}; - {"att_encoding_info", "true"} -> - Opts = Args#changes_args.doc_options, - Args#changes_args{doc_options=[att_encoding_info|Opts]}; - {"conflicts", "true"} -> - Args#changes_args{conflicts=true}; - {"filter", _} -> - Args#changes_args{filter=Value}; - _Else -> % unknown key value pair, ignore. - Args - end - end, #changes_args{}, couch_httpd:qs(Req)), - %% if it's an EventSource request with a Last-event-ID header - %% that should override the `since` query string, since it's - %% probably the browser reconnecting. - case ChangesArgs#changes_args.feed of - "eventsource" -> - case couch_httpd:header_value(Req, "last-event-id") of - undefined -> - ChangesArgs; - Value -> - ChangesArgs#changes_args{since=list_to_integer(Value)} - end; - _ -> - ChangesArgs - end. - -parse_view_param({json_req, {Props}}) -> - {Query} = couch_util:get_value(<<"query">>, Props), - parse_view_param1(couch_util:get_value(<<"view">>, Query, <<"">>)); -parse_view_param(Req) -> - parse_view_param1(list_to_binary(couch_httpd:qs_value(Req, "view", ""))). - -parse_view_param1(ViewParam) -> - case re:split(ViewParam, <<"/">>) of - [DName, ViewName] -> - {<< "_design/", DName/binary >>, ViewName}; - _ -> - throw({bad_request, "Invalid `view` parameter."}) - end. - -parse_view_options([], Acc) -> - Acc; -parse_view_options([{K, V} | Rest], Acc) -> - Acc1 = case couch_util:to_binary(K) of - <<"reduce">> -> - [{reduce, couch_mrview_http:parse_boolean(V)}]; - <<"key">> -> - V1 = parse_json(V), - [{start_key, V1}, {end_key, V1} | Acc]; - <<"keys">> -> - [{keys, parse_json(V)} | Acc]; - <<"startkey">> -> - [{start_key, parse_json(V)} | Acc]; - <<"start_key">> -> - [{start_key, parse_json(V)} | Acc]; - <<"startkey_docid">> -> - [{start_key_docid, couch_util:to_binary(V)} | Acc]; - <<"start_key_docid">> -> - [{start_key_docid, couch_util:to_binary(V)} | Acc]; - <<"endkey">> -> - [{end_key, parse_json(V)} | Acc]; - <<"end_key">> -> - [{end_key, parse_json(V)} | Acc]; - <<"endkey_docid">> -> - [{start_key_docid, couch_util:to_binary(V)} | Acc]; - <<"end_key_docid">> -> - [{start_key_docid, couch_util:to_binary(V)} | Acc]; - <<"limit">> -> - [{limit, couch_mrview_http:parse_pos_int(V)} | Acc]; - <<"count">> -> - throw({query_parse_error, <<"QS param `count` is not `limit`">>}); - <<"stale">> when V =:= <<"ok">> orelse V =:= "ok" -> - [{stale, ok} | Acc]; - <<"stale">> when V =:= <<"update_after">> orelse V =:= "update_after" -> - [{stale, update_after} | Acc]; - <<"stale">> -> - throw({query_parse_error, <<"Invalid value for `stale`.">>}); - <<"descending">> -> - case couch_mrview_http:parse_boolean(V) of - true -> - [{direction, rev} | Acc]; - _ -> - [{direction, fwd} | Acc] - end; - <<"skip">> -> - [{skip, couch_mrview_http:parse_pos_int(V)} | Acc]; - <<"group">> -> - case couch_mrview_http:parse_booolean(V) of - true -> - [{group_level, exact} | Acc]; - _ -> - [{group_level, 0} | Acc] - end; - <<"group_level">> -> - [{group_level, couch_mrview_http:parse_pos_int(V)} | Acc]; - <<"inclusive_end">> -> - [{inclusive_end, couch_mrview_http:parse_boolean(V)}]; - _ -> - Acc - end, - parse_view_options(Rest, Acc1). - -parse_json(V) when is_list(V) -> - ?JSON_DECODE(V); -parse_json(V) -> - V. diff --git a/src/couch_httpd_db.erl b/src/couch_httpd_db.erl index 442e8725..34f04f21 100644 --- a/src/couch_httpd_db.erl +++ b/src/couch_httpd_db.erl @@ -14,9 +14,10 @@ -include_lib("couch/include/couch_db.hrl"). -export([handle_request/1, handle_compact_req/2, handle_design_req/2, - db_req/2, couch_doc_open/4,handle_changes_req/2, + db_req/2, couch_doc_open/4, handle_db_changes_req/2, update_doc_result_to_json/1, update_doc_result_to_json/2, - handle_design_info_req/3, parse_copy_destination_header/1]). + handle_design_info_req/3, parse_copy_destination_header/1, + parse_changes_query/2, handle_changes_req/4]). -import(couch_httpd, [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2, @@ -54,15 +55,22 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method, do_db_req(Req, Handler) end. -handle_changes_req(#httpd{method='POST'}=Req, Db) -> + +handle_db_changes_req(Req, Db) -> + ChangesArgs = parse_changes_query(Req, Db), + ChangesFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db), + handle_changes_req(Req, Db, ChangesArgs, ChangesFun). + + +handle_changes_req(#httpd{method='POST'}=Req, Db, ChangesArgs, ChangesFun) -> couch_httpd:validate_ctype(Req, "application/json"), - handle_changes_req1(Req, Db); -handle_changes_req(#httpd{method='GET'}=Req, Db) -> - handle_changes_req1(Req, Db); -handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> - send_method_not_allowed(Req, "GET,HEAD,POST"). + handle_changes_req1(Req, Db, ChangesArgs, ChangesFun); +handle_changes_req(#httpd{method='GET'}=Req, Db, ChangesArgs, ChangesFun) -> + handle_changes_req1(Req, Db, ChangesArgs, ChangesFun); +handle_changes_req(#httpd{}=Req, _Db, _ChangesArgs, _ChangesFun) -> + couch_httpd:send_method_not_allowed(Req, "GET,HEAD,POST"). -handle_changes_req1(Req, #db{name=DbName}=Db) -> +handle_changes_req1(Req, #db{name=DbName}=Db, ChangesArgs, ChangesFun) -> AuthDbName = ?l2b(config:get("couch_httpd_auth", "authentication_db")), case AuthDbName of DbName -> @@ -72,47 +80,41 @@ handle_changes_req1(Req, #db{name=DbName}=Db) -> % on other databases, _changes is free for all. ok end, - handle_changes_req2(Req, Db). -handle_changes_req2(Req, Db) -> MakeCallback = fun(Resp) -> fun({change, {ChangeProp}=Change, _}, "eventsource") -> Seq = proplists:get_value(<<"seq">>, ChangeProp), - send_chunk(Resp, ["data: ", ?JSON_ENCODE(Change), + couch_httpd:send_chunk(Resp, ["data: ", ?JSON_ENCODE(Change), "\n", "id: ", ?JSON_ENCODE(Seq), "\n\n"]); ({change, Change, _}, "continuous") -> - send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); + couch_httpd:send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); ({change, Change, Prepend}, _) -> - send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]); + couch_httpd:send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]); (start, "eventsource") -> ok; (start, "continuous") -> ok; (start, _) -> - send_chunk(Resp, "{\"results\":[\n"); + couch_httpd:send_chunk(Resp, "{\"results\":[\n"); ({stop, _EndSeq}, "eventsource") -> - end_json_response(Resp); + couch_httpd:end_json_response(Resp); ({stop, EndSeq}, "continuous") -> - send_chunk( + couch_httpd:send_chunk( Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"] ), - end_json_response(Resp); + couch_httpd:end_json_response(Resp); ({stop, EndSeq}, _) -> - send_chunk( + couch_httpd:send_chunk( Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq]) ), - end_json_response(Resp); - (timeout, "eventsource") -> - send_chunk(Resp, "event: heartbeat\ndata: \n\n"); + couch_httpd:end_json_response(Resp); (timeout, _) -> - send_chunk(Resp, "\n") + couch_httpd:send_chunk(Resp, "\n") end end, - ChangesArgs = parse_changes_query(Req, Db), - ChangesFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db), WrapperFun = case ChangesArgs#changes_args.feed of "normal" -> {ok, Info} = couch_db:get_db_info(Db), @@ -154,6 +156,8 @@ handle_changes_req2(Req, Db) -> [couchdb, httpd, clients_requesting_changes]) end. + + handle_compact_req(#httpd{method='POST'}=Req, Db) -> case Req#httpd.path_parts of [_DbName, <<"_compact">>] -> From 6e7f19ceaf14f2824c4e3077dbbf1402c12239bc Mon Sep 17 00:00:00 2001 From: Benjamin Bastian Date: Fri, 29 Aug 2014 18:15:55 +0700 Subject: [PATCH 10/11] Add comment and do minor refactoring --- src/couch_changes.erl | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/src/couch_changes.erl b/src/couch_changes.erl index 29920daa..2ff78e76 100644 --- a/src/couch_changes.erl +++ b/src/couch_changes.erl @@ -65,23 +65,20 @@ handle_changes(Args1, Req, Db0, Type) -> } = Args1, Filter = configure_filter(FilterName, Style, Req, Db0), Args = Args1#changes_args{filter_fun = Filter}, - UseViewChanges = case {Type, Filter} of - {{view, _, _}, _} -> - true; - {_, {fast_view, _, _, _}} -> - true; + % The type of changes feed depends on the supplied filter. If the query is + % for an optimized view-filtered db changes, we need to use the view + % sequence tree. + {UseViewChanges, DDocName, ViewName} = case {Type, Filter} of + {{view, DDocName0, ViewName0}, _} -> + {true, DDocName0, ViewName0}; + {_, {fast_view, _, DDoc, ViewName0}} -> + {true, DDoc#doc.id, ViewName0}; _ -> - false + {false, undefined, undefined} end, - {StartListenerFun, DDocName, ViewName} = if UseViewChanges -> - {DDocName0, ViewName0} = case {Type, Filter} of - {{view, DDocName1, ViewName1}, _} -> - {DDocName1, ViewName1}; - {_, {fast_view, _, DDoc, ViewName1}} -> - {DDoc#doc.id, ViewName1} - end, + {StartListenerFun, View} = if UseViewChanges -> {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view( - Db0#db.name, DDocName0, ViewName0, #mrargs{}), + Db0#db.name, DDocName, ViewName, #mrargs{}), case View0#mrview.seq_btree of #btree{} -> ok; @@ -90,17 +87,17 @@ handle_changes(Args1, Req, Db0, Type) -> end, SNFun = fun() -> couch_event:link_listener( - ?MODULE, handle_view_event, {self(), DDocName0}, [{dbname, Db0#db.name}] + ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, Db0#db.name}] ) end, - {SNFun, DDocName0, ViewName0}; - true -> + {SNFun, View0}; + true -> SNFun = fun() -> couch_event:link_listener( ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}] ) end, - {SNFun, undefined, undefined} + {SNFun, undefined} end, Start = fun() -> {ok, Db} = couch_db:reopen(Db0), From 6125862cfed4e948f0f188faa46780dfb3bcbd0d Mon Sep 17 00:00:00 2001 From: Benjamin Bastian Date: Mon, 15 Sep 2014 19:10:44 -0700 Subject: [PATCH 11/11] Change return format of _view_changes This commit makes KV pairs for _view_changes requests be returned in the format: {..."add": [["key", "val"],["key2", "val2"]], "remove": ["oldkey"]} Note that the "add" field is a list of lists rather than an object because: 1) there can be multiple values for a given key and 2) keys can be non-strings (non-string keys are invalid JSON). Also note that if a view update causes adds or removes some (but not all) values for a given key, all values associated with that key will be returned in the _view_changes response. --- src/couch_changes.erl | 154 +++++++++++++++++++++++++++++++++++------- 1 file changed, 131 insertions(+), 23 deletions(-) diff --git a/src/couch_changes.erl b/src/couch_changes.erl index 2ff78e76..2b2647f8 100644 --- a/src/couch_changes.erl +++ b/src/couch_changes.erl @@ -49,7 +49,9 @@ doc_options, conflicts, timeout, - timeout_fun + timeout_fun, + aggregation_kvs, + aggregation_results }). handle_db_changes(Args, Req, Db) -> @@ -450,7 +452,9 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, D timeout_fun = TimeoutFun, ddoc_name = DDocName, view_name = ViewName, - view = View + view = View, + aggregation_results=[], + aggregation_kvs=[] }. send_changes(Acc, Dir, FirstRound) -> @@ -460,16 +464,36 @@ send_changes(Acc, Dir, FirstRound) -> filter = Filter, view = View } = Acc, - EnumFun = fun changes_enumerator/2, + DbEnumFun = fun changes_enumerator/2, case can_optimize(FirstRound, Filter) of {true, Fun} -> - Fun(Db, StartSeq, Dir, EnumFun, Acc, Filter); + Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter); _ -> - case View of - undefined -> - couch_db:changes_since(Db, StartSeq, EnumFun, [{dir, Dir}], Acc); - #mrview{} -> - couch_mrview:view_changes_since(View, StartSeq, EnumFun, [{dir, Dir}], Acc) + case {View, Filter} of + {#mrview{}, {fast_view, _, _, _}} -> + couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc); + {undefined, _} -> + couch_db:changes_since(Db, StartSeq, DbEnumFun, [{dir, Dir}], Acc); + {#mrview{}, _} -> + ViewEnumFun = fun view_changes_enumerator/2, + {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc), + case Acc0 of + #changes_acc{aggregation_results=[]} -> + {Go, Acc0}; + _ -> + #changes_acc{ + aggregation_results = AggResults, + aggregation_kvs = AggKVs, + user_acc = UserAcc, + callback = Callback, + resp_type = ResponseType, + prepend = Prepend + } = Acc0, + ChangesRow = view_changes_row(AggResults, AggKVs, Acc0), + UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc0#changes_acc{user_acc=UserAcc0}} + end end end. @@ -589,18 +613,84 @@ maybe_refresh_view(Db, DDocName, ViewName) -> end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> Callback({stop, EndSeq}, ResponseType, UserAcc). +view_changes_enumerator(Value, Acc) -> + #changes_acc{ + filter = Filter, callback = Callback, prepend = Prepend, + user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, + timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq, + aggregation_kvs=AggKVs, aggregation_results=AggResults + } = Acc, + + Results0 = view_filter(Db, Value, Filter), + Results = [Result || Result <- Results0, Result /= null], + {{Seq, _}, _} = Value, + + Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, + + if CurrentSeq =:= Seq -> + NewAggKVs = case Results of + [] -> AggKVs; + _ -> [Value|AggKVs] + end, + {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + Acc0 = Acc#changes_acc{ + seq = Seq, + user_acc = UserAcc2, + aggregation_kvs=NewAggKVs + }, + case Done of + stop -> {stop, Acc0}; + ok -> {Go, Acc0} + end; + AggResults =/= [] -> + {NewAggKVs, NewAggResults} = case Results of + [] -> {[], []}; + _ -> {[Value], Results} + end, + if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" -> + ChangesRow = view_changes_row(AggResults, AggKVs, Acc), + UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc#changes_acc{ + seq = Seq, user_acc = UserAcc2, limit = Limit - 1, + aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}}; + true -> + ChangesRow = view_changes_row(AggResults, AggKVs, Acc), + UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc#changes_acc{ + seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2, + limit = Limit - 1, aggregation_kvs=[Value], + aggregation_results=Results}} + end; + true -> + {NewAggKVs, NewAggResults} = case Results of + [] -> {[], []}; + _ -> {[Value], Results} + end, + {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + Acc0 = Acc#changes_acc{ + seq = Seq, + user_acc = UserAcc2, + aggregation_kvs=NewAggKVs, + aggregation_results=NewAggResults + }, + case Done of + stop -> {stop, Acc0}; + ok -> {Go, Acc0} + end + end. + changes_enumerator(Value0, Acc) -> #changes_acc{ filter = Filter, callback = Callback, prepend = Prepend, user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, - timeout = Timeout, timeout_fun = TimeoutFun, view = View + timeout = Timeout, timeout_fun = TimeoutFun } = Acc, - {Value, Results0} = case {View, Filter} of - {_, {fast_view, _, _, _}} -> + {Value, Results0} = case Filter of + {fast_view, _, _, _} -> fast_view_filter(Db, Value0, Filter); - {#mrview{}, _} -> - {Value0, view_filter(Db, Value0, Filter)}; - {_, _} -> + _ -> {Value0, filter(Db, Value0, Filter)} end, Results = [Result || Result <- Results0, Result /= null], @@ -638,15 +728,33 @@ changes_enumerator(Value0, Acc) -> -changes_row(Results, DocInfo, #changes_acc{filter={fast_view,_,_,_}}=Acc) -> - format_doc_info_change(Results, DocInfo, Acc); -changes_row(Results, KV, #changes_acc{view=#mrview{}}=Acc) -> - {{Seq, Key}, {Id, Value, Rev}} = KV, - {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"key">>, Key}, {<<"value">>, Value}, {<<"changes">>, Results}] ++ maybe_get_changes_doc({Id, Rev}, Acc)}; -changes_row(Results, #doc_info{}=DocInfo, Acc) -> - format_doc_info_change(Results, DocInfo, Acc). +view_changes_row(Results, KVs, Acc) -> + {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) -> + {{_Seq, Key}, {_Id, Value, _Rev}} = Row, + case Value of + removed -> + {AddAcc, [Key|RemAcc]}; + {dups, DupValues} -> + AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) -> + [[Key, DupValue]|AddAcc0] + end, AddAcc, DupValues), + {AddAcc1, RemAcc}; + _ -> + {[[Key, Value]|AddAcc], RemAcc} + end + end, {[], []}, KVs), + + % Seq, Id, and Rev should be the same for all KVs, since we're aggregating + % by seq. + [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs, + + {[ + {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add}, + {<<"remove">>, Remove}, {<<"changes">>, Results} + ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}. + -format_doc_info_change(Results, #doc_info{}=DocInfo, Acc) -> +changes_row(Results, DocInfo, Acc) -> #doc_info{ id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _] } = DocInfo,