Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

12540 post dup docs2 #59

Open
wants to merge 2 commits into from

1 participant

@bdionne

Alternative approach using references. This relates to COUCHDB-911.

Bob Dionne added some commits
Bob Dionne Format code, fix whitespace cbb4bd7
Bob Dionne Add references to docs to prevent dups from being collapsed
bulk docs may have duplicates or multiple
docs with the same id. These were being
identified and collapsed in a dictionary as
messages were passed from merge_rev_trees to
couch_db collect_results. Attaching a
reference to each doc allows them to be
processed correctly.

Jira-911
BugzID:12540
a0b67d8
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Sep 2, 2011
  1. Format code, fix whitespace

    Bob Dionne authored
  2. Add references to docs to prevent dups from being collapsed

    Bob Dionne authored
    bulk docs may have duplicates or multiple
    docs with the same id. These were being
    identified and collapsed in a dictionary as
    messages were passed from merge_rev_trees to
    couch_db collect_results. Attaching a
    reference to each doc allows them to be
    processed correctly.
    
    Jira-911
    BugzID:12540
This page is out of date. Refresh to see the latest.
Showing with 96 additions and 86 deletions.
  1. +80 −69 apps/couch/src/couch_db.erl
  2. +16 −17 apps/couch/src/couch_db_updater.erl
View
149 apps/couch/src/couch_db.erl
@@ -73,16 +73,16 @@ open_int(DbName, Options) ->
% it ensures that the http userCtx is a valid reader
open(DbName, Options) ->
case couch_server:open(DbName, Options) of
- {ok, Db} ->
- try
- check_is_reader(Db),
- {ok, Db}
- catch
- throw:Error ->
- close(Db),
- throw(Error)
- end;
- Else -> Else
+ {ok, Db} ->
+ try
+ check_is_reader(Db),
+ {ok, Db}
+ catch
+ throw:Error ->
+ close(Db),
+ throw(Error)
+ end;
+ Else -> Else
end.
reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
@@ -154,7 +154,7 @@ apply_open_options({ok, Doc},Options) ->
apply_open_options2(Doc,Options);
apply_open_options(Else,_Options) ->
Else.
-
+
apply_open_options2(Doc,[]) ->
{ok, Doc};
apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
@@ -421,22 +421,22 @@ update_docs(Db, Docs) ->
% group_alike_docs groups the sorted documents into sublist buckets, by id.
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
group_alike_docs(Docs) ->
- Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
+ Sorted = lists:sort(fun({#doc{id=A},_},{#doc{id=B},_})-> A < B end, Docs),
group_alike_docs(Sorted, []).
group_alike_docs([], Buckets) ->
lists:reverse(Buckets);
group_alike_docs([Doc|Rest], []) ->
group_alike_docs(Rest, [[Doc]]);
-group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
- [#doc{id=BucketId}|_] = Bucket,
+group_alike_docs([{Doc,Ref}|Rest], [Bucket|RestBuckets]) ->
+ [{#doc{id=BucketId},_Ref}|_] = Bucket,
case Doc#doc.id == BucketId of
true ->
% add to existing bucket
- group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
+ group_alike_docs(Rest, [[{Doc,Ref}|Bucket]|RestBuckets]);
false ->
% add to new bucket
- group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
+ group_alike_docs(Rest, [[{Doc,Ref}]|[Bucket|RestBuckets]])
end.
validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
@@ -523,10 +523,8 @@ prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
{AccPrepped, AccFatalErrors};
prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
- [#doc{id=Id}|_]=DocBucket,
- % no existing revs are known,
{PreppedBucket, AccErrors3} = lists:foldl(
- fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+ fun({#doc{revs=Revs}=Doc, Ref}, {AccBucket, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
@@ -536,13 +534,13 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
{0, []} ->
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
- {[Doc | AccBucket], AccErrors2};
+ {[{Doc, Ref} | AccBucket], AccErrors2};
Error ->
- {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+ {AccBucket, [{Ref, Error} | AccErrors2]}
end;
_ ->
% old revs specified but none exist, a conflict
- {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
+ {AccBucket, [{Ref, conflict} | AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
@@ -556,14 +554,14 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
LeafRevsDict = dict:from_list([{{Start, RevId}, {Del, Ptr, Revs}} ||
{#leaf{deleted=Del, ptr=Ptr}, {Start, [RevId|_]}=Revs} <- Leafs]),
{PreppedBucket, AccErrors3} = lists:foldl(
- fun(Doc, {Docs2Acc, AccErrors2}) ->
+ fun({Doc,Ref}, {Docs2Acc, AccErrors2}) ->
case prep_and_validate_update(Db, Doc, OldFullDocInfo,
LeafRevsDict, AllowConflict) of
{ok, Doc2} ->
- {[Doc2 | Docs2Acc], AccErrors2};
- {Error, #doc{id=Id,revs=Revs}} ->
+ {[{Doc2, Ref} | Docs2Acc], AccErrors2};
+ {Error, #doc{}} ->
% Record the error
- {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
+ {Docs2Acc, [{Ref, Error} |AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
@@ -583,7 +581,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
case OldInfo of
not_found ->
{ValidatedBucket, AccErrors3} = lists:foldl(
- fun(Doc, {AccPrepped2, AccErrors2}) ->
+ fun({Doc,Ref}, {AccPrepped2, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
@@ -591,7 +589,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
end,
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
- {[Doc | AccPrepped2], AccErrors2};
+ {[{Doc,Ref} | AccPrepped2], AccErrors2};
Error ->
{AccPrepped2, [{Doc, Error} | AccErrors2]}
end
@@ -600,7 +598,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
{ok, #full_doc_info{rev_tree=OldTree}} ->
NewRevTree = lists:foldl(
- fun(NewDoc, AccTree) ->
+ fun({NewDoc,_Ref}, AccTree) ->
{NewTree, _} = couch_key_tree:merge(AccTree,
couch_db:doc_to_tree(NewDoc), Db#db.revs_limit),
NewTree
@@ -610,16 +608,16 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
{ValidatedBucket, AccErrors3} =
lists:foldl(
- fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
+ fun({#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, Ref}, {AccValidated, AccErrors2}) ->
case dict:find({Pos, RevId}, LeafRevsFullDict) of
{ok, {Start, Path}} ->
% our unflushed doc is a leaf node. Go back on the path
% to find the previous rev that's on disk.
-
+
LoadPrevRevFun = fun() ->
make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
end,
-
+
case couch_doc:has_stubs(Doc) of
true ->
DiskDoc = LoadPrevRevFun(),
@@ -629,10 +627,10 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
Doc2 = Doc,
GetDiskDocFun = LoadPrevRevFun
end,
-
+
case validate_doc_update(Db, Doc2, GetDiskDocFun) of
ok ->
- {[Doc2 | AccValidated], AccErrors2};
+ {[{Doc2, Ref} | AccValidated], AccErrors2};
Error ->
{AccValidated, [{Doc, Error} | AccErrors2]}
end;
@@ -664,10 +662,10 @@ new_revs([], OutBuckets, IdRevsAcc) ->
{lists:reverse(OutBuckets), IdRevsAcc};
new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
{NewBucket, IdRevsAcc3} = lists:mapfoldl(
- fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+ fun({#doc{revs={Start, RevIds}}=Doc, Ref}, IdRevsAcc2)->
NewRevId = new_revid(Doc),
- {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
- [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+ {{Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, Ref},
+ [{Ref, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
end, IdRevsAcc, Bucket),
new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
@@ -686,16 +684,21 @@ check_dup_atts2(_) ->
update_docs(Db, Docs, Options, replicated_changes) ->
increment_stat(Db, {couchdb, database_writes}),
- DocBuckets = group_alike_docs(Docs),
+
+ % associate reference with each doc in order to track duplicates
+ Docs2 = lists:map(fun(Doc) ->
+ {Doc, make_ref()}
+ end,Docs),
+ DocBuckets = group_alike_docs(Docs2),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
- fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
- (#doc{atts=Atts}) ->
+ fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>},_Ref}) -> true;
+ ({#doc{atts=Atts},_Ref}) ->
Atts /= []
- end, Docs) of
+ end, Docs2) of
true ->
- Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
ExistingDocs = get_full_doc_infos(Db, Ids),
{DocBuckets2, DocErrors} =
@@ -705,8 +708,8 @@ update_docs(Db, Docs, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
- || Doc <- Bucket] || Bucket <- DocBuckets3],
+ DocBuckets4 = [[{doc_flush_atts(check_dup_atts(Doc), Db#db.fd), Ref}
+ || {Doc, Ref} <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -715,28 +718,33 @@ update_docs(Db, Docs, Options, interactive_edit) ->
AllOrNothing = lists:member(all_or_nothing, Options),
% go ahead and generate the new revision ids for the documents.
% separate out the NonRep documents from the rest of the documents
- {Docs2, NonRepDocs} = lists:foldl(
- fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
+
+ % associate reference with each doc in order to track duplicates
+ Docs2 = lists:map(fun(Doc) ->
+ {Doc, make_ref()}
+ end,Docs),
+ {Docs3, NonRepDocs} = lists:foldl(
+ fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
case Id of
<<?LOCAL_DOC_PREFIX, _/binary>> ->
{DocsAcc, [Doc | NonRepDocsAcc]};
Id->
{[Doc | DocsAcc], NonRepDocsAcc}
end
- end, {[], []}, Docs),
-
- DocBuckets = group_alike_docs(Docs2),
+ end, {[], []}, Docs2),
+
+ DocBuckets = group_alike_docs(Docs3),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
- fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+ fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>},_Ref}) ->
true;
- (#doc{atts=Atts}) ->
+ ({#doc{atts=Atts},_Ref}) ->
Atts /= []
- end, Docs2) of
+ end, Docs3) of
true ->
% lookup the doc by id and get the most recent
- Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
ExistingDocInfos = get_full_doc_infos(Db, Ids),
{DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
@@ -750,29 +758,32 @@ update_docs(Db, Docs, Options, interactive_edit) ->
end,
if (AllOrNothing) and (PreCommitFailures /= []) ->
- {aborted, lists:map(
- fun({{Id,{Pos, [RevId|_]}}, Error}) ->
- {{Id, {Pos, RevId}}, Error};
- ({{Id,{0, []}}, Error}) ->
- {{Id, {0, <<>>}}, Error}
- end, PreCommitFailures)};
+ {aborted,
+ lists:foldl(fun({#doc{id=Id,revs={Pos, RevIds}}, Ref},Acc) ->
+ case lists:keyfind(Ref,1,PreCommitFailures) of
+ {Ref, Error} ->
+ [{{Id,{Pos,RevIds}}, Error} | Acc];
+ false ->
+ Acc
+ end
+ end,[],Docs3)};
true ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
- doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.fd)
- || Doc <- B] || B <- DocBuckets2],
+ {doc_flush_atts(set_new_att_revpos(
+ check_dup_atts(Doc)), Db#db.fd), Ref}
+ || {Doc, Ref} <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
-
+
{ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
-
+
ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
{ok, lists:map(
- fun(#doc{id=Id,revs={Pos, RevIds}}) ->
- {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
+ fun({#doc{}, Ref}) ->
+ {ok, Result} = dict:find(Ref, ResultsDict),
Result
- end, Docs)}
+ end, Docs2)}
end.
% Returns the first available document on disk. Input list is a full rev path
@@ -831,7 +842,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets,
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
- DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ DocBuckets2 = [[{doc_flush_atts(Doc, Db2#db.fd), Ref} || {Doc,Ref} <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
Pid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
@@ -852,7 +863,7 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
(Att) ->
Att#att{revpos=RevPos+1}
end, Atts)}.
-
+
doc_flush_atts(Doc, Fd) ->
Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
View
33 apps/couch/src/couch_db_updater.erl
@@ -275,8 +275,8 @@ merge_updates([], RestB, AccOutGroups) ->
lists:reverse(AccOutGroups, RestB);
merge_updates(RestA, [], AccOutGroups) ->
lists:reverse(AccOutGroups, RestA);
-merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA],
- [[{_, #doc{id=IdB}}|_]=GroupB | RestB], AccOutGroups) ->
+merge_updates([[{_, {#doc{id=IdA},_}}|_]=GroupA | RestA],
+ [[{_, {#doc{id=IdB},_}}|_]=GroupB | RestB], AccOutGroups) ->
if IdA == IdB ->
merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]);
IdA < IdB ->
@@ -285,6 +285,7 @@ merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA],
merge_updates([GroupA | RestA], RestB, [GroupB | AccOutGroups])
end.
+
collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
receive
% Only collect updates with the same MergeConflicts flag and without
@@ -531,9 +532,9 @@ flush_trees(#db{fd=Fd,header=Header}=Db,
flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-send_result(Client, Id, OriginalRevs, NewResult) ->
+send_result(Client, Ref, NewResult) ->
% used to send a result to the client
- catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
+ catch(Client ! {result, self(), {Ref, NewResult}}).
merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
{ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
@@ -542,12 +543,12 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
= OldDocInfo,
NewRevTree = lists:foldl(
- fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
+ fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, AccTree) ->
if not MergeConflicts ->
case couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc),
Limit) of
{_NewTree, conflicts} when (not OldDeleted) ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ send_result(Client, Ref, conflict),
AccTree;
{NewTree, conflicts} when PrevRevs /= [] ->
% Check to be sure if prev revision was specified, it's
@@ -559,7 +560,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
if IsPrevLeaf ->
NewTree;
true ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ send_result(Client, Ref, conflict),
AccTree
end;
{NewTree, no_conflicts} when AccTree == NewTree ->
@@ -578,11 +579,10 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
{NewTree2, _} = couch_key_tree:merge(AccTree,
couch_db:doc_to_tree(NewDoc2), Limit),
% we changed the rev id, this tells the caller we did
- send_result(Client, Id, {Pos-1,PrevRevs},
- {ok, {OldPos + 1, NewRevId}}),
+ send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
NewTree2;
true ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ send_result(Client, Ref, conflict),
AccTree
end;
{NewTree, _} ->
@@ -630,7 +630,7 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
update_seq = LastSeq,
revs_limit = RevsLimit
} = Db,
- Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
+ Ids = [Id || [{_Client, {#doc{id=Id},_Ref}}|_] <- DocsList],
% lookup up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
@@ -692,10 +692,10 @@ compute_data_sizes([FullDocInfo | RestDocInfos], Acc) ->
update_local_docs(Db, []) ->
{ok, Db};
update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
- Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
+ Ids = [Id || {_Client, {#doc{id=Id},_Ref}} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}},
+ fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body},Ref}},
_OldDocLookup) ->
case PrevRevs of
[RevStr|_] ->
@@ -713,14 +713,13 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
% true ->
case Delete of
false ->
- send_result(Client, Id, {0, PrevRevs}, {ok,
+ send_result(Client, Ref, {ok,
{0, ?l2b(integer_to_list(PrevRev + 1))}}),
{update, {Id, {PrevRev + 1, Body}}};
true ->
- send_result(Client, Id, {0, PrevRevs},
- {ok, {0, <<"0">>}}),
+ send_result(Client, Ref, {ok, {0, <<"0">>}}),
{remove, Id}
- end%;
+ end
% false ->
% send_result(Client, Id, {0, PrevRevs}, conflict),
% ignore
Something went wrong with that request. Please try again.