From e63f6864891ac9f00373ec0d3d5fee0085d2e2fa Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Thu, 14 Apr 2016 17:29:00 -0400 Subject: [PATCH] Implement Mango selectors for change feeds API is modeled after _doc_ids filter for change feeds. User POSTs to {db}/_changes with `filter=_selector`. Document body should have a "selector" field, with a Mango selector object as value. For example: ``` http http://.../d1/_changes?filter=_selector { "selector": {"z" : {"$gte" : 1} } } ``` Jira: COUCHDB-2988 --- src/couch_changes.erl | 28 +++++++ test/couch_changes_tests.erl | 155 ++++++++++++++++++++++++++++++++++- 2 files changed, 182 insertions(+), 1 deletion(-) diff --git a/src/couch_changes.erl b/src/couch_changes.erl index 248ba3de..75ec1cd8 100644 --- a/src/couch_changes.erl +++ b/src/couch_changes.erl @@ -196,6 +196,8 @@ get_callback_acc(Callback) when is_function(Callback, 2) -> configure_filter("_doc_ids", Style, Req, _Db) -> {doc_ids, Style, get_doc_ids(Req)}; +configure_filter("_selector", Style, Req, _Db) -> + {selector, Style, get_selector(Req)}; configure_filter("_design", Style, _Req, _Db) -> {design_docs, Style}; configure_filter("_view", Style, Req, Db) -> @@ -267,6 +269,11 @@ filter(_Db, DocInfo, {doc_ids, Style, DocIds}) -> false -> [] end; +filter(Db, DocInfo, {selector, Style, Selector}) -> + Docs = open_revs(Db, DocInfo, Style), + Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, [])) + || Doc <- Docs], + filter_revs(Passes, Docs); filter(_Db, DocInfo, {design_docs, Style}) -> case DocInfo#doc_info.id of <<"_design", _/binary>> -> @@ -336,6 +343,15 @@ get_doc_ids(_) -> throw({bad_request, no_doc_ids_provided}). +get_selector({json_req, {Props}}) -> + check_selector(couch_util:get_value(<<"selector">>, Props)); +get_selector(#httpd{method='POST'}=Req) -> + couch_httpd:validate_ctype(Req, "application/json"), + get_selector({json_req, couch_httpd:json_body_obj(Req)}); +get_selector(_) -> + throw({bad_request, "Selector must be specified in POST payload"}). + + check_docids(DocIds) when is_list(DocIds) -> lists:foreach(fun (DocId) when not is_binary(DocId) -> @@ -349,6 +365,18 @@ check_docids(_) -> throw({bad_request, Msg}). +check_selector(Selector={_}) -> + try + mango_selector:normalize(Selector) + catch + {mango_error, Mod, Reason0} -> + {_StatusCode, _Error, Reason} = mango_error:info(Mod, Reason0), + throw({bad_request, Reason}) + end; +check_selector(_Selector) -> + throw({bad_request, "Selector error: expected a JSON object"}). + + open_ddoc(#db{name=DbName, id_tree=undefined}, DDocId) -> case ddoc_cache:open_doc(mem3:dbname(DbName), DDocId) of {ok, _} = Resp -> Resp; diff --git a/test/couch_changes_tests.erl b/test/couch_changes_tests.erl index f3dcf6e7..52eff8a2 100644 --- a/test/couch_changes_tests.erl +++ b/test/couch_changes_tests.erl @@ -60,6 +60,7 @@ changes_test_() -> setup, fun test_util:start_couch/0, fun test_util:stop_couch/1, [ + filter_by_selector(), filter_by_doc_id(), filter_by_design(), continuous_feed() @@ -84,6 +85,24 @@ filter_by_doc_id() -> } }. +filter_by_selector() -> + { + "Filter _selector", + { + foreach, + fun setup/0, fun teardown/1, + [ + fun should_select_basic/1, + fun should_select_with_since/1, + fun should_select_when_no_result/1, + fun should_select_with_deleted_docs/1, + fun should_select_with_continuous/1, + fun should_stop_selector_when_db_deleted/1 + ] + } + }. + + filter_by_design() -> { "Filter _design", @@ -317,7 +336,7 @@ should_filter_continuous_feed_by_specific_doc_ids({DbName, Revs}) -> should_end_changes_when_db_deleted({DbName, _Revs}) -> ?_test(begin - {ok, Db} = couch_db:open_int(DbName, []), + {ok, _Db} = couch_db:open_int(DbName, []), ChangesArgs = #changes_args{ filter = "_doc_ids", feed = "continuous" @@ -333,6 +352,140 @@ should_end_changes_when_db_deleted({DbName, _Revs}) -> ok end). + +should_select_basic({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{filter = "_selector"}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + Consumer = spawn_consumer(DbName, ChArgs, Req), + {Rows, LastSeq} = wait_finished(Consumer), + {ok, Db} = couch_db:open_int(DbName, []), + UpSeq = couch_db:get_update_seq(Db), + couch_db:close(Db), + stop_consumer(Consumer), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id}] = Rows, + ?assertEqual(<<"doc3">>, Id), + ?assertEqual(6, Seq), + ?assertEqual(UpSeq, LastSeq) + end). + +should_select_with_since({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{filter = "_selector", since = 9}, + GteDoc2 = {[{<<"$gte">>, <<"doc1">>}]}, + Selector = {[{<<"_id">>, GteDoc2}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + Consumer = spawn_consumer(DbName, ChArgs, Req), + {Rows, LastSeq} = wait_finished(Consumer), + {ok, Db} = couch_db:open_int(DbName, []), + UpSeq = couch_db:get_update_seq(Db), + couch_db:close(Db), + stop_consumer(Consumer), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id}] = Rows, + ?assertEqual(<<"doc8">>, Id), + ?assertEqual(10, Seq), + ?assertEqual(UpSeq, LastSeq) + end). + +should_select_when_no_result({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{filter = "_selector"}, + Selector = {[{<<"_id">>, <<"nopers">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + Consumer = spawn_consumer(DbName, ChArgs, Req), + {Rows, LastSeq} = wait_finished(Consumer), + {ok, Db} = couch_db:open_int(DbName, []), + UpSeq = couch_db:get_update_seq(Db), + couch_db:close(Db), + stop_consumer(Consumer), + ?assertEqual(0, length(Rows)), + ?assertEqual(UpSeq, LastSeq) + end). + +should_select_with_deleted_docs({DbName, Revs}) -> + ?_test( + begin + Rev3_2 = element(6, Revs), + {ok, Db} = couch_db:open_int(DbName, []), + {ok, _} = save_doc( + Db, + {[{<<"_id">>, <<"doc3">>}, + {<<"_deleted">>, true}, + {<<"_rev">>, Rev3_2}]}), + ChArgs = #changes_args{filter = "_selector"}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + Consumer = spawn_consumer(DbName, ChArgs, Req), + {Rows, LastSeq} = wait_finished(Consumer), + couch_db:close(Db), + stop_consumer(Consumer), + ?assertMatch( + [#row{seq = LastSeq, id = <<"doc3">>, deleted = true}], + Rows + ), + ?assertEqual(11, LastSeq) + end). + +should_select_with_continuous({DbName, Revs}) -> + ?_test( + begin + {ok, Db} = couch_db:open_int(DbName, []), + ChArgs = #changes_args{filter = "_selector", feed = "continuous"}, + GteDoc8 = {[{<<"$gte">>, <<"doc8">>}]}, + Selector = {[{<<"_id">>, GteDoc8}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + Consumer = spawn_consumer(DbName, ChArgs, Req), + ok = pause(Consumer), + Rows = get_rows(Consumer), + ?assertMatch( + [#row{seq = 10, id = <<"doc8">>, deleted = false}], + Rows + ), + clear_rows(Consumer), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc01">>}]}), + ok = unpause(Consumer), + timer:sleep(100), + ok = pause(Consumer), + ?assertEqual([], get_rows(Consumer)), + Rev4 = element(4, Revs), + Rev8 = element(10, Revs), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}, + {<<"_rev">>, Rev8}]}), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}, + {<<"_rev">>, Rev4}]}), + ok = unpause(Consumer), + timer:sleep(100), + ok = pause(Consumer), + NewRows = get_rows(Consumer), + ?assertMatch( + [#row{seq = _, id = <<"doc8">>, deleted = false}], + NewRows + ) + end). + +should_stop_selector_when_db_deleted({DbName, _Revs}) -> + ?_test( + begin + {ok, _Db} = couch_db:open_int(DbName, []), + ChArgs = #changes_args{filter = "_selector", feed = "continuous"}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + Consumer = spawn_consumer(DbName, ChArgs, Req), + ok = pause(Consumer), + ok = couch_server:delete(DbName, [?ADMIN_CTX]), + ok = unpause(Consumer), + {_Rows, _LastSeq} = wait_finished(Consumer), + stop_consumer(Consumer), + ok + end). + + should_emit_only_design_documents({DbName, Revs}) -> ?_test( begin