Add references to docs to prevent dups from being collapsed

Bulk doc updates may contain duplicates or multiple docs with the
same id. These were being identified and collapsed in a dictionary
as messages were passed from merge_rev_trees to
couch_db:collect_results. Attaching a unique reference to each doc
allows them to be processed correctly.

Jira-911
BugzID:12540
Commit a0b67d83d4a254a505019394a8def5bc35ff9b62 (1 parent: cbb4bd7) committed by Bob Dionne on Aug 31, 2011
Showing with 78 additions and 68 deletions.
  1. +62 −51 apps/couch/src/couch_db.erl
  2. +16 −17 apps/couch/src/couch_db_updater.erl
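
The core of the change, visible across both files below, is to pair each #doc{} with a unique reference from make_ref() and to key error accumulators, result messages, and the final results dictionary by that reference instead of by {Id, Revs}, so two docs that share an id (or even identical revs) no longer collapse into a single entry. The following standalone sketch illustrates the idea only; it is not code from the patch, the module name is made up, and the #doc record is redefined locally as a stand-in for the one in couch_db.hrl.

%% Hypothetical illustration, not part of this commit: keying results by
%% {Id, Revs} collapses duplicate docs in a dict, while keying by a per-doc
%% reference keeps one entry per submitted doc.
-module(ref_tagging_sketch).
-export([demo/0]).

%% local stand-in for the #doc{} record defined in couch_db.hrl
-record(doc, {id, revs = {0, []}}).

demo() ->
    %% two updates that happen to share the same id and revs, plus one other doc
    Docs = [#doc{id = <<"a">>}, #doc{id = <<"a">>}, #doc{id = <<"b">>}],

    %% old scheme: keyed by {Id, Revs}, the duplicate <<"a">> docs collapse
    %% into one dict entry, so one caller result is lost
    Old = dict:from_list([{{D#doc.id, D#doc.revs}, ok} || D <- Docs]),
    2 = dict:size(Old),

    %% new scheme: tag each doc with make_ref() before grouping, as
    %% update_docs/4 now does, and key results by the reference
    Tagged = [{D, make_ref()} || D <- Docs],
    New = dict:from_list([{Ref, ok} || {_D, Ref} <- Tagged]),
    3 = dict:size(New),

    %% every submitted doc, duplicates included, finds its own result
    [ok = dict:fetch(Ref, New) || {_D, Ref} <- Tagged],
    ok.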
apps/couch/src/couch_db.erl
@@ -421,22 +421,22 @@ update_docs(Db, Docs) ->
% group_alike_docs groups the sorted documents into sublist buckets, by id.
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
group_alike_docs(Docs) ->
- Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
+ Sorted = lists:sort(fun({#doc{id=A},_},{#doc{id=B},_})-> A < B end, Docs),
group_alike_docs(Sorted, []).
group_alike_docs([], Buckets) ->
lists:reverse(Buckets);
group_alike_docs([Doc|Rest], []) ->
group_alike_docs(Rest, [[Doc]]);
-group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
- [#doc{id=BucketId}|_] = Bucket,
+group_alike_docs([{Doc,Ref}|Rest], [Bucket|RestBuckets]) ->
+ [{#doc{id=BucketId},_Ref}|_] = Bucket,
case Doc#doc.id == BucketId of
true ->
% add to existing bucket
- group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
+ group_alike_docs(Rest, [[{Doc,Ref}|Bucket]|RestBuckets]);
false ->
% add to new bucket
- group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
+ group_alike_docs(Rest, [[{Doc,Ref}]|[Bucket|RestBuckets]])
end.
validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
@@ -523,10 +523,8 @@ prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
{AccPrepped, AccFatalErrors};
prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
- [#doc{id=Id}|_]=DocBucket,
- % no existing revs are known,
{PreppedBucket, AccErrors3} = lists:foldl(
- fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+ fun({#doc{revs=Revs}=Doc, Ref}, {AccBucket, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
@@ -536,13 +534,13 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
{0, []} ->
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
- {[Doc | AccBucket], AccErrors2};
+ {[{Doc, Ref} | AccBucket], AccErrors2};
Error ->
- {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+ {AccBucket, [{Ref, Error} | AccErrors2]}
end;
_ ->
% old revs specified but none exist, a conflict
- {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
+ {AccBucket, [{Ref, conflict} | AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
@@ -556,14 +554,14 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
LeafRevsDict = dict:from_list([{{Start, RevId}, {Del, Ptr, Revs}} ||
{#leaf{deleted=Del, ptr=Ptr}, {Start, [RevId|_]}=Revs} <- Leafs]),
{PreppedBucket, AccErrors3} = lists:foldl(
- fun(Doc, {Docs2Acc, AccErrors2}) ->
+ fun({Doc,Ref}, {Docs2Acc, AccErrors2}) ->
case prep_and_validate_update(Db, Doc, OldFullDocInfo,
LeafRevsDict, AllowConflict) of
{ok, Doc2} ->
- {[Doc2 | Docs2Acc], AccErrors2};
- {Error, #doc{id=Id,revs=Revs}} ->
+ {[{Doc2, Ref} | Docs2Acc], AccErrors2};
+ {Error, #doc{}} ->
% Record the error
- {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
+ {Docs2Acc, [{Ref, Error} |AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
@@ -583,15 +581,15 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
case OldInfo of
not_found ->
{ValidatedBucket, AccErrors3} = lists:foldl(
- fun(Doc, {AccPrepped2, AccErrors2}) ->
+ fun({Doc,Ref}, {AccPrepped2, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
false -> ok
end,
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
- {[Doc | AccPrepped2], AccErrors2};
+ {[{Doc,Ref} | AccPrepped2], AccErrors2};
Error ->
{AccPrepped2, [{Doc, Error} | AccErrors2]}
end
@@ -600,7 +598,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
{ok, #full_doc_info{rev_tree=OldTree}} ->
NewRevTree = lists:foldl(
- fun(NewDoc, AccTree) ->
+ fun({NewDoc,_Ref}, AccTree) ->
{NewTree, _} = couch_key_tree:merge(AccTree,
couch_db:doc_to_tree(NewDoc), Db#db.revs_limit),
NewTree
@@ -610,7 +608,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
{ValidatedBucket, AccErrors3} =
lists:foldl(
- fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
+ fun({#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, Ref}, {AccValidated, AccErrors2}) ->
case dict:find({Pos, RevId}, LeafRevsFullDict) of
{ok, {Start, Path}} ->
% our unflushed doc is a leaf node. Go back on the path
@@ -632,7 +630,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
case validate_doc_update(Db, Doc2, GetDiskDocFun) of
ok ->
- {[Doc2 | AccValidated], AccErrors2};
+ {[{Doc2, Ref} | AccValidated], AccErrors2};
Error ->
{AccValidated, [{Doc, Error} | AccErrors2]}
end;
@@ -664,10 +662,10 @@ new_revs([], OutBuckets, IdRevsAcc) ->
{lists:reverse(OutBuckets), IdRevsAcc};
new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
{NewBucket, IdRevsAcc3} = lists:mapfoldl(
- fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+ fun({#doc{revs={Start, RevIds}}=Doc, Ref}, IdRevsAcc2)->
NewRevId = new_revid(Doc),
- {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
- [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+ {{Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, Ref},
+ [{Ref, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
end, IdRevsAcc, Bucket),
new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
@@ -686,16 +684,21 @@ check_dup_atts2(_) ->
update_docs(Db, Docs, Options, replicated_changes) ->
increment_stat(Db, {couchdb, database_writes}),
- DocBuckets = group_alike_docs(Docs),
+
+ % associate reference with each doc in order to track duplicates
+ Docs2 = lists:map(fun(Doc) ->
+ {Doc, make_ref()}
+ end,Docs),
+ DocBuckets = group_alike_docs(Docs2),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
- fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
- (#doc{atts=Atts}) ->
+ fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>},_Ref}) -> true;
+ ({#doc{atts=Atts},_Ref}) ->
Atts /= []
- end, Docs) of
+ end, Docs2) of
true ->
- Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
ExistingDocs = get_full_doc_infos(Db, Ids),
{DocBuckets2, DocErrors} =
@@ -705,8 +708,8 @@ update_docs(Db, Docs, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
- || Doc <- Bucket] || Bucket <- DocBuckets3],
+ DocBuckets4 = [[{doc_flush_atts(check_dup_atts(Doc), Db#db.fd), Ref}
+ || {Doc, Ref} <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -715,28 +718,33 @@ update_docs(Db, Docs, Options, interactive_edit) ->
AllOrNothing = lists:member(all_or_nothing, Options),
% go ahead and generate the new revision ids for the documents.
% separate out the NonRep documents from the rest of the documents
- {Docs2, NonRepDocs} = lists:foldl(
- fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
+
+ % associate reference with each doc in order to track duplicates
+ Docs2 = lists:map(fun(Doc) ->
+ {Doc, make_ref()}
+ end,Docs),
+ {Docs3, NonRepDocs} = lists:foldl(
+ fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
case Id of
<<?LOCAL_DOC_PREFIX, _/binary>> ->
{DocsAcc, [Doc | NonRepDocsAcc]};
Id->
{[Doc | DocsAcc], NonRepDocsAcc}
end
- end, {[], []}, Docs),
+ end, {[], []}, Docs2),
- DocBuckets = group_alike_docs(Docs2),
+ DocBuckets = group_alike_docs(Docs3),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
- fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+ fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>},_Ref}) ->
true;
- (#doc{atts=Atts}) ->
+ ({#doc{atts=Atts},_Ref}) ->
Atts /= []
- end, Docs2) of
+ end, Docs3) of
true ->
% lookup the doc by id and get the most recent
- Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
ExistingDocInfos = get_full_doc_infos(Db, Ids),
{DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
@@ -750,29 +758,32 @@ update_docs(Db, Docs, Options, interactive_edit) ->
end,
if (AllOrNothing) and (PreCommitFailures /= []) ->
- {aborted, lists:map(
- fun({{Id,{Pos, [RevId|_]}}, Error}) ->
- {{Id, {Pos, RevId}}, Error};
- ({{Id,{0, []}}, Error}) ->
- {{Id, {0, <<>>}}, Error}
- end, PreCommitFailures)};
+ {aborted,
+ lists:foldl(fun({#doc{id=Id,revs={Pos, RevIds}}, Ref},Acc) ->
+ case lists:keyfind(Ref,1,PreCommitFailures) of
+ {Ref, Error} ->
+ [{{Id,{Pos,RevIds}}, Error} | Acc];
+ false ->
+ Acc
+ end
+ end,[],Docs3)};
true ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
- doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.fd)
- || Doc <- B] || B <- DocBuckets2],
+ {doc_flush_atts(set_new_att_revpos(
+ check_dup_atts(Doc)), Db#db.fd), Ref}
+ || {Doc, Ref} <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
{ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
{ok, lists:map(
- fun(#doc{id=Id,revs={Pos, RevIds}}) ->
- {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
+ fun({#doc{}, Ref}) ->
+ {ok, Result} = dict:find(Ref, ResultsDict),
Result
- end, Docs)}
+ end, Docs2)}
end.
% Returns the first available document on disk. Input list is a full rev path
@@ -831,7 +842,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets,
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
- DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ DocBuckets2 = [[{doc_flush_atts(Doc, Db2#db.fd), Ref} || {Doc,Ref} <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
Pid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
apps/couch/src/couch_db_updater.erl
@@ -275,8 +275,8 @@ merge_updates([], RestB, AccOutGroups) ->
lists:reverse(AccOutGroups, RestB);
merge_updates(RestA, [], AccOutGroups) ->
lists:reverse(AccOutGroups, RestA);
-merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA],
- [[{_, #doc{id=IdB}}|_]=GroupB | RestB], AccOutGroups) ->
+merge_updates([[{_, {#doc{id=IdA},_}}|_]=GroupA | RestA],
+ [[{_, {#doc{id=IdB},_}}|_]=GroupB | RestB], AccOutGroups) ->
if IdA == IdB ->
merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]);
IdA < IdB ->
@@ -285,6 +285,7 @@ merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA],
merge_updates([GroupA | RestA], RestB, [GroupB | AccOutGroups])
end.
+
collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
receive
% Only collect updates with the same MergeConflicts flag and without
@@ -531,9 +532,9 @@ flush_trees(#db{fd=Fd,header=Header}=Db,
flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-send_result(Client, Id, OriginalRevs, NewResult) ->
+send_result(Client, Ref, NewResult) ->
% used to send a result to the client
- catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
+ catch(Client ! {result, self(), {Ref, NewResult}}).
merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
{ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
@@ -542,12 +543,12 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
= OldDocInfo,
NewRevTree = lists:foldl(
- fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
+ fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, AccTree) ->
if not MergeConflicts ->
case couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc),
Limit) of
{_NewTree, conflicts} when (not OldDeleted) ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ send_result(Client, Ref, conflict),
AccTree;
{NewTree, conflicts} when PrevRevs /= [] ->
% Check to be sure if prev revision was specified, it's
@@ -559,7 +560,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
if IsPrevLeaf ->
NewTree;
true ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ send_result(Client, Ref, conflict),
AccTree
end;
{NewTree, no_conflicts} when AccTree == NewTree ->
@@ -578,11 +579,10 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
{NewTree2, _} = couch_key_tree:merge(AccTree,
couch_db:doc_to_tree(NewDoc2), Limit),
% we changed the rev id, this tells the caller we did
- send_result(Client, Id, {Pos-1,PrevRevs},
- {ok, {OldPos + 1, NewRevId}}),
+ send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
NewTree2;
true ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ send_result(Client, Ref, conflict),
AccTree
end;
{NewTree, _} ->
@@ -630,7 +630,7 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
update_seq = LastSeq,
revs_limit = RevsLimit
} = Db,
- Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
+ Ids = [Id || [{_Client, {#doc{id=Id},_Ref}}|_] <- DocsList],
% lookup up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
@@ -692,10 +692,10 @@ compute_data_sizes([FullDocInfo | RestDocInfos], Acc) ->
update_local_docs(Db, []) ->
{ok, Db};
update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
- Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
+ Ids = [Id || {_Client, {#doc{id=Id},_Ref}} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}},
+ fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body},Ref}},
_OldDocLookup) ->
case PrevRevs of
[RevStr|_] ->
@@ -713,14 +713,13 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
% true ->
case Delete of
false ->
- send_result(Client, Id, {0, PrevRevs}, {ok,
+ send_result(Client, Ref, {ok,
{0, ?l2b(integer_to_list(PrevRev + 1))}}),
{update, {Id, {PrevRev + 1, Body}}};
true ->
- send_result(Client, Id, {0, PrevRevs},
- {ok, {0, <<"0">>}}),
+ send_result(Client, Ref, {ok, {0, <<"0">>}}),
{remove, Id}
- end%;
+ end
% false ->
% send_result(Client, Id, {0, PrevRevs}, conflict),
% ignore
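
For context on the message path the commit message refers to, send_result/3 in couch_db_updater now replies with {result, self(), {Ref, Result}}, and couch_db matches those replies up by reference when building ResultsDict. The loop below is an assumed, simplified stand-in for that collection step (the real couch_db:collect_results/3 is not shown in this diff); it only illustrates how keying by Ref keeps one result per submitted doc.

%% Assumed, simplified sketch of the collection side; not the actual
%% couch_db:collect_results/3 implementation.
-module(collect_by_ref_sketch).
-export([collect/3]).

%% Fold {result, Pid, {Ref, Result}} messages into a dict keyed by Ref,
%% then return the results in the order the docs were submitted.
collect(UpdaterPid, ExpectedRefs, Acc) ->
    case dict:size(Acc) =:= length(ExpectedRefs) of
        true ->
            {ok, [dict:fetch(Ref, Acc) || Ref <- ExpectedRefs]};
        false ->
            receive
                {result, UpdaterPid, {Ref, Result}} ->
                    collect(UpdaterPid, ExpectedRefs,
                            dict:store(Ref, Result, Acc))
            end
    end.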
