Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

We’re showing branches in this repository, but you can also compare across forks.

base fork: apache/couchdb
base: master
...
head fork: fdmanana/couchdb
  • 5 commits
  • 3 files changed
  • 0 commit comments
  • 1 contributor
6 src/couchdb/couch_btree.erl
View
@@ -326,14 +326,16 @@ write_node(Bt, NodeType, NodeList) ->
% split up nodes into smaller sizes
NodeListList = chunkify(NodeList),
% now write out each chunk and return the KeyPointer pairs for those nodes
+ {ok, NodePointers} = couch_file:append_binaries(
+ Bt#btree.fd,
+ [term_to_binary({NodeType, ANodeList}) || ANodeList <- NodeListList]),
ResultList = [
begin
- {ok, Pointer} = couch_file:append_term(Bt#btree.fd, {NodeType, ANodeList}),
{LastKey, _} = lists:last(ANodeList),
{LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList)}}
end
||
- ANodeList <- NodeListList
+ {ANodeList, Pointer} <- lists:zip(NodeListList, NodePointers)
],
{ok, ResultList, Bt}.
116 src/couchdb/couch_db_updater.erl
View
@@ -446,63 +446,68 @@ refresh_validate_doc_funs(Db) ->
% rev tree functions
-flush_trees(_Db, [], AccFlushedTrees) ->
+flush_trees(_SummaryPointers, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{updater_fd = Fd} = Db,
+flush_trees(SummaryPointers,
[InfoUnflushed | RestUnflushed], AccFlushed) ->
#full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
Flushed = couch_key_tree:map(
fun(_Rev, Value) ->
case Value of
- #doc{atts=Atts,deleted=IsDeleted}=Doc ->
- % this node value is actually an unwritten document summary,
- % write to disk.
- % make sure the Fd in the written bins is the same Fd we are
- % and convert bins, removing the FD.
- % All bins should have been written to disk already.
- DiskAtts =
- case Atts of
- [] -> [];
- [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd ->
- [{N,T,P,AL,DL,R,M,E}
- || #att{name=N,type=T,data={_,P},md5=M,revpos=R,
- att_len=AL,disk_len=DL,encoding=E}
- <- Atts];
- _ ->
- % BinFd must not equal our Fd. This can happen when a database
- % is being switched out during a compaction
- ?LOG_DEBUG("File where the attachments are written has"
- " changed. Possibly retrying.", []),
- throw(retry)
- end,
- {ok, NewSummaryPointer} =
- couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts}),
- {IsDeleted, NewSummaryPointer, UpdateSeq};
+ #doc{body = {summary, SummaryRef}, deleted = IsDeleted} ->
+ {IsDeleted, dict:fetch(SummaryRef, SummaryPointers), UpdateSeq};
_ ->
Value
end
end, Unflushed),
- flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
+ flush_trees(SummaryPointers, RestUnflushed,
+ [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
+
+
+doc_summary(#db{updater_fd = Fd}, #doc{atts = Atts, body = Body}) ->
+ DiskAtts = case Atts of
+ [] ->
+ [];
+ [#att{data = {Fd, _Sp}} | _ ] ->
+ [{N, T, P, AL, DL, R, M, E}
+ || #att{name = N, type = T, data = {_, P}, md5 = M, revpos = R,
+ att_len = AL, disk_len = DL, encoding = E} <- Atts];
+ _ ->
+ % Attachment flushed to the pre-compaction database file.
+ ?LOG_DEBUG("File where the attachments are written has"
+ " changed. Possibly retrying.", []),
+ throw(retry)
+ end,
+ {Body, DiskAtts}.
send_result(Client, Id, OriginalRevs, NewResult) ->
% used to send a result to the client
catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
-merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
- {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
-merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
- [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
+merge_rev_trees(_Db, _Limit, _Merge, [], [],
+ AccNewInfos, AccRemoveSeqs, AccSeq, AccSummaries, AccSummaryRefs) ->
+ {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq,
+ AccSummaries, AccSummaryRefs};
+merge_rev_trees(Db, Limit, MergeConflicts, [NewDocs|RestDocsList],
+ [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq,
+ AccSummaries, AccSummaryRefs) ->
#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
= OldDocInfo,
- NewRevTree = lists:foldl(
- fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
+ {NewRevTree, NewSummaryBins, NewSummaryRefs} = lists:foldl(
+ fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc1},
+ {AccTree, AccSumBins, AccSumRefs}) ->
+ DocSummary = doc_summary(Db, NewDoc1),
+ SummaryRef = make_ref(),
+ NewAccSumBins = [term_to_binary(DocSummary) | AccSumBins],
+ NewAccSumRefs = [SummaryRef | AccSumRefs],
+ NewDoc = NewDoc1#doc{body = {summary, SummaryRef}},
if not MergeConflicts ->
case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
Limit) of
{_NewTree, conflicts} when (not OldDeleted) ->
send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
- AccTree;
+ {AccTree, AccSumBins, AccSumRefs};
{NewTree, conflicts} when PrevRevs /= [] ->
% Check to be sure if prev revision was specified, it's
% a leaf node in the tree
@@ -511,10 +516,10 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
{LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)}
end, Leafs),
if IsPrevLeaf ->
- NewTree;
+ {NewTree, NewAccSumBins, NewAccSumRefs};
true ->
send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
- AccTree
+ {AccTree, AccSumBins, AccSumRefs}
end;
{NewTree, no_conflicts} when AccTree == NewTree ->
% the tree didn't change at all
@@ -534,25 +539,25 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
% we changed the rev id, this tells the caller we did
send_result(Client, Id, {Pos-1,PrevRevs},
{ok, {OldPos + 1, NewRevId}}),
- NewTree2;
+ {NewTree2, NewAccSumBins, NewAccSumRefs};
true ->
send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
- AccTree
+ {AccTree, AccSumBins, AccSumRefs}
end;
{NewTree, _} ->
- NewTree
+ {NewTree, NewAccSumBins, NewAccSumRefs}
end;
true ->
{NewTree, _} = couch_key_tree:merge(AccTree,
couch_doc:to_path(NewDoc), Limit),
- NewTree
+ {NewTree, NewAccSumBins, NewAccSumRefs}
end
end,
- OldTree, NewDocs),
+ {OldTree, AccSummaries, AccSummaryRefs}, NewDocs),
if NewRevTree == OldTree ->
% nothing changed
- merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
- AccNewInfos, AccRemoveSeqs, AccSeq);
+ merge_rev_trees(Db, Limit, MergeConflicts, RestDocsList, RestOldInfo,
+ AccNewInfos, AccRemoveSeqs, AccSeq, NewSummaryBins, NewSummaryRefs);
true ->
% we have updated the document, give it a new seq #
NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
@@ -560,8 +565,9 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
0 -> AccRemoveSeqs;
_ -> [OldSeq | AccRemoveSeqs]
end,
- merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
- [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
+ merge_rev_trees(Db, Limit, MergeConflicts, RestDocsList, RestOldInfo,
+ [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1,
+ NewSummaryBins, NewSummaryRefs)
end.
@@ -585,7 +591,8 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq,
- revs_limit = RevsLimit
+ revs_limit = RevsLimit,
+ updater_fd = Fd
} = Db,
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% lookup up the old documents, if they exist.
@@ -598,16 +605,23 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
end,
Ids, OldDocLookups),
% Merge the new docs into the revision trees.
- {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
- MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
+ {ok, NewFullDocInfos, RemoveSeqs, NewSeq, DocSummaryBins, DocSummaryRefs} =
+ merge_rev_trees(Db, RevsLimit, MergeConflicts, DocsList, OldDocInfos,
+ [], [], LastSeq, [], []),
+
+ % Write out the document summaries (the bodies are stored in the nodes of
+ % the trees, the attachments are already written to disk)
+ {ok, DocSummaryPointers} =
+ couch_file:append_binaries_md5(Fd, DocSummaryBins),
+ DocSummaryPointersDict = dict:from_list(
+ lists:zip(DocSummaryRefs, DocSummaryPointers)),
% All documents are now ready to write.
{ok, Db2} = update_local_docs(Db, NonRepDocs),
- % Write out the document summaries (the bodies are stored in the nodes of
- % the trees, the attachments are already written to disk)
- {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
+ {ok, FlushedFullDocInfos} =
+ flush_trees(DocSummaryPointersDict, NewFullDocInfos, []),
{IndexFullDocInfos, IndexDocInfos} =
new_index_entries(FlushedFullDocInfos, [], []),
64 src/couchdb/couch_file.erl
View
@@ -26,6 +26,7 @@
-export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2]).
-export([pread_term/2, pread_iolist/2, pread_binary/2]).
-export([append_binary/2, append_binary_md5/2]).
+-export([append_binaries/2, append_binaries_md5/2]).
-export([append_term/2, append_term_md5/2]).
-export([write_header/2, read_header/1]).
-export([delete/2, delete/3, init_delete_dir/1]).
@@ -34,6 +35,33 @@
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
+-export([test/2]).
+
+test(BinSize, NumBins) ->
+ Bins = [crypto:rand_bytes(BinSize) || _ <- lists:seq(1, NumBins)],
+ test_multi_writes(Bins),
+ test_batch_write(Bins).
+
+test_multi_writes(Bins) ->
+ {ok, F} = open("foo", [create, overwrite]),
+ T0 = erlang:now(),
+ [{ok, _} = append_binary_md5(F, Bin) || Bin <- Bins],
+ T1 = erlang:now(),
+ Diff = timer:now_diff(T1, T0),
+ close(F),
+ io:format("multi writes of ~p binaries, each of size ~p bytes, took ~pus~n",
+ [length(Bins), byte_size(hd(Bins)), Diff]).
+
+test_batch_write(Bins) ->
+ {ok, F} = open("bar", [create, overwrite]),
+ T0 = erlang:now(),
+ {ok, _} = append_binaries_md5(F, Bins),
+ T1 = erlang:now(),
+ Diff = timer:now_diff(T1, T0),
+ close(F),
+ io:format("batch write of ~p binaries, each of size ~p bytes, took ~pus~n",
+ [length(Bins), byte_size(hd(Bins)), Diff]).
+
%%----------------------------------------------------------------------
%% Args: Valid Options are [create] and [create,overwrite].
%% Files are opened in read/write mode.
@@ -100,6 +128,25 @@ append_binary_md5(Fd, Bin) ->
gen_server:call(Fd, {append_bin,
[<<1:1/integer,Size:31/integer>>, couch_util:md5(Bin), Bin]}, infinity).
+append_binaries(Fd, BinList) ->
+ BinList2 = [[<<0:1/integer, (iolist_size(Bin)):31/integer>>, Bin]
+ || Bin <- BinList],
+ case gen_server:call(Fd, {append_bin_list, BinList2}, infinity) of
+ {ok, RevPosList} ->
+ {ok, lists:reverse(RevPosList)};
+ Error ->
+ Error
+ end.
+
+append_binaries_md5(Fd, BinList) ->
+ BinList2 = [[<<1:1/integer, (iolist_size(Bin)):31/integer>>,
+ couch_util:md5(Bin), Bin] || Bin <- BinList],
+ case gen_server:call(Fd, {append_bin_list, BinList2}, infinity) of
+ {ok, RevPosList} ->
+ {ok, lists:reverse(RevPosList)};
+ Error ->
+ Error
+ end.
%%----------------------------------------------------------------------
%% Purpose: Reads a term from a file that was written with append_term
@@ -344,6 +391,23 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
{reply, Error, File}
end;
+handle_call({append_bin_list, BinList}, _From,
+ #file{fd = Fd, eof = Pos} = File) ->
+ {FinalEof, PosList, BlockList} = lists:foldl(
+ fun(Bin, {Eof, PosAcc, BinAcc}) ->
+ Blocks = make_blocks(Eof rem ?SIZE_BLOCK, Bin),
+ NextEof = Eof + iolist_size(Blocks),
+ {NextEof, [Eof | PosAcc], [Blocks | BinAcc]}
+ end,
+ {Pos, [], []},
+ BinList),
+ case file:write(Fd, lists:reverse(BlockList)) of
+ ok ->
+ {reply, {ok, PosList}, File#file{eof = FinalEof}};
+ Error ->
+ {reply, Error, File}
+ end;
+
handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
BinSize = byte_size(Bin),
case Pos rem ?SIZE_BLOCK of

No commit comments for this range

Something went wrong with that request. Please try again.