Deterministic rev ids now working, but before merging to trunk need to make some optimizations with revid storage to make them smaller, and also add integrity md5 checking for disk document bodies.

git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/md5_revids@791973 13f79535-47bb-0310-9956-ffa450edef68
commit 432110b7a9e6f97d94465b049f95fd873ff6a7f6 (1 parent: b6b9b98)
Authored by Damien F. Katz
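
A condensed sketch of the idea, for orientation (the actual logic is in the couch_db.erl hunk below): the new rev id is derived from the document's content rather than a random number, so the same edit produces the same rev id on every node. The function name deterministic_rev_id/3 is invented for this sketch only; the patch additionally falls back to a random rev id when an old-style attachment carries no stored MD5.

%% Sketch only: hex MD5 over the deleted flag, the body, and the
%% attachment digests becomes the new rev id.
deterministic_rev_id(Deleted, Body, AttDigests) ->
    Md5 = erlang:md5(term_to_binary([Deleted, Body, AttDigests])),
    list_to_binary(couch_util:to_hex(Md5)).
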
2  share/www/script/test/bulk_docs.js
@@ -96,5 +96,5 @@ couchTests.bulk_docs = function(debug) {
update = {"_id": newdoc._id, "_rev": newdoc._rev, "body": "blam"};
torem = {"_id": newdoc._id, "_rev": newdoc._rev, "_deleted": true};
results = db.bulkSave([update, torem]);
- T(results[1].error == "conflict");
+ T(results[0].error == "conflict" || results[1].error == "conflict");
};
8 share/www/script/test/recreate_doc.js
@@ -29,11 +29,7 @@ couchTests.recreate_doc = function(debug) {
T(db.save(doc).ok);
doc = db.open("foo");
doc.a = "baz";
- try {
- T(db.save(doc).ok);
- } finally {
- // And now, we can't even delete the document anymore :/
- T(db.deleteDoc(doc).rev != undefined);
- }
+ T(db.save(doc).ok);
+ T(db.deleteDoc(doc).rev != undefined);
}
};
89 src/couchdb/couch_db.erl
@@ -295,7 +295,7 @@ validate_doc_update(#db{name=DbName,user_ctx=Ctx}=Db, Doc, GetDiskDocFun) ->
end.
-prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc,
+prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [NewRev|PrevRevs]}}=Doc,
OldFullDocInfo, LeafRevsDict) ->
case PrevRevs of
[PrevRev|_] ->
@@ -315,9 +315,13 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc
end;
[] ->
% new doc, and we have existing revs.
+ % reuse existing deleted doc
if OldFullDocInfo#full_doc_info.deleted ->
% existing docs are deletions
- {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
+ #doc_info{revs=[#rev_info{rev={Pos, DelRevId}}|_]} =
+ couch_doc:to_doc_info(OldFullDocInfo),
+ Doc2 = Doc#doc{revs={Pos+1, [NewRev, DelRevId]}},
+ {validate_doc_update(Db, Doc2, fun() -> nil end), Doc2};
true ->
{conflict, Doc}
end
@@ -467,7 +471,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- {ok, []} = write_and_commit(Db, DocBuckets3, [merge_conflicts | Options]),
+ {ok, []} = write_and_commit(Db, DocBuckets3, [], [merge_conflicts | Options]),
{ok, DocErrors};
update_docs(Db, Docs, Options, interactive_edit) ->
@@ -475,16 +479,41 @@ update_docs(Db, Docs, Options, interactive_edit) ->
AllOrNothing = lists:member(all_or_nothing, Options),
% go ahead and generate the new revision ids for the documents.
Docs2 = lists:map(
- fun(#doc{id=Id,revs={Start, RevIds}}=Doc) ->
+ fun(#doc{id=Id,revs={Start, RevIds}, attachments=Atts, body=Body,
+ deleted=Deleted}=Doc) ->
case Id of
<<?LOCAL_DOC_PREFIX, _/binary>> ->
- Rev = case RevIds of [] -> 0; [Rev0|_] -> list_to_integer(?b2l(Rev0)) end,
- Doc#doc{revs={Start, [?l2b(integer_to_list(Rev + 1))]}};
+ RevId = case RevIds of
+ [] -> 0;
+ [Rev0|_] -> list_to_integer(?b2l(Rev0))
+ end,
+ Doc#doc{revs={Start, [?l2b(integer_to_list(RevId + 1))]}};
_ ->
- Doc#doc{revs={Start+1, [?l2b(integer_to_list(couch_util:rand32())) | RevIds]}}
+ NewRevId =
+ case [{Name, Type, Md5} || {Name,{Type,{_,_,_,Md5}}}
+ <- Atts, Md5 /= <<>>] of
+ Atts2 when length(Atts) /= length(Atts2) ->
+ % We must have old style non-md5 attachments
+ ?l2b(integer_to_list(couch_util:rand32()));
+ Atts2 ->
+ NewMd5 = erlang:md5(term_to_binary([Deleted, Body, Atts2])),
+ ?l2b(couch_util:to_hex(NewMd5))
+ end,
+ Doc#doc{revs={Start+1, [NewRevId | RevIds]}}
end
end, Docs),
- DocBuckets = group_alike_docs(Docs2),
+ % separate out the NonRep documents from the rest of the documents
+ {Docs3, NonRepDocs} = lists:foldl(
+ fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
+ case Id of
+ <<?LOCAL_DOC_PREFIX, _/binary>> ->
+ {DocsAcc, [Doc | NonRepDocsAcc]};
+ Id->
+ {[Doc | DocsAcc], NonRepDocsAcc}
+ end
+ end, {[], []}, Docs2),
+
+ DocBuckets = group_alike_docs(Docs3),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
@@ -492,7 +521,7 @@ update_docs(Db, Docs, Options, interactive_edit) ->
true;
(#doc{attachments=Atts}) ->
Atts /= []
- end, Docs) of
+ end, Docs3) of
true ->
% lookup the doc by id and get the most recent
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
@@ -519,7 +548,7 @@ update_docs(Db, Docs, Options, interactive_edit) ->
true ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
- {ok, CommitFailures} = write_and_commit(Db, DocBuckets2, Options2),
+ {ok, CommitFailures} = write_and_commit(Db, DocBuckets2, NonRepDocs, Options2),
FailDict = dict:from_list(CommitFailures ++ Failures),
% the output for each is either {ok, NewRev} or Error
{ok, lists:map(
@@ -545,11 +574,12 @@ make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp, _Seq}} |_]=DocPath) ->
write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
- Options) ->
+ NonRepDocs, Options) ->
% flush unwritten binaries to disk.
DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
- case gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity) of
- {ok, Conflicts} -> {ok, Conflicts};
+ case gen_server:call(UpdatePid,
+ {update_docs, DocBuckets2, NonRepDocs, Options}, infinity) of
+ {ok, Failures} -> {ok, Failures};
retry ->
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
@@ -557,8 +587,8 @@ write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
DocBuckets3 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
- case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of
- {ok, Conflicts} -> {ok, Conflicts};
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets3, NonRepDocs, Options}, infinity) of
+ {ok, Failures} -> {ok, Failures};
retry -> throw({update_error, compaction_retry})
end
end.
@@ -572,19 +602,21 @@ doc_flush_binaries(Doc, Fd) ->
end, Doc#doc.attachments),
Doc#doc{attachments = NewAttachments}.
-flush_binary(Fd, {Fd0, StreamPointer, Len}) when Fd0 == Fd ->
+flush_binary(Fd, {Fd0, StreamPointer, Len, Md5}) when Fd0 == Fd ->
% already written to our file, nothing to write
- {Fd, StreamPointer, Len};
-
+ {Fd, StreamPointer, Len, Md5};
flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) ->
- {NewStreamData, Len} =
+ {NewStreamData, Len, Md5} =
couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd),
- {Fd, NewStreamData, Len};
-
+ {Fd, NewStreamData, Len, Md5};
flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
- {NewStreamData, Len} =
+ {NewStreamData, Len, Md5} =
+ couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
+ {Fd, NewStreamData, Len, Md5};
+flush_binary(Fd, {OtherFd, StreamPointer, Len, Md5}) ->
+ {NewStreamData, Len, Md5} =
couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
- {Fd, NewStreamData, Len};
+ {Fd, NewStreamData, Len, Md5};
flush_binary(Fd, Bin) when is_binary(Bin) ->
with_stream(Fd, fun(OutputStream) ->
@@ -615,8 +647,8 @@ flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
with_stream(Fd, Fun) ->
{ok, OutputStream} = couch_stream:open(Fd),
Fun(OutputStream),
- {StreamInfo, Len} = couch_stream:close(OutputStream),
- {Fd, StreamInfo, Len}.
+ {StreamInfo, Len, Md5} = couch_stream:close(OutputStream),
+ {Fd, StreamInfo, Len, Md5}.
write_streamed_attachment(_Stream, _F, 0) ->
@@ -858,7 +890,12 @@ make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
_ ->
{ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp),
{BodyData0,
- [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
+ lists:map(
+ fun({Name,{Type,Sp,Len,Md5}}) ->
+ {Name,{Type,{Fd,Sp,Len,Md5}}};
+ ({Name,{Type,Sp,Len}}) ->
+ {Name,{Type,{Fd,Sp,Len,<<>>}}}
+ end, BinValues0)}
end,
#doc{
id = Id,
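
For reference, the attachment "bin" tuples read back by make_doc above now carry a fourth MD5 field, and entries written before this change get an empty binary as a placeholder. A minimal restatement of that mapping, assuming the same tuple shapes as the hunk; the function name is only for this sketch:

%% Old three-element entries read from disk gain an empty MD5 slot;
%% new entries keep the digest recorded when the stream was written.
upgrade_bin_info(Fd, {Name, {Type, Sp, Len, Md5}}) ->
    {Name, {Type, {Fd, Sp, Len, Md5}}};
upgrade_bin_info(Fd, {Name, {Type, Sp, Len}}) ->
    {Name, {Type, {Fd, Sp, Len, <<>>}}}.
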
76 src/couchdb/couch_db_updater.erl
@@ -44,12 +44,15 @@ terminate(Reason, _Srv) ->
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
-handle_call({update_docs, DocActions, Options}, _From, Db) ->
- try update_docs_int(Db, DocActions, Options) of
- {ok, Conflicts, Db2} ->
+handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) ->
+ try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of
+ {ok, Failures, Db2} ->
ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
- couch_db_update_notifier:notify({updated, Db2#db.name}),
- {reply, {ok, Conflicts}, Db2}
+ if Db2#db.update_seq /= Db#db.update_seq ->
+ couch_db_update_notifier:notify({updated, Db2#db.name});
+ true -> ok
+ end,
+ {reply, {ok, Failures}, Db2}
catch
throw: retry ->
{reply, retry, Db}
@@ -372,15 +375,15 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
#doc{attachments=Atts,deleted=IsDeleted}=Doc ->
% this node value is actually an unwritten document summary,
% write to disk.
- % make sure the Fd in the written bins is the same Fd we are.
+ % make sure the Fd in the written bins is the same Fd we are
+ % and convert bins, removing the FD.
+ % All bins should have been written to disk already.
Bins =
case Atts of
[] -> [];
- [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
- % convert bins, removing the FD.
- % All bins should have been flushed to disk already.
- [{BinName, {BinType, BinSp, BinLen}}
- || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+ [{_BName, {_Type, {BinFd, _Sp, _Len, _Md5}}} | _ ] when BinFd == Fd ->
+ [{BinName, {BinType, BinSp, BinLen, Md5}}
+ || {BinName, {BinType, {_Fd, BinSp, BinLen, Md5}}}
<- Atts];
_ ->
% BinFd must not equal our Fd. This can happen when a database
@@ -408,6 +411,26 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
{_NewTree, conflicts}
when (not OldDeleted) and (not MergeConflicts) ->
{AccTree, [{{Id, {Pos,Rev}}, conflict} | AccConflicts2]};
+ {NewTree, no_conflicts}
+ when (not MergeConflicts) andalso (AccTree == NewTree) ->
+ % the tree didn't change at all
+ % meaning we are saving a rev that's already
+ % been edited again.
+ if (Pos == 1) and OldDeleted ->
+ % this means we are recreating a brand new document
+ % into a state that already existed before.
+ % put the rev into a subsequent edit of the deletion
+ #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
+ couch_doc:to_doc_info(OldDocInfo),
+ NewDoc2 = NewDoc#doc{revs={OldPos + 1, [Rev, OldRev]}},
+ {NewTree2, _} = couch_key_tree:merge(AccTree,
+ [couch_db:doc_to_tree(NewDoc2)]),
+ % we changed the rev id, this tells the caller we did.
+ {NewTree2, [{{Id, {Pos,Rev}}, {ok, {OldPos + 1, Rev}}}
+ | AccConflicts2]};
+ true ->
+ {AccTree, [{{Id, {Pos,Rev}}, conflict} | AccConflicts2]}
+ end;
{NewTree, _} ->
{NewTree, AccConflicts2}
end
@@ -444,26 +467,13 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
[Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
#full_doc_info{rev_tree=Tree}=Info <- DocInfos].
-
-update_docs_int(Db, DocsList, Options) ->
+update_docs_int(Db, DocsList, NonRepDocs, Options) ->
#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq
} = Db,
- % separate out the NonRep documents from the rest of the documents
- {DocsList2, NonRepDocs} = lists:foldl(
- fun([#doc{id=Id}=Doc | _]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
- case Id of
- <<?LOCAL_DOC_PREFIX, _/binary>> ->
- {DocsListAcc, [Doc | NonRepDocsAcc]};
- Id->
- {[Docs | DocsListAcc], NonRepDocsAcc}
- end
- end, {[], []}, DocsList),
-
- Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
-
+ Ids = [Id || [#doc{id=Id}|_] <- DocsList],
% lookup up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
@@ -477,7 +487,7 @@ update_docs_int(Db, DocsList, Options) ->
% Merge the new docs into the revision trees.
{ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees(
lists:member(merge_conflicts, Options),
- DocsList2, OldDocInfos, [], [], [], LastSeq),
+ DocsList, OldDocInfos, [], [], [], LastSeq),
NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
@@ -598,13 +608,13 @@ copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
{ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
% copy the bin values
NewBinInfos = lists:map(
- fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
+ fun({Name, {Type, BinSp, Len, <<>>}}) when is_tuple(BinSp) orelse BinSp == null ->
% 09 UPGRADE CODE
- {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
- {Name, {Type, NewBinSp, Len}};
- ({Name, {Type, BinSp, Len}}) ->
- {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- {Name, {Type, NewBinSp, Len}}
+ {NewBinSp, Len, Md5} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+ {Name, {Type, NewBinSp, Len, Md5}};
+ ({Name, {Type, BinSp, Len, Md5}}) ->
+ {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ {Name, {Type, NewBinSp, Len, Md5}}
end, BinInfos),
{BodyData, NewBinInfos}.
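
A condensed view of the recreation case merge_rev_trees now handles above: when a new edit at position 1 lands on a document whose only leaf is a deletion, the new rev is stitched in as a child of that deleted leaf instead of being reported as a conflict. The helper name below is invented for illustration; it only restates how the revs tuple is extended.

%% Sketch: extend the deleted leaf {OldPos, OldRev} with the new rev,
%% producing the {Start, [Leaf | Parents]} shape used by #doc.revs.
graft_onto_deletion(NewRev, OldPos, OldRev) ->
    {OldPos + 1, [NewRev, OldRev]}.
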
17 src/couchdb/couch_doc.erl
@@ -15,7 +15,7 @@
-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,rev_to_strs/1]).
-export([bin_foldl/3,bin_size/1,bin_to_binary/1,get_validate_doc_fun/1]).
-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
--export([validate_docid/1]).
+-export([validate_docid/1,bin_md5/1]).
-include("couch_db.hrl").
@@ -255,20 +255,27 @@ to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) ->
bin_foldl(Bin, Fun, Acc) when is_binary(Bin) ->
Fun(Bin, Acc);
-bin_foldl({Fd, Sp, Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null ->
+bin_foldl({Fd, Sp, Len, _Md5}, Fun, Acc) when is_tuple(Sp) orelse Sp == null ->
% 09 UPGRADE CODE
couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc);
-bin_foldl({Fd, Sp, _Len}, Fun, Acc) ->
+bin_foldl({Fd, Sp, _Len, _Md5}, Fun, Acc) ->
couch_stream:foldl(Fd, Sp, Fun, Acc).
bin_size(Bin) when is_binary(Bin) ->
size(Bin);
-bin_size({_Fd, _Sp, Len}) ->
+bin_size({_Fd, _Sp, Len, _Md5}) ->
Len.
+
+bin_md5(Bin) when is_binary(Bin) ->
+ {ok, erlang:md5(Bin)};
+bin_md5({_Fd, _Sp, _Len, <<>>}) ->
+ none;
+bin_md5({_Fd, _Sp, _Len, Md5}) ->
+ {ok, Md5}.
bin_to_binary(Bin) when is_binary(Bin) ->
Bin;
-bin_to_binary({Fd, Sp, _Len}) ->
+bin_to_binary({Fd, Sp, _Len, _Md5}) ->
couch_stream:foldl(Fd, Sp, fun(Bin, Acc) -> [Bin|Acc] end, []).
get_validate_doc_fun(#doc{body={Props}}) ->
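
The newly exported bin_md5/1 gives callers access to an attachment's stored digest, while old-style bins return none. A hypothetical caller (not part of this commit) might use it like this:

%% Hypothetical helper, for illustration only: report the stored digest
%% of an attachment bin as hex, or undefined for pre-md5 attachments.
attachment_digest(Bin) ->
    case couch_doc:bin_md5(Bin) of
    {ok, Md5} -> couch_util:to_hex(Md5);
    none      -> undefined
    end.
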
16 src/couchdb/couch_httpd_db.erl
@@ -580,13 +580,15 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
[] ->
Doc = couch_doc_open(Db, DocId, Rev, Options),
DiskEtag = couch_httpd:doc_etag(Doc),
- couch_httpd:etag_respond(Req, DiskEtag, fun() ->
- Headers = case Doc#doc.meta of
- [] -> [{"Etag", DiskEtag}]; % output etag only when we have no meta
- _ -> []
- end,
- send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options))
- end);
+ case Doc#doc.meta of
+ [] ->
+ % output etag only when we have no meta
+ couch_httpd:etag_respond(Req, DiskEtag, fun() ->
+ send_json(Req, 200, [{"Etag", DiskEtag}], couch_doc:to_json_obj(Doc, Options))
+ end);
+ _ ->
+ send_json(Req, 200, [], couch_doc:to_json_obj(Doc, Options))
+ end;
_ ->
{ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options),
{ok, Resp} = start_json_response(Req, 200),
26 src/couchdb/couch_stream.erl
@@ -37,7 +37,8 @@
buffer_list = [],
buffer_len = 0,
max_buffer = 4096,
- written_len = 0
+ written_len = 0,
+ md5
}).
@@ -86,7 +87,7 @@ write(Pid, Bin) ->
init(Fd) ->
- {ok, #stream{fd = Fd}}.
+ {ok, #stream{fd=Fd, md5=erlang:md5_init()}}.
terminate(_Reason, _Stream) ->
ok.
@@ -99,14 +100,18 @@ handle_call({write, Bin}, _From, Stream) ->
written_pointers = Written,
buffer_len = BufferLen,
buffer_list = Buffer,
- max_buffer = Max} = Stream,
+ max_buffer = Max,
+ md5 = Md5} = Stream,
if BinSize + BufferLen > Max ->
- {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, [Bin])),
+ WriteBin = lists:reverse(Buffer, [Bin]),
+ Md5_2 = erlang:md5_update(Md5, WriteBin),
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
{reply, ok, Stream#stream{
written_len=WrittenLen + BufferLen + BinSize,
written_pointers=[Pos|Written],
buffer_list=[],
- buffer_len=0}};
+ buffer_len=0,
+ md5=Md5_2}};
true ->
{reply, ok, Stream#stream{
buffer_list=[Bin|Buffer],
@@ -118,14 +123,17 @@ handle_call(close, _From, Stream) ->
written_len = WrittenLen,
written_pointers = Written,
buffer_len = BufferLen,
- buffer_list = Buffer} = Stream,
+ buffer_list = Buffer,
+ md5 = Md5} = Stream,
case Buffer of
[] ->
- Result = {lists:reverse(Written), WrittenLen};
+ Result = {lists:reverse(Written), WrittenLen, erlang:md5_final(Md5)};
_ ->
- {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)),
- Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen}
+ WriteBin = lists:reverse(Buffer),
+ Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin)),
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
+ Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen, Md5Final}
end,
{stop, normal, Result, Stream}.
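
The stream above builds its digest incrementally with the standard erlang:md5_init/0, erlang:md5_update/2, and erlang:md5_final/1 BIFs, folding each buffer flush into the context. The property it relies on, checked standalone as a sketch:

%% Hashing the data in chunks yields the same MD5 as hashing it whole,
%% which is why the digest can be updated at every buffer flush.
Ctx0 = erlang:md5_init(),
Ctx1 = erlang:md5_update(Ctx0, <<"hello ">>),
Ctx2 = erlang:md5_update(Ctx1, [<<"wor">>, <<"ld">>]),   % iodata is accepted
true = erlang:md5_final(Ctx2) =:= erlang:md5(<<"hello world">>).
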