
Removal of CouchDB revision tree support for efficiency

XDCR is broken. Otherwise ready for testing.
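
The core data-model change: with the revision tree gone, a document carries
exactly one current revision. The hunks below replace the old
#full_doc_info/#rev_info pair with a flat #doc_info record, roughly as
sketched here (shapes inferred from the call sites in this diff; the
authoritative definitions live in couch_db.hrl, which this commit also trims):

    %% Old shape (approximate): one #rev_info per leaf of the rev tree.
    -record(rev_info, {rev, seq, deleted = false, body_sp}).
    -record(doc_info, {id, high_seq, revs = []}).  % revs :: [#rev_info{}]

    %% New shape (approximate): a single rev, deleted flag and sequence.
    -record(doc_info, {id, local_seq, rev, deleted = false}).

    %% #doc likewise drops revs = {Start, RevIds} for rev = {Pos, RevId},
    %% and its body field is renamed to json.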

Change-Id: I25a929e1fc66904a455c5d49c2c1cf70f6bd00d0
Reviewed-on: http://review.couchbase.org/11220
Reviewed-by: Damien Katz <damien@couchbase.com>
Tested-by: Damien Katz <damien@couchbase.com>
1 parent 5809b81 · commit 5dd1d5440aa6b965bd06e9ae68231e87c5cefaee · committed by @Damienkatz
Showing with 768 additions and 7,016 deletions.
  1. +1 −0 Makefile.am
  2. +10 −2 share/www/script/couch.js
  3. +1 −1 src/couch_set_view/src/couch_set_view_compactor.erl
  4. +1 −1 src/couch_set_view/src/couch_set_view_group.erl
  5. +6 −12 src/couch_set_view/src/couch_set_view_http.erl
  6. +1 −1 src/couch_set_view/src/couch_set_view_updater.erl
  7. +7 −8 src/couch_set_view/test/02-old-index-cleanup.t
  8. +1 −1 src/couch_set_view/test/03-db-compaction-file-leaks.t
  9. +1 −1 src/couch_set_view/test/04-group-shutdown.t
  10. +4 −5 src/couch_set_view/test/couch_set_view_test_util.erl
  11. +0 −10 src/couchdb/Makefile.am
  12. +38 −447 src/couchdb/couch_api_wrap.erl
  13. +2 −2 src/couchdb/couch_auth_cache.erl
  14. +62 −87 src/couchdb/couch_changes.erl
  15. +2 −2 src/couchdb/couch_compaction_daemon.erl
  16. +127 −873 src/couchdb/couch_db.erl
  17. +20 −57 src/couchdb/couch_db.hrl
  18. +9 −35 src/couchdb/couch_db_frontend.erl
  19. +120 −565 src/couchdb/couch_db_updater.erl
  20. +66 −450 src/couchdb/couch_doc.erl
  21. +22 −46 src/couchdb/couch_file.erl
  22. +14 −5 src/couchdb/couch_httpd.erl
  23. +60 −597 src/couchdb/couch_httpd_db.erl
  24. +2 −2 src/couchdb/couch_httpd_show.erl
  25. +0 −2 src/couchdb/couch_httpd_vhost.erl
  26. +6 −12 src/couchdb/couch_httpd_view.erl
  27. +2 −3 src/couchdb/couch_index_merger.erl
  28. +0 −175 src/couchdb/couch_internal_load_gen.erl
  29. +5 −5 src/couchdb/couch_query_servers.erl
  30. +3 −3 src/couchdb/couch_replication_manager.erl
  31. +20 −18 src/couchdb/couch_replicator.erl
  32. +1 −1 src/couchdb/couch_replicator_utils.erl
  33. +18 −114 src/couchdb/couch_replicator_worker.erl
  34. +32 −0 src/couchdb/couch_util.erl
  35. +1 −1 src/couchdb/couch_view_compactor.erl
  36. +1 −1 src/couchdb/couch_view_group.erl
  37. +9 −18 src/couchdb/couch_view_merger.erl
  38. +1 −1 src/couchdb/couch_view_updater.erl
  39. +0 −197 test/etap/001-couchbase-clobber_head.t
  40. +1 −4 test/etap/001-load.t
  41. +6 −72 test/etap/030-doc-from-json.t
  42. +7 −124 test/etap/031-doc-to-json.t
  43. +0 −88 test/etap/050-stream.t
  44. +0 −176 test/etap/060-kt-merging.t
  45. +0 −65 test/etap/061-kt-missing-leaves.t
  46. +0 −69 test/etap/062-kt-remove-leaves.t
  47. +0 −98 test/etap/063-kt-get-leaves.t
  48. +0 −46 test/etap/064-kt-counting.t
  49. +0 −42 test/etap/065-kt-stemming.t
  50. +43 −44 test/etap/073-changes.t
  51. +0 −158 test/etap/074-doc-update-conflicts.t
  52. +0 −248 test/etap/130-attachments-md5.t
  53. +0 −725 test/etap/140-attachment-comp.t
  54. +3 −3 test/etap/150-invalid-view-seq.t
  55. +0 −282 test/etap/160-vhosts.t
  56. +9 −12 test/etap/200-view-group-no-db-leaks.t
  57. +2 −2 test/etap/201-view-group-shutdown.t
  58. +2 −2 test/etap/220-compaction-daemon.t
  59. +0 −486 test/etap/240-replication-compact.t
  60. +0 −267 test/etap/241-replication-large-atts.t
  61. +0 −216 test/etap/242-replication-many-leaves.t
  62. +1 −16 test/etap/Makefile.am
  63. +13 −2 test/python/set_view/common/common.py
  64. +2 −2 test/python/set_view/erlang_views.py
  65. +0 −2 test/python/set_view/include_docs.py
  66. +0 −1 test/python/set_view/passive_partitions_update.py
  67. +1 −1 test/python/set_view/run.py
  68. +2 −2 test/python/set_view/updates.py
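
Because an update can no longer hand a newly generated revision back to the
caller, the write API's return value collapses from {ok, Rev} to a bare ok.
Many of the hunks below are that mechanical change at the call sites; an
illustrative before/after (not itself part of the diff):

    %% before: the newly generated revision came back to the caller
    {ok, NewRev} = couch_db:update_doc(Db, Doc, []),
    %% after: the update either succeeds with ok or errors out
    ok = couch_db:update_doc(Db, Doc, []).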
Makefile.am
@@ -82,6 +82,7 @@ THANKS.gz: $(top_srcdir)/THANKS
-gzip -9 < $< > $@
check: dev
+ rm -fr $(top_builddir)/tmp/lib/*
$(top_builddir)/test/etap/run test/etap
cover: dev
share/www/script/couch.js
@@ -72,8 +72,12 @@ function CouchDB(name, httpHeaders) {
// Deletes a document from the database
this.deleteDoc = function(doc) {
+ var rev = '';
+ if (doc._rev !== undefined) {
+ rev = "?rev=" + doc._rev;
+ }
this.last_req = this.request("DELETE", this.uri + encodeURIComponent(doc._id)
- + "?rev=" + doc._rev);
+ + rev);
CouchDB.maybeThrowError(this.last_req);
var result = JSON.parse(this.last_req.responseText);
doc._rev = result.rev; //record rev in input document
@@ -83,8 +87,12 @@ function CouchDB(name, httpHeaders) {
// Deletes an attachment from a document
this.deleteDocAttachment = function(doc, attachment_name) {
+ var rev = '';
+ if (doc._rev !== undefined) {
+ rev = "?rev=" + doc._rev;
+ }
this.last_req = this.request("DELETE", this.uri + encodeURIComponent(doc._id)
- + "/" + attachment_name + "?rev=" + doc._rev);
+ + "/" + attachment_name + rev);
CouchDB.maybeThrowError(this.last_req);
var result = JSON.parse(this.last_req.responseText);
doc._rev = result.rev; //record rev in input document
src/couch_set_view/src/couch_set_view_compactor.erl
@@ -55,7 +55,7 @@ compact_group(Group, EmptyGroup, SetName) ->
IdsCount = lists:foldl(
fun({PartId, _}, Acc) ->
{ok, Db} = couch_db:open_int(?dbname(SetName, PartId), []),
- {ok, DbReduce} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
+ {ok, DbReduce} = couch_btree:full_reduce(Db#db.docinfo_by_id_btree),
ok = couch_db:close(Db),
Acc + element(1, DbReduce)
end,
src/couch_set_view/src/couch_set_view_group.erl
@@ -812,7 +812,7 @@ sum_btree_sizes(Size1, Size2) ->
Size1 + Size2.
% maybe move to another module
-design_doc_to_set_view_group(SetName, #doc{id=Id,body={Fields}}) ->
+design_doc_to_set_view_group(SetName, #doc{id=Id,json={Fields}}) ->
Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
{DesignOptions} = couch_util:get_value(<<"options">>, Fields, {[]}),
{RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
src/couch_set_view/src/couch_set_view_http.erl
@@ -18,7 +18,7 @@
-export([make_view_fold_fun/6, finish_view_fold/4, finish_view_fold/5, view_row_obj/2]).
-export([view_etag/2, view_etag/3, make_reduce_fold_funs/5]).
-export([design_doc_view/6, parse_bool_param/1]).
--export([make_key_options/1]).
+-export([make_key_options/1,get_row_doc/6]).
-import(couch_httpd,
[send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,send_chunk/2,
@@ -504,24 +504,18 @@ get_row_doc(_Kv, _SetName, _PartId, false, _UserCtx, _DocOpenOptions) ->
nil;
get_row_doc({{_Key, DocId}, {Props}}, SetName, PartId, true, UserCtx, DocOpenOptions) ->
- Rev = case couch_util:get_value(<<"_rev">>, Props) of
- undefined ->
- nil;
- Rev0 ->
- couch_doc:parse_rev(Rev0)
- end,
Id = couch_util:get_value(<<"_id">>, Props, DocId),
- open_row_doc(SetName, PartId, Id, Rev, UserCtx, DocOpenOptions);
+ open_row_doc(SetName, PartId, Id, UserCtx, DocOpenOptions);
get_row_doc({{_Key, DocId}, _Value}, SetName, PartId, true, UserCtx, DocOpenOptions) ->
- open_row_doc(SetName, PartId, DocId, nil, UserCtx, DocOpenOptions).
+ open_row_doc(SetName, PartId, DocId, UserCtx, DocOpenOptions).
-open_row_doc(SetName, PartId, Id, Rev, UserCtx, DocOptions) ->
+open_row_doc(SetName, PartId, Id, UserCtx, DocOptions) ->
{ok, Db} = couch_db:open(
?dbname(SetName, PartId), [{user_ctx, UserCtx}]),
- JsonDoc = case (catch couch_db_frontend:couch_doc_open(Db, Id, Rev, DocOptions)) of
- #doc{} = Doc ->
+ JsonDoc = case (catch couch_db_frontend:open_doc(Db, Id, DocOptions)) of
+ {ok, #doc{} = Doc} ->
couch_doc:to_json_obj(Doc, []);
_ ->
null
src/couch_set_view/src/couch_set_view_updater.erl
@@ -212,7 +212,7 @@ purge_index(#set_view_group{fd=Fd, views=Views, id_btree=IdBtree}=Group, Db, Par
load_doc(Db, PartitionId, DocInfo, MapQueue, DocOpts, IncludeDesign) ->
- #doc_info{id=DocId, high_seq=Seq, revs=[#rev_info{deleted=Deleted}|_]} = DocInfo,
+ #doc_info{id=DocId, local_seq=Seq, deleted=Deleted} = DocInfo,
case {IncludeDesign, DocId} of
{false, <<?DESIGN_DOC_PREFIX, _/binary>>} -> % we skip design docs
ok;
src/couch_set_view/test/02-old-index-cleanup.t
@@ -40,7 +40,7 @@ test() ->
couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),
- {ok, DDocRev} = populate_set(),
+ ok = populate_set(),
GroupPid = couch_set_view:get_group_pid(test_set_name(), ddoc_id()),
query_view(num_docs(), []),
@@ -50,7 +50,7 @@ test() ->
etap:is(all_index_files(), [IndexFile], "Index file found"),
- {ok, _} = update_ddoc(DDocRev),
+ ok = update_ddoc(),
ok = timer:sleep(1000),
NewGroupPid = couch_set_view:get_group_pid(test_set_name(), ddoc_id()),
etap:isnt(NewGroupPid, GroupPid, "Got new group after ddoc update"),
@@ -132,7 +132,7 @@ populate_set() ->
]}}
]}}
]},
- {ok, DDocRev} = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
+ ok = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
DocList = lists:map(
fun(I) ->
{[
@@ -151,13 +151,12 @@ populate_set() ->
num_set_partitions(),
lists:seq(0, num_set_partitions() - 1),
[]),
- {ok, DDocRev}.
+ ok.
-update_ddoc(DDocRev) ->
+update_ddoc() ->
NewDDoc = {[
{<<"_id">>, ddoc_id()},
- {<<"_rev">>, DDocRev},
{<<"language">>, <<"javascript">>},
{<<"views">>, {[
{<<"test">>, {[
@@ -165,14 +164,14 @@ update_ddoc(DDocRev) ->
]}}
]}}
]},
- {ok, NewRev} = couch_set_view_test_util:update_ddoc(test_set_name(), NewDDoc),
+ ok = couch_set_view_test_util:update_ddoc(test_set_name(), NewDDoc),
ok = couch_set_view_test_util:define_set_view(
test_set_name(),
ddoc_id(),
num_set_partitions(),
lists:seq(0, num_set_partitions() - 1),
[]),
- {ok, NewRev}.
+ ok.
get_group_sig() ->
src/couch_set_view/test/03-db-compaction-file-leaks.t
@@ -109,7 +109,7 @@ populate_set() ->
]}}
]}}
]},
- {ok, _} = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
+ ok = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
DocList = lists:map(
fun(I) ->
{[
src/couch_set_view/test/04-group-shutdown.t
@@ -130,7 +130,7 @@ populate_set() ->
]}}
]}}
]},
- {ok, _DDocRev} = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
+ ok = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
DocList = lists:map(
fun(I) ->
{[
src/couch_set_view/test/couch_set_view_test_util.erl
@@ -122,7 +122,7 @@ populate_set_alternated(SetName, Partitions, DocList) ->
fun(DocJson, I) ->
Db = lists:nth(I + 1, Dbs),
Doc = couch_doc:from_json_obj(DocJson),
- {ok, _} = couch_db:update_doc(Db, Doc, []),
+ ok = couch_db:update_doc(Db, Doc, []),
(I + 1) rem length(Dbs)
end,
0,
@@ -133,16 +133,15 @@ populate_set_alternated(SetName, Partitions, DocList) ->
update_ddoc(SetName, DDoc) ->
DbName = iolist_to_binary([SetName, "/master"]),
{ok, Db} = couch_db:open_int(DbName, [admin_user_ctx()]),
- {ok, NewRev} = couch_db:update_doc(Db, couch_doc:from_json_obj(DDoc), []),
- ok = couch_db:close(Db),
- {ok, couch_doc:rev_to_str(NewRev)}.
+ ok = couch_db:update_doc(Db, couch_doc:from_json_obj(DDoc), []),
+ ok = couch_db:close(Db).
delete_ddoc(SetName, DDocId) ->
DbName = iolist_to_binary([SetName, "/master"]),
{ok, Db} = couch_db:open_int(DbName, [admin_user_ctx()]),
{ok, DDoc} = couch_db:open_doc(Db, DDocId, []),
- {ok, _} = couch_db:update_doc(Db, DDoc#doc{deleted = true}, []),
+ ok = couch_db:update_doc(Db, DDoc#doc{deleted = true}, []),
ok = couch_db:close(Db).
src/couchdb/Makefile.am
@@ -58,15 +58,12 @@ source_files = \
couch_httpd_auth.erl \
couch_httpd_oauth.erl \
couch_httpd_external.erl \
- couch_httpd_show.erl \
couch_httpd_view.erl \
couch_httpd_misc_handlers.erl \
couch_httpd_proxy.erl \
couch_httpd_replicator.erl \
- couch_httpd_rewrite.erl \
couch_httpd_stats_handlers.erl \
couch_httpd_vhost.erl \
- couch_key_tree.erl \
couch_log.erl \
couch_access_log.erl \
couch_native_process.erl \
@@ -86,7 +83,6 @@ source_files = \
couch_server_sup.erl \
couch_stats_aggregator.erl \
couch_stats_collector.erl \
- couch_stream.erl \
couch_task_status.erl \
couch_util.erl \
couch_uuids.erl \
@@ -97,7 +93,6 @@ source_files = \
couch_db_updater.erl \
couch_work_queue.erl \
json_stream_parse.erl \
- couch_internal_load_gen.erl \
couch_httpd_view_merger.erl \
couch_index_merger.erl \
couch_view_merger.erl \
@@ -142,14 +137,11 @@ compiled_files = \
couch_httpd_oauth.beam \
couch_httpd_proxy.beam \
couch_httpd_external.beam \
- couch_httpd_show.beam \
couch_httpd_view.beam \
couch_httpd_misc_handlers.beam \
couch_httpd_replicator.beam \
- couch_httpd_rewrite.beam \
couch_httpd_stats_handlers.beam \
couch_httpd_vhost.beam \
- couch_key_tree.beam \
couch_log.beam \
couch_access_log.beam \
couch_native_process.beam \
@@ -169,7 +161,6 @@ compiled_files = \
couch_server_sup.beam \
couch_stats_aggregator.beam \
couch_stats_collector.beam \
- couch_stream.beam \
couch_task_status.beam \
couch_util.beam \
couch_uuids.beam \
@@ -180,7 +171,6 @@ compiled_files = \
couch_db_updater.beam \
couch_work_queue.beam \
json_stream_parse.beam \
- couch_internal_load_gen.beam \
couch_httpd_view_merger.beam \
couch_index_merger.beam \
couch_view_merger.beam \
src/couchdb/couch_api_wrap.erl
@@ -27,13 +27,13 @@
db_close/1,
get_db_info/1,
update_doc/3,
- update_doc/4,
update_docs/3,
update_docs/4,
ensure_full_commit/1,
get_missing_revs/2,
open_doc/3,
- open_doc_revs/6,
+ open_doc/5,
+ couch_doc_open/3,
changes_since/5,
db_uri/1
]).
@@ -133,66 +133,33 @@ ensure_full_commit(Db) ->
couch_db:ensure_full_commit(Db).
-get_missing_revs(#httpdb{} = Db, IdRevs) ->
- JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]},
+get_missing_revs(#httpdb{} = Db, IdRevList) ->
+ JsonBody = {[{Id, couch_doc:rev_to_str(Rev)} || {Id, Rev} <- IdRevList]},
send_req(
Db,
[{method, post}, {path, "_revs_diff"}, {body, ?JSON_ENCODE(JsonBody)}],
fun(200, _, {Props}) ->
ConvertToNativeFun = fun({Id, {Result}}) ->
- MissingRevs = couch_doc:parse_revs(
+ MissingRev = couch_doc:parse_rev(
get_value(<<"missing">>, Result)
),
- PossibleAncestors = couch_doc:parse_revs(
- get_value(<<"possible_ancestors">>, Result, [])
- ),
- {Id, MissingRevs, PossibleAncestors}
+ {Id, MissingRev}
end,
{ok, lists:map(ConvertToNativeFun, Props)}
end);
-get_missing_revs(Db, IdRevs) ->
- couch_db:get_missing_revs(Db, IdRevs).
-
-
-
-open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
- Path = encode_doc_id(Id),
- QArgs = options_to_query_args(
- HttpDb, Path, [revs, {open_revs, Revs} | Options]),
- Self = self(),
- Streamer = spawn_link(fun() ->
- send_req(
- HttpDb,
- [{path, Path}, {qs, QArgs},
- {ibrowse_options, [{stream_to, {self(), once}}]},
- {headers, [{"Accept", "multipart/mixed"}]}],
- fun(200, Headers, StreamDataFun) ->
- remote_open_doc_revs_streamer_start(Self),
- {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
- get_value("Content-Type", Headers),
- StreamDataFun,
- fun mp_parse_mixed/1)
- end),
- unlink(Self)
- end),
- receive
- {started_open_doc_revs, Ref} ->
- receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)
- end;
-open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
- {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
- {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
+get_missing_revs(Db, IdRevList) ->
+ couch_db:get_missing_revs(Db, IdRevList).
-open_doc(#httpdb{} = Db, Id, Options) ->
- send_req(
- Db,
- [{path, encode_doc_id(Id)}, {qs, options_to_query_args(Options, [])}],
- fun(200, _, Body) ->
- {ok, couch_doc:from_json_obj(Body)};
- (_, _, {Props}) ->
- {error, get_value(<<"error">>, Props)}
- end);
+open_doc(#httpdb{}, _Id, _Options, Fun, Acc) ->
+ {ok, Fun({error, <<"not_found">>}, Acc)};
+open_doc(Db, Id, Options, Fun, Acc) ->
+ {ok, Result} = couch_db:open_doc(Db, Id, Options),
+ {ok, Fun(Result, Acc)}.
+
+
+open_doc(#httpdb{}, _Id, _Options) ->
+ {error, <<"not_found">>};
open_doc(Db, Id, Options) ->
case couch_db:open_doc(Db, Id, Options) of
{ok, _} = Ok ->
@@ -202,107 +169,24 @@ open_doc(Db, Id, Options) ->
end.
-update_doc(Db, Doc, Options) ->
- update_doc(Db, Doc, Options, interactive_edit).
-
-update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
- QArgs = case Type of
- replicated_changes ->
- [{"new_edits", "false"}];
- _ ->
- []
- end ++ options_to_query_args(Options, []),
- Boundary = couch_uuids:random(),
- JsonBytes = ?JSON_ENCODE(
- couch_doc:to_json_obj(
- Doc, [revs, attachments, follows, att_encoding_info | Options])),
- {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
- JsonBytes, Doc#doc.atts, true),
- Headers = case lists:member(delay_commit, Options) of
- true ->
- [{"X-Couch-Full-Commit", "false"}];
- false ->
- []
- end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
- Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
- send_req(
- HttpDb,
- [{method, put}, {path, encode_doc_id(DocId)},
- {qs, QArgs}, {headers, Headers}, {body, Body}],
- fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
- {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
- (409, _, _) ->
- throw(conflict);
- (Code, _, {Props}) ->
- case {Code, get_value(<<"error">>, Props)} of
- {401, <<"unauthorized">>} ->
- throw({unauthorized, get_value(<<"reason">>, Props)});
- {403, <<"forbidden">>} ->
- throw({forbidden, get_value(<<"reason">>, Props)});
- {412, <<"missing_stub">>} ->
- throw({missing_stub, get_value(<<"reason">>, Props)});
- {_, Error} ->
- {error, Error}
- end
- end);
-update_doc(Db, Doc, Options, Type) ->
- couch_db:update_doc(Db, Doc, Options, Type).
+couch_doc_open(Db, DocId, Options) ->
+ case open_doc(Db, DocId, Options) of
+ {ok, Doc} ->
+ Doc;
+ Error ->
+ throw(Error)
+ end.
+update_doc(Db, Doc, Options) ->
+ couch_db:update_doc(Db, Doc, Options).
+
update_docs(Db, DocList, Options) ->
- update_docs(Db, DocList, Options, interactive_edit).
-
-update_docs(_Db, [], _Options, _UpdateType) ->
- {ok, []};
-update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
- FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
- Prefix = case UpdateType of
- replicated_changes ->
- <<"{\"new_edits\":false,\"docs\":[">>;
- interactive_edit ->
- <<"{\"docs\":[">>
- end,
- Suffix = <<"]}">>,
- % Note: nginx and other servers don't like PUT/POST requests without
- % a Content-Length header, so we can't do a chunked transfer encoding
- % and JSON encode each doc only before sending it through the socket.
- {Docs, Len} = lists:mapfoldl(
- fun(#doc{} = Doc, Acc) ->
- Json = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
- {Json, Acc + iolist_size(Json)};
- (Doc, Acc) ->
- {Doc, Acc + iolist_size(Doc)}
- end,
- byte_size(Prefix) + byte_size(Suffix) + length(DocList) - 1,
- DocList),
- BodyFun = fun(eof) ->
- eof;
- ([]) ->
- {ok, Suffix, eof};
- ([prefix | Rest]) ->
- {ok, Prefix, Rest};
- ([Doc]) ->
- {ok, Doc, []};
- ([Doc | RestDocs]) ->
- {ok, [Doc, ","], RestDocs}
- end,
- Headers = [
- {"Content-Length", Len},
- {"Content-Type", "application/json"},
- {"X-Couch-Full-Commit", FullCommit}
- ],
- send_req(
- HttpDb,
- [{method, post}, {path, "_bulk_docs"},
- {body, {BodyFun, [prefix | Docs]}}, {headers, Headers}],
- fun(201, _, Results) when is_list(Results) ->
- {ok, bulk_results_to_errors(DocList, Results, remote)};
- (417, _, Results) when is_list(Results) ->
- {ok, bulk_results_to_errors(DocList, Results, remote)}
- end);
-update_docs(Db, DocList, Options, UpdateType) ->
- Result = couch_db:update_docs(Db, DocList, Options, UpdateType),
- {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.
+ ok = couch_db:update_docs(Db, DocList, Options).
+
+update_docs(Db, DocList, Options, replicated_changes) ->
+ ok = couch_db:update_docs(Db, DocList, Options),
+ {ok, []}.
changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq,
@@ -433,221 +317,6 @@ changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
]}.
-options_to_query_args(HttpDb, Path, Options) ->
- case lists:keytake(atts_since, 1, Options) of
- false ->
- options_to_query_args(Options, []);
- {value, {atts_since, []}, Options2} ->
- options_to_query_args(Options2, []);
- {value, {atts_since, PAs}, Options2} ->
- QueryArgs1 = options_to_query_args(Options2, []),
- FullUrl = couch_api_wrap_httpc:full_url(
- HttpDb, [{path, Path}, {qs, QueryArgs1}]),
- RevList = atts_since_arg(
- length("GET " ++ FullUrl ++ " HTTP/1.1\r\n") +
- length("&atts_since=") + 6, % +6 = % encoded [ and ]
- PAs, []),
- [{"atts_since", ?JSON_ENCODE(RevList)} | QueryArgs1]
- end.
-
-
-options_to_query_args([], Acc) ->
- lists:reverse(Acc);
-options_to_query_args([ejson_body | Rest], Acc) ->
- options_to_query_args(Rest, Acc);
-options_to_query_args([delay_commit | Rest], Acc) ->
- options_to_query_args(Rest, Acc);
-options_to_query_args([revs | Rest], Acc) ->
- options_to_query_args(Rest, [{"revs", "true"} | Acc]);
-options_to_query_args([{open_revs, all} | Rest], Acc) ->
- options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
-options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
- JsonRevs = ?b2l(?JSON_ENCODE(couch_doc:revs_to_strs(Revs))),
- options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
-
-
--define(MAX_URL_LEN, 7000).
-
-atts_since_arg(_UrlLen, [], Acc) ->
- lists:reverse(Acc);
-atts_since_arg(UrlLen, [PA | Rest], Acc) ->
- RevStr = couch_doc:rev_to_str(PA),
- NewUrlLen = case Rest of
- [] ->
- % plus 2 double quotes (% encoded)
- UrlLen + size(RevStr) + 6;
- _ ->
- % plus 2 double quotes and a comma (% encoded)
- UrlLen + size(RevStr) + 9
- end,
- case NewUrlLen >= ?MAX_URL_LEN of
- true ->
- lists:reverse(Acc);
- false ->
- atts_since_arg(NewUrlLen, Rest, [RevStr | Acc])
- end.
-
-
-% TODO: A less verbose, more elegant and automatic restart strategy for
-% the exported open_doc_revs/6 function. The restart should be
-% transparent to the caller like any other Couch API function exported
-% by this module.
-receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) ->
- try
- % Left only for debugging purposes via an interactive or remote shell
- erlang:put(open_doc_revs, {Id, Revs, Ref, Streamer}),
- receive_docs(Streamer, Fun, Ref, Acc)
- catch
- error:{restart_open_doc_revs, NewRef} ->
- receive_docs_loop(Streamer, Fun, Id, Revs, NewRef, Acc)
- end.
-
-receive_docs(Streamer, UserFun, Ref, UserAcc) ->
- Streamer ! {get_headers, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- restart_remote_open_doc_revs(Ref, NewRef);
- {headers, Ref, Headers} ->
- case get_value("content-type", Headers) of
- {"multipart/related", _} = ContentType ->
- case doc_from_multi_part_stream(
- ContentType,
- fun() -> receive_doc_data(Streamer, Ref) end,
- Ref) of
- {ok, Doc, Parser} ->
- case UserFun({ok, Doc}, UserAcc) of
- {ok, UserAcc2} ->
- ok;
- {skip, UserAcc2} ->
- couch_doc:abort_multi_part_stream(Parser)
- end,
- receive_docs(Streamer, UserFun, Ref, UserAcc2)
- end;
- {"application/json", []} ->
- Doc = couch_doc:from_json_obj(
- ?JSON_DECODE(receive_all(Streamer, Ref, []))),
- {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
- receive_docs(Streamer, UserFun, Ref, UserAcc2);
- {"application/json", [{"error","true"}]} ->
- {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
- Rev = get_value(<<"missing">>, ErrorProps),
- Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
- {_, UserAcc2} = UserFun(Result, UserAcc),
- receive_docs(Streamer, UserFun, Ref, UserAcc2)
- end;
- {done, Ref} ->
- {ok, UserAcc}
- end.
-
-
-restart_remote_open_doc_revs(Ref, NewRef) ->
- receive
- {body_bytes, Ref, _} ->
- restart_remote_open_doc_revs(Ref, NewRef);
- {body_done, Ref} ->
- restart_remote_open_doc_revs(Ref, NewRef);
- {done, Ref} ->
- restart_remote_open_doc_revs(Ref, NewRef);
- {headers, Ref, _} ->
- restart_remote_open_doc_revs(Ref, NewRef)
- after 0 ->
- erlang:error({restart_open_doc_revs, NewRef})
- end.
-
-
-remote_open_doc_revs_streamer_start(Parent) ->
- receive
- {get_headers, _Ref, Parent} ->
- remote_open_doc_revs_streamer_start(Parent);
- {next_bytes, _Ref, Parent} ->
- remote_open_doc_revs_streamer_start(Parent)
- after 0 ->
- Parent ! {started_open_doc_revs, make_ref()}
- end.
-
-
-receive_all(Streamer, Ref, Acc) ->
- Streamer ! {next_bytes, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- restart_remote_open_doc_revs(Ref, NewRef);
- {body_bytes, Ref, Bytes} ->
- receive_all(Streamer, Ref, [Bytes | Acc]);
- {body_done, Ref} ->
- lists:reverse(Acc)
- end.
-
-
-mp_parse_mixed(eof) ->
- receive {get_headers, Ref, From} ->
- From ! {done, Ref}
- end;
-mp_parse_mixed({headers, H}) ->
- receive {get_headers, Ref, From} ->
- From ! {headers, Ref, H}
- end,
- fun mp_parse_mixed/1;
-mp_parse_mixed({body, Bytes}) ->
- receive {next_bytes, Ref, From} ->
- From ! {body_bytes, Ref, Bytes}
- end,
- fun mp_parse_mixed/1;
-mp_parse_mixed(body_end) ->
- receive {next_bytes, Ref, From} ->
- From ! {body_done, Ref};
- {get_headers, Ref, From} ->
- self() ! {get_headers, Ref, From}
- end,
- fun mp_parse_mixed/1.
-
-
-receive_doc_data(Streamer, Ref) ->
- Streamer ! {next_bytes, Ref, self()},
- receive
- {body_bytes, Ref, Bytes} ->
- {Bytes, fun() -> receive_doc_data(Streamer, Ref) end};
- {body_done, Ref} ->
- {<<>>, fun() -> receive_doc_data(Streamer, Ref) end}
- end.
-
-doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
- Self = self(),
- Parser = spawn_link(fun() ->
- {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
- ContentType, DataFun,
- fun(Next) -> couch_doc:mp_parse_doc(Next, []) end),
- unlink(Self)
- end),
- Parser ! {get_doc_bytes, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- unlink(Parser),
- exit(Parser, kill),
- restart_remote_open_doc_revs(Ref, NewRef);
- {doc_bytes, Ref, DocBytes} ->
- Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
- ReadAttachmentDataFun = fun() ->
- Parser ! {get_bytes, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- unlink(Parser),
- exit(Parser, kill),
- receive {bytes, Ref, _} -> ok after 0 -> ok end,
- restart_remote_open_doc_revs(Ref, NewRef);
- {bytes, Ref, Bytes} ->
- Bytes
- end
- end,
- Atts2 = lists:map(
- fun(#att{data = follows} = A) ->
- A#att{data = ReadAttachmentDataFun};
- (A) ->
- A
- end, Doc#doc.atts),
- {ok, Doc#doc{atts = Atts2}, Parser}
- end.
-
-
changes_ev1(object_start, UserFun, UserAcc) ->
fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
@@ -686,91 +355,13 @@ parse_changes_line(object_start, UserFun) ->
end.
json_to_doc_info({Props}) ->
- RevsInfo = lists:map(
- fun({Change}) ->
- Rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
- Del = (true =:= get_value(<<"deleted">>, Change)),
- #rev_info{rev=Rev, deleted=Del}
- end, get_value(<<"changes">>, Props)),
+ {Change} = get_value(<<"changes">>, Props),
+ Rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
+ Del = (true =:= get_value(<<"deleted">>, Change)),
#doc_info{
id = get_value(<<"id">>, Props),
- high_seq = get_value(<<"seq">>, Props),
- revs = RevsInfo
+ local_seq = get_value(<<"seq">>, Props),
+ rev = Rev,
+ deleted = Del
}.
-
-bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
- lists:reverse(lists:foldl(
- fun({_, {ok, _}}, Acc) ->
- Acc;
- ({#doc{id = Id, revs = {Pos, [RevId | _]}}, Error}, Acc) ->
- {_, Error, Reason} = couch_httpd:error_info(Error),
- [ {[{id, Id}, {rev, rev_to_str({Pos, RevId})},
- {error, Error}, {reason, Reason}]} | Acc ]
- end,
- [], lists:zip(Docs, Results)));
-
-bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->
- bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);
-
-bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->
- lists:map(
- fun({{Id, Rev}, Err}) ->
- {_, Error, Reason} = couch_httpd:error_info(Err),
- {[{id, Id}, {rev, rev_to_str(Rev)}, {error, Error}, {reason, Reason}]}
- end,
- Results);
-
-bulk_results_to_errors(_Docs, Results, remote) ->
- lists:reverse(lists:foldl(
- fun({Props}, Acc) ->
- case get_value(<<"error">>, Props, get_value(error, Props)) of
- undefined ->
- Acc;
- Error ->
- Id = get_value(<<"id">>, Props, get_value(id, Props)),
- Rev = get_value(<<"rev">>, Props, get_value(rev, Props)),
- Reason = get_value(<<"reason">>, Props, get_value(reason, Props)),
- [ {[{id, Id}, {rev, rev_to_str(Rev)},
- {error, Error}, {reason, Reason}]} | Acc ]
- end
- end,
- [], Results)).
-
-
-rev_to_str({_Pos, _Id} = Rev) ->
- couch_doc:rev_to_str(Rev);
-rev_to_str(Rev) ->
- Rev.
-
-
-stream_doc({JsonBytes, Atts, Boundary, Len}) ->
- case erlang:erase({doc_streamer, Boundary}) of
- Pid when is_pid(Pid) ->
- unlink(Pid),
- exit(Pid, kill);
- _ ->
- ok
- end,
- Self = self(),
- DocStreamer = spawn_link(fun() ->
- couch_doc:doc_to_multi_part_stream(
- Boundary, JsonBytes, Atts,
- fun(Data) ->
- receive {get_data, Ref, From} ->
- From ! {data, Ref, Data}
- end
- end, true),
- unlink(Self)
- end),
- erlang:put({doc_streamer, Boundary}, DocStreamer),
- {ok, <<>>, {Len, Boundary}};
-stream_doc({0, Id}) ->
- erlang:erase({doc_streamer, Id}),
- eof;
-stream_doc({LenLeft, Id}) when LenLeft > 0 ->
- Ref = make_ref(),
- erlang:get({doc_streamer, Id}) ! {get_data, Ref, self()},
- receive {data, Ref, Data} ->
- {ok, Data, {LenLeft - iolist_size(Data), Id}}
- end.
src/couchdb/couch_auth_cache.erl
@@ -278,7 +278,7 @@ refresh_entries(AuthDb) ->
end.
-refresh_entry(Db, #doc_info{high_seq = DocSeq} = DocInfo) ->
+refresh_entry(Db, #doc_info{local_seq = DocSeq} = DocInfo) ->
case is_user_doc(DocInfo) of
{true, UserName} ->
case ets:lookup(?BY_USER, UserName) of
@@ -391,7 +391,7 @@ ensure_auth_ddoc_exists(Db, DDocId) ->
case couch_db:open_doc(Db, DDocId) of
{not_found, _Reason} ->
{ok, AuthDesign} = auth_design_doc(DDocId),
- {ok, _Rev} = couch_db:update_doc(Db, AuthDesign, []);
+ ok = couch_db:update_doc(Db, AuthDesign, []);
_ ->
ok
end,
src/couchdb/couch_changes.erl
@@ -110,32 +110,24 @@ os_filter_fun(FilterName, Style, Req, Db) ->
case [list_to_binary(couch_httpd:unquote(Part))
|| Part <- string:tokens(FilterName, "/")] of
[] ->
- fun(_Db2, #doc_info{revs=Revs}) ->
- builtin_results(Style, Revs)
+ fun(_Db2, #doc_info{rev=Rev}) ->
+ builtin_results(Style, Rev)
end;
[DName, FName] ->
DesignId = <<"_design/", DName/binary>>,
- DDoc = couch_db_frontend:couch_doc_open(Db, DesignId, nil, [ejson_body]),
+ DDoc = couch_db_frontend:couch_doc_open(Db, DesignId, [ejson_body]),
% validate that the ddoc has the filter fun
- #doc{body={Props}} = DDoc,
+ #doc{json={Props}} = DDoc,
couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
fun(Db2, DocInfo) ->
- DocInfos =
- case Style of
- main_only ->
- [DocInfo];
- all_docs ->
- [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
- end,
- Docs = [Doc || {ok, Doc} <- [
- couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
- || DocInfo2 <- DocInfos]],
+ {ok, Doc} =
+ couch_db:open_doc(Db2, DocInfo, [deleted, conflicts]),
{ok, Passes} = couch_query_servers:filter_docs(
- Req, Db2, DDoc, FName, Docs
+ Req, Db2, DDoc, FName, [Doc]
),
- [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
- || {Pass, #doc{revs={RevPos,[RevId|_]}}}
- <- lists:zip(Passes, Docs), Pass == true]
+ [{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}
+ || {Pass, #doc{rev={RevPos,RevId}}}
+ <- lists:zip(Passes, [Doc]), Pass == true]
end;
_Else ->
throw({bad_request,
@@ -161,66 +153,53 @@ builtin_filter_fun(_FilterName, _Style, _Req, _Db) ->
throw({bad_request, "unknown builtin filter name"}).
filter_docids(DocIds, Style) when is_list(DocIds)->
- fun(_Db, #doc_info{id=DocId, revs=Revs}) ->
- case lists:member(DocId, DocIds) of
- true ->
- builtin_results(Style, Revs);
- _ -> []
- end
+ fun(_Db, DocInfo) ->
+ #doc_info{id=DocId, rev=Rev} = DocInfo,
+ case lists:member(DocId, DocIds) of
+ true ->
+ builtin_results(Style, Rev);
+ _ -> null
+ end
end;
filter_docids(_, _) ->
throw({bad_request, "`doc_ids` filter parameter is not a list."}).
filter_designdoc(Style) ->
- fun(_Db, #doc_info{id=DocId, revs=Revs}) ->
+ fun(_Db, #doc_info{id=DocId, rev=Rev}) ->
case DocId of
<<"_design", _/binary>> ->
- builtin_results(Style, Revs);
- _ -> []
+ builtin_results(Style, Rev);
+ _ -> null
end
end.
filter_view("", _Style, _Db) ->
throw({bad_request, "`view` filter parameter is not provided."});
-filter_view(ViewName, Style, Db) ->
+filter_view(ViewName, _Style, Db) ->
case [list_to_binary(couch_httpd:unquote(Part))
|| Part <- string:tokens(ViewName, "/")] of
[] ->
throw({bad_request, "Invalid `view` parameter."});
[DName, VName] ->
DesignId = <<"_design/", DName/binary>>,
- DDoc = couch_db_frontend:couch_doc_open(Db, DesignId, nil, [ejson_body]),
+ {ok, DDoc} = couch_db_frontend:open_doc(Db, DesignId, [ejson_body]),
% validate that the ddoc has the filter fun
- #doc{body={Props}} = DDoc,
+ #doc{json={Props}} = DDoc,
couch_util:get_nested_json_value({Props}, [<<"views">>, VName]),
fun(Db2, DocInfo) ->
- DocInfos =
- case Style of
- main_only ->
- [DocInfo];
- all_docs ->
- [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
- end,
- Docs = [Doc || {ok, Doc} <- [
- couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
- || DocInfo2 <- DocInfos]],
+ {ok, Doc} =
+ couch_db:open_doc(Db2, DocInfo, [deleted, conflicts]),
{ok, Passes} = couch_query_servers:filter_view(
- DDoc, VName, Docs
+ DDoc, VName, [Doc]
),
- [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
- || {Pass, #doc{revs={RevPos,[RevId|_]}}}
- <- lists:zip(Passes, Docs), Pass == true]
+ [{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}
+ || {Pass, #doc{rev={RevPos,RevId}}}
+ <- lists:zip(Passes, [Doc]), Pass == true]
end
end.
-builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) ->
- case Style of
- main_only ->
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
- all_docs ->
- [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
- || #rev_info{rev=R} <- Revs]
- end.
+builtin_results(_Style, Rev) ->
+ {[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}.
get_changes_timeout(Args, Callback) ->
#changes_args{
@@ -298,28 +277,28 @@ send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, FirstRound) ->
send_changes_doc_ids(DocIds, Db, StartSeq, Dir, Fun, Acc0) ->
- Lookups = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, DocIds),
- FullDocInfos = lists:foldl(
- fun({ok, FDI}, Acc) ->
- [FDI | Acc];
+ Lookups = couch_btree:lookup(Db#db.docinfo_by_id_btree, DocIds),
+ DocInfos = lists:foldl(
+ fun({ok, DI}, Acc) ->
+ [DI | Acc];
(not_found, Acc) ->
Acc
end,
[], Lookups),
- send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+ send_lookup_changes(DocInfos, StartSeq, Dir, Db, Fun, Acc0).
send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0) ->
- FoldFun = fun(FullDocInfo, _, Acc) ->
- {ok, [FullDocInfo | Acc]}
+ FoldFun = fun(DocInfo, _, Acc) ->
+ {ok, [DocInfo | Acc]}
end,
KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, _, FullDocInfos} = couch_btree:fold(
- Db#db.fulldocinfo_by_id_btree, FoldFun, [], KeyOpts),
- send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+ {ok, _, DocInfos} = couch_btree:fold(
+ Db#db.docinfo_by_id_btree, FoldFun, [], KeyOpts),
+ send_lookup_changes(DocInfos, StartSeq, Dir, Db, Fun, Acc0).
-send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
+send_lookup_changes(DocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
FoldFun = case Dir of
fwd ->
fun lists:foldl/3;
@@ -332,18 +311,17 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
rev ->
fun(A, B) -> A =< B end
end,
- DocInfos = lists:foldl(
- fun(FDI, Acc) ->
- DI = couch_doc:to_doc_info(FDI),
- case GreaterFun(DI#doc_info.high_seq, StartSeq) of
+ DocInfos2 = lists:foldl(
+ fun(DI, Acc) ->
+ case GreaterFun(DI#doc_info.local_seq, StartSeq) of
true ->
[DI | Acc];
false ->
Acc
end
end,
- [], FullDocInfos),
- SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
+ [], DocInfos),
+ SortedDocInfos = lists:keysort(#doc_info.local_seq, DocInfos2),
FinalAcc = try
FoldFun(
fun(DocInfo, Acc) ->
@@ -374,7 +352,6 @@ keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout,
limit = Limit,
db_open_options = DbOptions
} = Args,
-
{ok, ChangesAcc} = send_changes(
Args#changes_args{dir=fwd},
Callback,
@@ -425,15 +402,14 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) ->
filter = FilterFun, callback = Callback,
user_acc = UserAcc, limit = Limit, db = Db
} = Acc,
- #doc_info{high_seq = Seq} = DocInfo,
- Results0 = FilterFun(Db, DocInfo),
- Results = [Result || Result <- Results0, Result /= null],
- Go = if Limit =< 1 -> stop; true -> ok end,
- case Results of
- [] ->
+ #doc_info{local_seq = Seq} = DocInfo,
+ Result = FilterFun(Db, DocInfo),
+ Go = if (Limit =< 1) andalso Result =/= null -> stop; true -> ok end,
+ case Result of
+ null ->
{Go, Acc#changes_acc{seq = Seq}};
_ ->
- ChangesRow = changes_row(Results, DocInfo, Acc),
+ ChangesRow = changes_row(Result, DocInfo, Acc),
UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc),
{Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}
end;
@@ -442,15 +418,14 @@ changes_enumerator(DocInfo, Acc) ->
filter = FilterFun, callback = Callback, prepend = Prepend,
user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db
} = Acc,
- #doc_info{high_seq = Seq} = DocInfo,
- Results0 = FilterFun(Db, DocInfo),
- Results = [Result || Result <- Results0, Result /= null],
- Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
- case Results of
- [] ->
+ #doc_info{local_seq = Seq} = DocInfo,
+ Result = FilterFun(Db, DocInfo),
+ Go = if (Limit =< 1) andalso Result =/= null -> stop; true -> ok end,
+ case Result of
+ null ->
{Go, Acc#changes_acc{seq = Seq}};
_ ->
- ChangesRow = changes_row(Results, DocInfo, Acc),
+ ChangesRow = changes_row(Result, DocInfo, Acc),
UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
{Go, Acc#changes_acc{
seq = Seq, prepend = <<",\n">>,
@@ -458,12 +433,12 @@ changes_enumerator(DocInfo, Acc) ->
end.
-changes_row(Results, DocInfo, Acc) ->
+changes_row(Result, DocInfo, Acc) ->
#doc_info{
- id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
+ id = Id, local_seq = Seq, deleted = Del
} = DocInfo,
#changes_acc{db = Db, include_docs = IncDoc, conflicts = Conflicts} = Acc,
- {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
+ {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Result}] ++
deleted_item(Del) ++ case IncDoc of
true ->
Options = if Conflicts -> [conflicts]; true -> [] end,
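
With one revision per document, a _changes row's "changes" member shrinks
from a list of rev objects to a single one. For the builtin filters above, a
row now renders roughly as the following EJSON (a sketch assuming Seq, Id,
Rev and Del are bound as in changes_row/3):

    Row = {[{<<"seq">>, Seq},
            {<<"id">>, Id},
            {<<"changes">>, {[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}}]}.
    %% plus {<<"deleted">>, true} when Del, and the doc itself
    %% when include_docs was requested.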
src/couchdb/couch_compaction_daemon.erl
@@ -210,9 +210,9 @@ maybe_compact_views(DbName, [DDocName | Rest], Config) ->
db_ddoc_names(Db) ->
{ok, _, DDocNames} = couch_db:enum_docs(
Db,
- fun(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, _, Acc) ->
+ fun(#doc_info{id = <<"_design/", _/binary>>, deleted = true}, _, Acc) ->
{ok, Acc};
- (#full_doc_info{id = <<"_design/", Id/binary>>}, _, Acc) ->
+ (#doc_info{id = <<"_design/", Id/binary>>}, _, Acc) ->
{ok, [Id | Acc]};
(_, _, Acc) ->
{stop, Acc}
src/couchdb/couch_db.erl
@@ -14,10 +14,11 @@
-behaviour(gen_server).
-export([open/2,open_int/2,close/1,create/2,get_db_info/1,get_design_docs/1]).
--export([start_compact/1, cancel_compact/1]).
+-export([start_compact/1, cancel_compact/1,get_design_docs/2]).
-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
--export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
--export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
+-export([update_doc/2,update_doc/3]).
+-export([update_docs/2,update_docs/3]).
+-export([get_doc_info/2,open_doc/2,open_doc/3]).
-export([set_revs_limit/2,get_revs_limit/1]).
-export([get_missing_revs/2,name/1,get_update_seq/1,get_committed_update_seq/1]).
-export([enum_docs/4,enum_docs_since/5]).
@@ -26,9 +27,9 @@
-export([start_link/3,open_doc_int/3,ensure_full_commit/1]).
-export([set_security/2,get_security/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]).
+-export([changes_since/4,changes_since/5]).
-export([check_is_admin/1, check_is_member/1]).
--export([reopen/1, get_current_seq/1, fast_reads/2]).
+-export([reopen/1,get_current_seq/1,fast_reads/2]).
-include("couch_db.hrl").
@@ -127,11 +128,6 @@ start_compact(#db{update_pid=Pid}) ->
cancel_compact(#db{update_pid=Pid}) ->
gen_server:call(Pid, cancel_compact, infinity).
-delete_doc(Db, Id, Revisions) ->
- DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
- {ok, [Result]} = update_docs(Db, DeletedDocs, []),
- {ok, Result}.
-
open_doc(Db, IdOrDocInfo) ->
open_doc(Db, IdOrDocInfo, []).
@@ -153,15 +149,9 @@ apply_open_options({ok, Doc},Options) ->
apply_open_options2(Doc,Options);
apply_open_options(Else,_Options) ->
Else.
-
+
apply_open_options2(Doc,[]) ->
{ok, Doc};
-apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
- [{atts_since, PossibleAncestors}|Rest]) ->
- RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
- apply_open_options2(Doc#doc{atts=[A#att{data=
- if AttPos>RevPos -> Data; true -> stub end}
- || #att{revpos=AttPos,data=Data}=A <- Atts]}, Rest);
apply_open_options2(Doc, [ejson_body | Rest]) ->
apply_open_options2(couch_doc:with_ejson_body(Doc), Rest);
apply_open_options2(Doc, [json_bin_body | Rest]) ->
@@ -170,73 +160,36 @@ apply_open_options2(Doc,[_|Rest]) ->
apply_open_options2(Doc,Rest).
-find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
- 0;
-find_ancestor_rev_pos(_DocRevs, []) ->
- 0;
-find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
- case lists:member({RevPos, RevId}, AttsSinceRevs) of
- true ->
- RevPos;
- false ->
- find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
- end.
-
-open_doc_revs(Db, Id, Revs, Options) ->
- increment_stat(Db, {couchdb, database_reads}),
- [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
- {ok, [apply_open_options(Result, Options) || Result <- Results]}.
-
% Each returned result is a list of tuples:
-% {Id, MissingRevs, PossibleAncestors}
-% if no revs are missing, it's omitted from the results.
-get_missing_revs(Db, IdRevsList) ->
- Results = get_full_doc_infos(Db, [Id1 || {Id1, _Revs} <- IdRevsList]),
- {ok, find_missing(IdRevsList, Results)}.
+% {Id, MissingRev}
+% if the rev is already on disk, it's omitted from the results.
+get_missing_revs(Db, IdRevList) ->
+ Results = get_doc_infos(Db, [Id || {Id, _Rev} <- IdRevList]),
+ {ok, find_missing(IdRevList, Results)}.
+
find_missing([], []) ->
[];
-find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
- case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
- [] ->
+find_missing([{Id,{RevPos, RevId}}|RestIdRevs],
+ [{ok, #doc_info{rev={DiskRevPos, DiskRevId}}} | RestLookupInfo]) ->
+ case {RevPos, RevId} of
+ {DiskRevPos, DiskRevId} ->
find_missing(RestIdRevs, RestLookupInfo);
- MissingRevs ->
- #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
- LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
- % Find the revs that are possible parents of this rev
- PossibleAncestors =
- lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
- % this leaf is a "possible ancenstor" of the missing
- % revs if this LeafPos lessthan any of the missing revs
- case lists:any(fun({MissingPos, _}) ->
- LeafPos < MissingPos end, MissingRevs) of
- true ->
- [{LeafPos, LeafRevId} | Acc];
- false ->
- Acc
- end
- end, [], LeafRevs),
- [{Id, MissingRevs, PossibleAncestors} |
+ _ ->
+ [{Id, {DiskRevPos, DiskRevId}} |
find_missing(RestIdRevs, RestLookupInfo)]
end;
-find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
- [{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
+find_missing([{Id, Rev}|RestIdRevs], [not_found | RestLookupInfo]) ->
+ [{Id, Rev} | find_missing(RestIdRevs, RestLookupInfo)].
-get_doc_info(Db, Id) ->
- case get_full_doc_info(Db, Id) of
- {ok, DocInfo} ->
- {ok, couch_doc:to_doc_info(DocInfo)};
- Else ->
- Else
- end.
% returns {ok, DocInfo} or not_found
-get_full_doc_info(Db, Id) ->
- [Result] = get_full_doc_infos(Db, [Id]),
+get_doc_info(Db, Id) ->
+ [Result] = get_doc_infos(Db, [Id]),
Result.
-get_full_doc_infos(Db, Ids) ->
- couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
+get_doc_infos(Db, Ids) ->
+ couch_btree:lookup(Db#db.docinfo_by_id_btree, Ids).
increment_update_seq(#db{update_pid=UpdatePid}) ->
gen_server:call(UpdatePid, increment_update_seq, infinity).
@@ -266,7 +219,7 @@ get_db_info(Db) ->
name=Name,
instance_start_time=StartTime,
committed_update_seq=CommittedUpdateSeq,
- fulldocinfo_by_id_btree = IdBtree,
+ docinfo_by_id_btree = IdBtree,
docinfo_by_seq_btree = SeqBtree,
local_docs_btree = LocalBtree
} = Db,
@@ -305,12 +258,17 @@ sum_tree_sizes(Acc, [T | Rest]) ->
sum_tree_sizes(Acc + Sz, Rest)
end.
+
get_design_docs(Db) ->
- {ok,_, Docs} = couch_btree:fold(Db#db.fulldocinfo_by_id_btree,
- fun(#full_doc_info{deleted = true}, _Reds, AccDocs) ->
+ get_design_docs(Db, no_deletes).
+
+get_design_docs(Db, DeletedAlso) ->
+ {ok,_, Docs} = couch_btree:fold(Db#db.docinfo_by_id_btree,
+ fun(#doc_info{deleted = true}, _Reds, AccDocs)
+ when DeletedAlso == no_deletes ->
{ok, AccDocs};
- (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) ->
- {ok, Doc} = open_doc_int(Db, FullDocInfo, [ejson_body]),
+ (#doc_info{id= <<"_design/",_/binary>>}=DocInfo, _Reds, AccDocs) ->
+ {ok, Doc} = open_doc_int(Db, DocInfo, [ejson_body]),
{ok, [Doc | AccDocs]};
(_, _Reds, AccDocs) ->
{stop, AccDocs}
@@ -417,22 +375,11 @@ set_revs_limit(_Db, _Limit) ->
name(#db{name=Name}) ->
Name.
+update_doc(Db, Docs) ->
+ update_doc(Db, Docs, []).
+
update_doc(Db, Doc, Options) ->
- update_doc(Db, Doc, Options, interactive_edit).
-
-update_doc(Db, Doc, Options, UpdateType) ->
- case update_docs(Db, [Doc], Options, UpdateType) of
- {ok, [{ok, NewRev}]} ->
- {ok, NewRev};
- {ok, [{{_Id, _Rev}, Error}]} ->
- throw(Error);
- {ok, [Error]} ->
- throw(Error);
- {ok, []} ->
- % replication success
- {Pos, [RevId | _]} = Doc#doc.revs,
- {ok, {Pos, RevId}}
- end.
+ update_docs(Db, [Doc], Options).
update_docs(Db, Docs) ->
update_docs(Db, Docs, []).
@@ -455,322 +402,13 @@ fast_reads(#db{main_pid=Pid}=Db, Fun) ->
end.
-% group_alike_docs groups the sorted documents into sublist buckets, by id.
-% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
-group_alike_docs(Docs) ->
- Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
- group_alike_docs(Sorted, []).
-
-group_alike_docs([], Buckets) ->
- lists:reverse(Buckets);
-group_alike_docs([Doc|Rest], []) ->
- group_alike_docs(Rest, [[Doc]]);
-group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
- [#doc{id=BucketId}|_] = Bucket,
- case Doc#doc.id == BucketId of
- true ->
- % add to existing bucket
- group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
- false ->
- % add to new bucket
- group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
- end.
-
-validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
- catch check_is_admin(Db);
-validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
- ok;
-validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
- ok;
-validate_doc_update(Db, Doc, GetDiskDocFun) ->
- DiskDoc = GetDiskDocFun(),
- JsonCtx = couch_util:json_user_ctx(Db),
- SecObj = get_security(Db),
- try [case Fun(Doc, DiskDoc, JsonCtx, SecObj) of
- ok -> ok;
- Error -> throw(Error)
- end || Fun <- Db#db.validate_doc_funs],
- ok
- catch
- throw:Error ->
- Error
- end.
-
-
-prep_and_validate_update(Db, #doc{id=Id,revs={RevStart0, Revs0}}=Doc,
- OldFullDocInfo, LeafRevsDict, AllowConflict, Clobber) ->
- case Clobber of
- true ->
- {RevStart, Revs} = {0, []};
- false ->
- {RevStart, Revs} = {RevStart0, Revs0}
- end,
-
- case Revs of
- [PrevRev|_] ->
- case dict:find({RevStart, PrevRev}, LeafRevsDict) of
- {ok, {Deleted, DiskSp, DiskRevs}} ->
- case couch_doc:has_stubs(Doc) of
- true ->
- DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs),
- Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
- {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
- false ->
- LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
- {validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
- end;
- error when AllowConflict ->
- couch_doc:merge_stubs(Doc, #doc{}), % will generate error if
- % there are stubs
- {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
- error ->
- {conflict, Doc}
- end;
- [] ->
- % new doc, and we have existing revs.
- % reuse existing deleted doc
- if OldFullDocInfo#full_doc_info.deleted orelse AllowConflict ->
- {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
- true ->
- {conflict, Doc}
- end
- end.
-
-
-
-prep_and_validate_updates(_Db, [], [], _AllowConflict, _Clobber, AccPrepped,
- AccFatalErrors) ->
- {lists:reverse(AccPrepped), lists:reverse(AccFatalErrors)};
-prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
- AllowConflict, Clobber, AccPrepped, AccErrors) ->
- [#doc{id=Id}|_]=DocBucket,
- % no existing revs are known,
- {PreppedBucket, AccErrors3} = lists:foldl(
- fun(#doc{revs=Revs0}=Doc, {AccBucket, AccErrors2}) ->
- case Clobber of
- true ->
- Revs = {0, []};
- false ->
- Revs = Revs0
- end,
- case couch_doc:has_stubs(Doc) of
- true ->
- couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
- false -> ok
- end,
- case Revs of
- {0, []} ->
- case validate_doc_update(Db, Doc, fun() -> nil end) of
- ok ->
- {[Doc | AccBucket], AccErrors2};
- Error ->
- {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
- end;
- _ ->
- % old revs specified but none exist, a conflict
- {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
- end
- end,
- {[], AccErrors}, DocBucket),
-
- prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
- Clobber, [PreppedBucket | AccPrepped], AccErrors3);
-prep_and_validate_updates(Db, [DocBucket|RestBuckets],
- [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
- AllowConflict, Clobber, AccPrepped, AccErrors) ->
- Leafs = couch_key_tree:get_all_leafs(OldRevTree),
- LeafRevsDict = dict:from_list([
- begin
- Deleted = element(1, LeafVal),
- Sp = element(2, LeafVal),
- {{Start, RevId}, {Deleted, Sp, Revs}}
- end ||
- {LeafVal, {Start, [RevId | _]} = Revs} <- Leafs
- ]),
- {PreppedBucket, AccErrors3} = lists:foldl(
- fun(Doc, {Docs2Acc, AccErrors2}) ->
- case prep_and_validate_update(Db, Doc, OldFullDocInfo,
- LeafRevsDict, AllowConflict, Clobber) of
- {ok, Doc2} ->
- {[Doc2 | Docs2Acc], AccErrors2};
- {Error, #doc{id=Id,revs=Revs}} ->
- % Record the error
- {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
- end
- end,
- {[], AccErrors}, DocBucket),
- prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
- Clobber, [PreppedBucket | AccPrepped], AccErrors3).
-
-
-update_docs(Db, Docs, Options) ->
- update_docs(Db, Docs, Options, interactive_edit).
-
-
-to_replicated_path({Start, RevIds}) ->
- [Branch] = to_replicated_branch(lists:reverse(RevIds)),
- {Start - length(RevIds) + 1, Branch}.
-
-to_replicated_branch([RevId]) ->
- [{RevId, ?REV_MISSING, []}];
-to_replicated_branch([RevId | Rest]) ->
- [{RevId, ?REV_MISSING, to_replicated_branch(Rest)}].
-
-
-prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
- Errors2 = [{{Id, {Pos, Rev}}, Error} ||
- {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
- {lists:reverse(AccPrepped), lists:reverse(Errors2)};
-prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
- case OldInfo of
- not_found ->
- {ValidatedBucket, AccErrors3} = lists:foldl(
- fun(Doc, {AccPrepped2, AccErrors2}) ->
- case couch_doc:has_stubs(Doc) of
- true ->
- couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
- false -> ok
- end,
- case validate_doc_update(Db, Doc, fun() -> nil end) of
- ok ->
- {[Doc | AccPrepped2], AccErrors2};
- Error ->
- {AccPrepped2, [{Doc, Error} | AccErrors2]}
- end
- end,
- {[], AccErrors}, Bucket),
- prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
- {ok, #full_doc_info{rev_tree=OldTree}} ->
- NewRevTree = lists:foldl(
- fun(#doc{revs=Revs}, AccTree) ->
- {NewTree, _} = couch_key_tree:merge(AccTree,
- to_replicated_path(Revs), Db#db.revs_limit),
- NewTree
- end,
- OldTree, Bucket),
- Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
- LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
- {ValidatedBucket, AccErrors3} =
- lists:foldl(
- fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
- case dict:find({Pos, RevId}, LeafRevsFullDict) of
- {ok, {Start, Path}} ->
- % our unflushed doc is a leaf node. Go back on the path
- % to find the previous rev that's on disk.
-
- LoadPrevRevFun = fun() ->
- make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
- end,
-
- case couch_doc:has_stubs(Doc) of
- true ->
- DiskDoc = LoadPrevRevFun(),
- Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
- GetDiskDocFun = fun() -> DiskDoc end;
- false ->
- Doc2 = Doc,
- GetDiskDocFun = LoadPrevRevFun
- end,
-
- case validate_doc_update(Db, Doc2, GetDiskDocFun) of
- ok ->
- {[Doc2 | AccValidated], AccErrors2};
- Error ->
- {AccValidated, [{Doc, Error} | AccErrors2]}
- end;
- _ ->
- % this doc isn't a leaf or already exists in the tree.
- % ignore but consider it a success.
- {AccValidated, AccErrors2}
- end
- end,
- {[], AccErrors}, Bucket),
- prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
- [ValidatedBucket | AccPrepped], AccErrors3)
- end.
-
-
-
-new_revid(#doc{body=Body,revs={OldStart,OldRevs},
- atts=Atts,deleted=Deleted}) ->
- case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M =/= <<>>] of
- Atts2 when length(Atts) =/= length(Atts2) ->
- % We must have old style non-md5 attachments
- ?l2b(integer_to_list(couch_util:rand32()));
- Atts2 ->
- OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
- couch_util:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
- end.
-
-new_revs([], _Clobber, OutBuckets, IdRevsAcc) ->
- {lists:reverse(OutBuckets), IdRevsAcc};
-new_revs([Bucket|RestBuckets], Clobber, OutBuckets, IdRevsAcc) ->
- {NewBucket, IdRevsAcc3} = lists:mapfoldl(
- fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
- {Start2, RevIds2} = case Clobber of
- true ->
- case RevIds /= [] of
- true -> % A new rev id is provided by the client with clobber option
- [NewRevId | _] = RevIds;
- false ->
- NewRevId = 0
- end,
- {Start, RevIds};
- false ->
- NewRevId = new_revid(Doc),
- {Start+1, [NewRevId | RevIds]}
- end,
- {Doc#doc{revs={Start2, RevIds2}},
- [{{Id, {Start, RevIds}}, {ok, {Start2, NewRevId}}} | IdRevsAcc2]}
- end, IdRevsAcc, Bucket),
- new_revs(RestBuckets, Clobber, [NewBucket|OutBuckets], IdRevsAcc3).
-
-check_dup_atts(#doc{atts=Atts}=Doc) ->
- Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
- check_dup_atts2(Atts2),
- Doc.
-
-check_dup_atts2([#att{name=N}, #att{name=N} | _]) ->
- throw({bad_request, <<"Duplicate attachments">>});
-check_dup_atts2([_ | Rest]) ->
- check_dup_atts2(Rest);
-check_dup_atts2(_) ->
- ok.
-
-
-update_docs(Db, Docs, Options, replicated_changes) ->
- increment_stat(Db, {couchdb, database_writes}),
- DocBuckets = group_alike_docs(Docs),
-
- case (Db#db.validate_doc_funs /= []) orelse
- lists:any(
- fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
- (#doc{atts=Atts}) ->
- Atts /= []
- end, Docs) of
- true ->
- Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
- ExistingDocs = get_full_doc_infos(Db, Ids),
- {DocBuckets2, DocErrors} =
- prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []),
- DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets
- false ->
- DocErrors = [],
- DocBuckets3 = DocBuckets
- end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
- || Doc <- Bucket] || Bucket <- DocBuckets3],
- {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
- {ok, DocErrors};
-update_docs(Db, Docs, Options, interactive_edit) ->
+update_docs(#db{name=DbName}=Db, Docs, Options0) ->
increment_stat(Db, {couchdb, database_writes}),
- AllOrNothing = lists:member(all_or_nothing, Options),
- PreSorted = lists:member(presorted, Options),
% go ahead and generate the new revision ids for the documents.
% separate out the NonRep documents from the rest of the documents
- {Docs2, NonRepDocs} = lists:foldl(
+ {Docs1, NonRepDocs1} = lists:foldl(
fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
case Id of
<<?LOCAL_DOC_PREFIX, _/binary>> ->
@@ -779,97 +417,47 @@ update_docs(Db, Docs, Options, interactive_edit) ->
{[Doc | DocsAcc], NonRepDocsAcc}
end
end, {[], []}, Docs),
-
- DocBuckets = if PreSorted ->
- group_alike_docs(Docs2, []);
+ case lists:member(sort_docs, Options0) of
true ->
- group_alike_docs(Docs2)
- end,
- Optimistic = lists:member(optimistic, Options),
- Clobber = lists:member(clobber, Options),
-
- case (Db#db.validate_doc_funs /= []) orelse
- lists:any(
- fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
- true;
- (#doc{atts=[]}) ->
- false;
- (Doc) when Optimistic ->
- % if we are optimistically committing, we don't do any
- % lookup before we write the attachments or the bodies
- % unless there are stubs, then we have to.
- couch_doc:has_stubs(Doc);
- (_Doc) ->
- true
- end, Docs2) of
- true ->
- % lookup the doc by id and get the most recent
- Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
- ExistingDocInfos = get_full_doc_infos(Db, Ids),
-
- {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
- DocBuckets, ExistingDocInfos, AllOrNothing, Clobber, [], []),
-
- % strip out any empty buckets
- DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
+ Docs2 = lists:keysort(#doc.id, Docs1),
+ NonRepDocs = lists:keysort(#doc.id, NonRepDocs1);
false ->
- PreCommitFailures = [],
- DocBuckets2 = DocBuckets
+ Docs2 = Docs1,
+ NonRepDocs = NonRepDocs1
end,
-
- if (AllOrNothing) and (PreCommitFailures /= []) ->
- {aborted, lists:map(
- fun({{Id,{Pos, [RevId|_]}}, Error}) ->
- {{Id, {Pos, RevId}}, Error};
- ({{Id,{0, []}}, Error}) ->
- {{Id, {0, <<>>}}, Error}
- end, PreCommitFailures)};
- true ->
- Options2 = if AllOrNothing -> [merge_conflicts];
- true -> [] end ++ Options,
- DocBuckets3 = [[
- doc_flush_atts(set_new_att_revpos(
- Doc), Db#db.fd)
- || Doc <- B] || B <- DocBuckets2],
-
- {DocBuckets4, IdRevs} = new_revs(DocBuckets3, Clobber, [], []),
- {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
-
- case lists:member(return_errors_only, Options) of
- false ->
- ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
- {ok, lists:map(
- fun(#doc{id=Id,revs={Pos, RevIds}}) ->
- {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
- Result
- end, Docs)};
- true ->
- CommitErrors = lists:dropwhile(
- fun({{_Id,_Rev}, Result}) ->
- case Result of
- {ok, _} -> true;
- _ -> false
- end
- end, CommitResults),
- Errors = CommitErrors ++ PreCommitFailures,
- {ok, [{Id, Error} || {{Id,_rev}, Error} <- Errors]}
+ Options = set_commit_option(Options0),
+ FullCommit = lists:member(full_commit, Options),
+ Docs3 = write_doc_bodies(Db, Docs2),
+ MRef = erlang:monitor(process, Db#db.update_pid),
+ try
+ Db#db.update_pid ! {update_docs, self(), Docs3, NonRepDocs,
+ FullCommit},
+ case get_result(Db#db.update_pid, MRef) of
+ ok ->
+ [couch_db_update_notifier:notify({ddoc_updated, {DbName, Id}})
+ || #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>> = Id} <- Docs],
+ ok;
+ retry ->
+ % This can happen if the db file we wrote to was swapped out by
+ % compaction. Retry by reopening the db and writing to the current file
+ {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
+ % We only retry once
+ Docs4 = write_doc_bodies(Db2, Docs2),
+ close(Db2),
+ Db#db.update_pid ! {update_docs, self(), Docs4, NonRepDocs,
+ FullCommit},
+ case get_result(Db#db.update_pid, MRef) of
+ ok ->
+ [couch_db_update_notifier:notify({ddoc_updated, {DbName, Id}})
+ || #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>> = Id} <- Docs],
+ ok;
+ retry -> throw({update_error, compaction_retry})
+ end
end
+ after
+ erlang:demonitor(MRef, [flush])
end.
-% Returns the first available document on disk. Input list is a full rev path
-% for the doc.
-make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
- nil;
-make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #doc{}} | RestPath]) ->
- make_first_doc_on_disk(Db, Id, Pos-1, RestPath);
-make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) ->
- make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
-make_first_doc_on_disk(Db, Id, Pos, [{_Rev, RevValue} |_]=DocPath) ->
- IsDel = element(1, RevValue),
- Sp = element(2, RevValue),
- Revs = [Rev || {Rev, _} <- DocPath],
- make_doc(Db, Id, IsDel, Sp, {Pos, Revs}).
-
set_commit_option(Options) ->
CommitSettings = {
[true || O <- Options, O==full_commit orelse O==delay_commit],
@@ -888,253 +476,54 @@ set_commit_option(Options) ->
[full_commit|Options]
end.
-collect_results(UpdatePid, MRef, ResultsAcc) ->
+get_result(UpdatePid, MRef) ->
receive
- {result, UpdatePid, Result} ->
- collect_results(UpdatePid, MRef, [Result | ResultsAcc]);
{done, UpdatePid} ->
- {ok, ResultsAcc};
+ ok;
{retry, UpdatePid} ->
retry;
{'DOWN', MRef, _, _, Reason} ->
exit(Reason)
end.
-write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1,
- NonRepDocs, Options0) ->
- Options = set_commit_option(