  • 5 commits
  • 3 files changed
  • 0 comments
  • 1 contributor
src/couchdb/couch_btree.erl (6 changed lines)
@@ -326,14 +326,16 @@ write_node(Bt, NodeType, NodeList) ->
326 326 % split up nodes into smaller sizes
327 327 NodeListList = chunkify(NodeList),
328 328 % now write out each chunk and return the KeyPointer pairs for those nodes
  329 + {ok, NodePointers} = couch_file:append_binaries(
  330 + Bt#btree.fd,
  331 + [term_to_binary({NodeType, ANodeList}) || ANodeList <- NodeListList]),
329 332 ResultList = [
330 333 begin
331   - {ok, Pointer} = couch_file:append_term(Bt#btree.fd, {NodeType, ANodeList}),
332 334 {LastKey, _} = lists:last(ANodeList),
333 335 {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList)}}
334 336 end
335 337 ||
336   - ANodeList <- NodeListList
  338 + {ANodeList, Pointer} <- lists:zip(NodeListList, NodePointers)
337 339 ],
338 340 {ok, ResultList, Bt}.
339 341
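The couch_btree.erl hunk above swaps one couch_file:append_term/2 call per chunk (a gen_server round trip each) for a single couch_file:append_binaries/2 call covering every chunk, then zips the returned pointers back onto the chunks. A minimal sketch of that pattern, where write_chunked_nodes/3 is an illustrative helper name and the pointers are assumed to come back in input order (which the couch_file change below arranges by reversing its accumulated position list):

    %% Batch-append all B-tree node chunks in one gen_server call, then
    %% pair each chunk with the file offset it was written at.
    write_chunked_nodes(Fd, NodeType, NodeListList) ->
        Bins = [term_to_binary({NodeType, ANodeList}) || ANodeList <- NodeListList],
        {ok, Pointers} = couch_file:append_binaries(Fd, Bins),
        lists:zip(NodeListList, Pointers).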
src/couchdb/couch_db_updater.erl (116 changed lines)
@@ -446,63 +446,68 @@ refresh_validate_doc_funs(Db) ->
446 446
447 447 % rev tree functions
448 448
449   -flush_trees(_Db, [], AccFlushedTrees) ->
  449 +flush_trees(_SummaryPointers, [], AccFlushedTrees) ->
450 450 {ok, lists:reverse(AccFlushedTrees)};
451   -flush_trees(#db{updater_fd = Fd} = Db,
  451 +flush_trees(SummaryPointers,
452 452 [InfoUnflushed | RestUnflushed], AccFlushed) ->
453 453 #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
454 454 Flushed = couch_key_tree:map(
455 455 fun(_Rev, Value) ->
456 456 case Value of
457   - #doc{atts=Atts,deleted=IsDeleted}=Doc ->
458   - % this node value is actually an unwritten document summary,
459   - % write to disk.
460   - % make sure the Fd in the written bins is the same Fd we are
461   - % and convert bins, removing the FD.
462   - % All bins should have been written to disk already.
463   - DiskAtts =
464   - case Atts of
465   - [] -> [];
466   - [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd ->
467   - [{N,T,P,AL,DL,R,M,E}
468   - || #att{name=N,type=T,data={_,P},md5=M,revpos=R,
469   - att_len=AL,disk_len=DL,encoding=E}
470   - <- Atts];
471   - _ ->
472   - % BinFd must not equal our Fd. This can happen when a database
473   - % is being switched out during a compaction
474   - ?LOG_DEBUG("File where the attachments are written has"
475   - " changed. Possibly retrying.", []),
476   - throw(retry)
477   - end,
478   - {ok, NewSummaryPointer} =
479   - couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts}),
480   - {IsDeleted, NewSummaryPointer, UpdateSeq};
  457 + #doc{body = {summary, SummaryRef}, deleted = IsDeleted} ->
  458 + {IsDeleted, dict:fetch(SummaryRef, SummaryPointers), UpdateSeq};
481 459 _ ->
482 460 Value
483 461 end
484 462 end, Unflushed),
485   - flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
  463 + flush_trees(SummaryPointers, RestUnflushed,
  464 + [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
  465 +
  466 +
  467 +doc_summary(#db{updater_fd = Fd}, #doc{atts = Atts, body = Body}) ->
  468 + DiskAtts = case Atts of
  469 + [] ->
  470 + [];
  471 + [#att{data = {Fd, _Sp}} | _ ] ->
  472 + [{N, T, P, AL, DL, R, M, E}
  473 + || #att{name = N, type = T, data = {_, P}, md5 = M, revpos = R,
  474 + att_len = AL, disk_len = DL, encoding = E} <- Atts];
  475 + _ ->
  476 + % Attachment flushed to the pre-compaction database file.
  477 + ?LOG_DEBUG("File where the attachments are written has"
  478 + " changed. Possibly retrying.", []),
  479 + throw(retry)
  480 + end,
  481 + {Body, DiskAtts}.
486 482
487 483
488 484 send_result(Client, Id, OriginalRevs, NewResult) ->
489 485 % used to send a result to the client
490 486 catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
491 487
492   -merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
493   - {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
494   -merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
495   - [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
  488 +merge_rev_trees(_Db, _Limit, _Merge, [], [],
  489 + AccNewInfos, AccRemoveSeqs, AccSeq, AccSummaries, AccSummaryRefs) ->
  490 + {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq,
  491 + AccSummaries, AccSummaryRefs};
  492 +merge_rev_trees(Db, Limit, MergeConflicts, [NewDocs|RestDocsList],
  493 + [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq,
  494 + AccSummaries, AccSummaryRefs) ->
496 495 #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
497 496 = OldDocInfo,
498   - NewRevTree = lists:foldl(
499   - fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
  497 + {NewRevTree, NewSummaryBins, NewSummaryRefs} = lists:foldl(
  498 + fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc1},
  499 + {AccTree, AccSumBins, AccSumRefs}) ->
  500 + DocSummary = doc_summary(Db, NewDoc1),
  501 + SummaryRef = make_ref(),
  502 + NewAccSumBins = [term_to_binary(DocSummary) | AccSumBins],
  503 + NewAccSumRefs = [SummaryRef | AccSumRefs],
  504 + NewDoc = NewDoc1#doc{body = {summary, SummaryRef}},
500 505 if not MergeConflicts ->
501 506 case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
502 507 Limit) of
503 508 {_NewTree, conflicts} when (not OldDeleted) ->
504 509 send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
505   - AccTree;
  510 + {AccTree, AccSumBins, AccSumRefs};
506 511 {NewTree, conflicts} when PrevRevs /= [] ->
507 512 % Check to be sure if prev revision was specified, it's
508 513 % a leaf node in the tree
@@ -511,10 +516,10 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
511 516 {LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)}
512 517 end, Leafs),
513 518 if IsPrevLeaf ->
514   - NewTree;
  519 + {NewTree, NewAccSumBins, NewAccSumRefs};
515 520 true ->
516 521 send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
517   - AccTree
  522 + {AccTree, AccSumBins, AccSumRefs}
518 523 end;
519 524 {NewTree, no_conflicts} when AccTree == NewTree ->
520 525 % the tree didn't change at all
@@ -534,25 +539,25 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
534 539 % we changed the rev id, this tells the caller we did
535 540 send_result(Client, Id, {Pos-1,PrevRevs},
536 541 {ok, {OldPos + 1, NewRevId}}),
537   - NewTree2;
  542 + {NewTree2, NewAccSumBins, NewAccSumRefs};
538 543 true ->
539 544 send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
540   - AccTree
  545 + {AccTree, AccSumBins, AccSumRefs}
541 546 end;
542 547 {NewTree, _} ->
543   - NewTree
  548 + {NewTree, NewAccSumBins, NewAccSumRefs}
544 549 end;
545 550 true ->
546 551 {NewTree, _} = couch_key_tree:merge(AccTree,
547 552 couch_doc:to_path(NewDoc), Limit),
548   - NewTree
  553 + {NewTree, NewAccSumBins, NewAccSumRefs}
549 554 end
550 555 end,
551   - OldTree, NewDocs),
  556 + {OldTree, AccSummaries, AccSummaryRefs}, NewDocs),
552 557 if NewRevTree == OldTree ->
553 558 % nothing changed
554   - merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
555   - AccNewInfos, AccRemoveSeqs, AccSeq);
  559 + merge_rev_trees(Db, Limit, MergeConflicts, RestDocsList, RestOldInfo,
  560 + AccNewInfos, AccRemoveSeqs, AccSeq, NewSummaryBins, NewSummaryRefs);
556 561 true ->
557 562 % we have updated the document, give it a new seq #
558 563 NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
@@ -560,8 +565,9 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
560 565 0 -> AccRemoveSeqs;
561 566 _ -> [OldSeq | AccRemoveSeqs]
562 567 end,
563   - merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
564   - [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
  568 + merge_rev_trees(Db, Limit, MergeConflicts, RestDocsList, RestOldInfo,
  569 + [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1,
  570 + NewSummaryBins, NewSummaryRefs)
565 571 end.
566 572
567 573
@@ -585,7 +591,8 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
585 591 fulldocinfo_by_id_btree = DocInfoByIdBTree,
586 592 docinfo_by_seq_btree = DocInfoBySeqBTree,
587 593 update_seq = LastSeq,
588   - revs_limit = RevsLimit
  594 + revs_limit = RevsLimit,
  595 + updater_fd = Fd
589 596 } = Db,
590 597 Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
591 598 % lookup up the old documents, if they exist.
@@ -598,16 +605,23 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
598 605 end,
599 606 Ids, OldDocLookups),
600 607 % Merge the new docs into the revision trees.
601   - {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
602   - MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
  608 + {ok, NewFullDocInfos, RemoveSeqs, NewSeq, DocSummaryBins, DocSummaryRefs} =
  609 + merge_rev_trees(Db, RevsLimit, MergeConflicts, DocsList, OldDocInfos,
  610 + [], [], LastSeq, [], []),
  611 +
  612 + % Write out the document summaries (the bodies are stored in the nodes of
  613 + % the trees, the attachments are already written to disk)
  614 + {ok, DocSummaryPointers} =
  615 + couch_file:append_binaries_md5(Fd, DocSummaryBins),
  616 + DocSummaryPointersDict = dict:from_list(
  617 + lists:zip(DocSummaryRefs, DocSummaryPointers)),
603 618
604 619 % All documents are now ready to write.
605 620
606 621 {ok, Db2} = update_local_docs(Db, NonRepDocs),
607 622
608   - % Write out the document summaries (the bodies are stored in the nodes of
609   - % the trees, the attachments are already written to disk)
610   - {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
  623 + {ok, FlushedFullDocInfos} =
  624 + flush_trees(DocSummaryPointersDict, NewFullDocInfos, []),
611 625
612 626 {IndexFullDocInfos, IndexDocInfos} =
613 627 new_index_entries(FlushedFullDocInfos, [], []),
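The couch_db_updater.erl changes above replace the per-document couch_file:append_term_md5/2 call that used to live inside flush_trees with a two-phase scheme: merge_rev_trees/10 serializes each incoming document summary via doc_summary/2 and stores a {summary, Ref} placeholder (a make_ref()) in the rev tree, update_docs_int writes all summary binaries to disk with one append_binaries_md5/2 call, and flush_trees/3 then resolves each placeholder through a Ref -> Pointer dict. A condensed sketch of the write-and-resolve step, with flush_summaries/2 as an illustrative name that does not appear in the patch:

    %% TaggedSummaries :: [{reference(), {Body, DiskAtts}}]
    %% Write every summary in a single batched, md5-framed append and
    %% return the Ref -> Pointer mapping used to patch the rev trees.
    flush_summaries(Fd, TaggedSummaries) ->
        {Refs, Summaries} = lists:unzip(TaggedSummaries),
        Bins = [term_to_binary(S) || S <- Summaries],
        {ok, Pointers} = couch_file:append_binaries_md5(Fd, Bins),
        dict:from_list(lists:zip(Refs, Pointers)).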
src/couchdb/couch_file.erl (64 changed lines)
@@ -26,6 +26,7 @@
26 26 -export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2]).
27 27 -export([pread_term/2, pread_iolist/2, pread_binary/2]).
28 28 -export([append_binary/2, append_binary_md5/2]).
  29 +-export([append_binaries/2, append_binaries_md5/2]).
29 30 -export([append_term/2, append_term_md5/2]).
30 31 -export([write_header/2, read_header/1]).
31 32 -export([delete/2, delete/3, init_delete_dir/1]).
@@ -34,6 +35,33 @@
34 35 -export([init/1, terminate/2, code_change/3]).
35 36 -export([handle_call/3, handle_cast/2, handle_info/2]).
36 37
  38 +-export([test/2]).
  39 +
  40 +test(BinSize, NumBins) ->
  41 + Bins = [crypto:rand_bytes(BinSize) || _ <- lists:seq(1, NumBins)],
  42 + test_multi_writes(Bins),
  43 + test_batch_write(Bins).
  44 +
  45 +test_multi_writes(Bins) ->
  46 + {ok, F} = open("foo", [create, overwrite]),
  47 + T0 = erlang:now(),
  48 + [{ok, _} = append_binary_md5(F, Bin) || Bin <- Bins],
  49 + T1 = erlang:now(),
  50 + Diff = timer:now_diff(T1, T0),
  51 + close(F),
  52 + io:format("multi writes of ~p binaries, each of size ~p bytes, took ~pus~n",
  53 + [length(Bins), byte_size(hd(Bins)), Diff]).
  54 +
  55 +test_batch_write(Bins) ->
  56 + {ok, F} = open("bar", [create, overwrite]),
  57 + T0 = erlang:now(),
  58 + {ok, _} = append_binaries_md5(F, Bins),
  59 + T1 = erlang:now(),
  60 + Diff = timer:now_diff(T1, T0),
  61 + close(F),
  62 + io:format("batch write of ~p binaries, each of size ~p bytes, took ~pus~n",
  63 + [length(Bins), byte_size(hd(Bins)), Diff]).
  64 +
37 65 %%----------------------------------------------------------------------
38 66 %% Args: Valid Options are [create] and [create,overwrite].
39 67 %% Files are opened in read/write mode.
@@ -100,6 +128,25 @@ append_binary_md5(Fd, Bin) ->
100 128 gen_server:call(Fd, {append_bin,
101 129 [<<1:1/integer,Size:31/integer>>, couch_util:md5(Bin), Bin]}, infinity).
102 130
  131 +append_binaries(Fd, BinList) ->
  132 + BinList2 = [[<<0:1/integer, (iolist_size(Bin)):31/integer>>, Bin]
  133 + || Bin <- BinList],
  134 + case gen_server:call(Fd, {append_bin_list, BinList2}, infinity) of
  135 + {ok, RevPosList} ->
  136 + {ok, lists:reverse(RevPosList)};
  137 + Error ->
  138 + Error
  139 + end.
  140 +
  141 +append_binaries_md5(Fd, BinList) ->
  142 + BinList2 = [[<<1:1/integer, (iolist_size(Bin)):31/integer>>,
  143 + couch_util:md5(Bin), Bin] || Bin <- BinList],
  144 + case gen_server:call(Fd, {append_bin_list, BinList2}, infinity) of
  145 + {ok, RevPosList} ->
  146 + {ok, lists:reverse(RevPosList)};
  147 + Error ->
  148 + Error
  149 + end.
103 150
104 151 %%----------------------------------------------------------------------
105 152 %% Purpose: Reads a term from a file that was written with append_term
@@ -344,6 +391,23 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
344 391 {reply, Error, File}
345 392 end;
346 393
  394 +handle_call({append_bin_list, BinList}, _From,
  395 + #file{fd = Fd, eof = Pos} = File) ->
  396 + {FinalEof, PosList, BlockList} = lists:foldl(
  397 + fun(Bin, {Eof, PosAcc, BinAcc}) ->
  398 + Blocks = make_blocks(Eof rem ?SIZE_BLOCK, Bin),
  399 + NextEof = Eof + iolist_size(Blocks),
  400 + {NextEof, [Eof | PosAcc], [Blocks | BinAcc]}
  401 + end,
  402 + {Pos, [], []},
  403 + BinList),
  404 + case file:write(Fd, lists:reverse(BlockList)) of
  405 + ok ->
  406 + {reply, {ok, PosList}, File#file{eof = FinalEof}};
  407 + Error ->
  408 + {reply, Error, File}
  409 + end;
  410 +
347 411 handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
348 412 BinSize = byte_size(Bin),
349 413 case Pos rem ?SIZE_BLOCK of
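In the new handle_call({append_bin_list, ...}) clause, each binary's pointer is simply the end-of-file offset accumulated while block-aligning the iolists, and the whole batch reaches disk through a single file:write/2; the test/2 export added near the top of the file benchmarks this batched path against N individual append_binary_md5/2 calls. A hedged round-trip sketch, assuming pread_term/2 accepts positions written with the md5-flagged framing (as it does for append_term_md5/2) and with batch_roundtrip_example/0 as an illustrative name:

    %% Append two term binaries with one gen_server call, then read the
    %% second one back at the offset that call returned for it.
    batch_roundtrip_example() ->
        {ok, Fd} = couch_file:open("batch_example.couch", [create, overwrite]),
        Bins = [term_to_binary({doc, N}) || N <- [1, 2]],
        {ok, [_Pos1, Pos2]} = couch_file:append_binaries_md5(Fd, Bins),
        {ok, {doc, 2}} = couch_file:pread_term(Fd, Pos2),
        couch_file:close(Fd).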
