From e4bdf2c3d031e38adffe5da27b9a5029130a28a5 Mon Sep 17 00:00:00 2001
From: Filipe David Manana
Date: Sat, 4 Dec 2010 16:35:13 +0000
Subject: [PATCH] Add dedicated couch_file to the DB updater process.

---
 src/couchdb/couch_btree.erl      | 10 +-----
 src/couchdb/couch_db.erl         | 55 +++++++++++++++++++-----------
 src/couchdb/couch_db.hrl         | 11 +++++-
 src/couchdb/couch_db_updater.erl | 47 +++++++++++++++++---------
 src/couchdb/couch_file.erl       | 57 ++++++++++++++++----------------
 5 files changed, 106 insertions(+), 74 deletions(-)

diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl
index c63cd8cfdec..c9a6dc75128 100644
--- a/src/couchdb/couch_btree.erl
+++ b/src/couchdb/couch_btree.erl
@@ -16,17 +16,9 @@
 -export([fold/4, full_reduce/1, final_reduce/2, foldl/3, foldl/4]).
 -export([fold_reduce/4, lookup/2, get_state/1, set_options/2]).
 
+-include("couch_db.hrl").
+
 -define(CHUNK_THRESHOLD, 16#4ff).
 
--record(btree,
-    {fd,
-    root,
-    extract_kv = fun({Key, Value}) -> {Key, Value} end,
-    assemble_kv = fun(Key, Value) -> {Key, Value} end,
-    less = fun(A, B) -> A < B end,
-    reduce = nil
-    }).
-
 extract(#btree{extract_kv=Extract}, Value) ->
     Extract(Value).
 
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 31948af66a9..13345c9e055 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -223,7 +223,7 @@ get_full_doc_info(Db, Id) ->
     Result.
 
 get_full_doc_infos(Db, Ids) ->
-    couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
+    couch_btree:lookup(by_id_btree(Db), Ids).
 
 increment_update_seq(#db{update_pid=UpdatePid}) ->
     gen_server:call(UpdatePid, increment_update_seq).
@@ -251,11 +251,10 @@ get_db_info(Db) ->
         compactor_pid=Compactor,
         update_seq=SeqNum,
         name=Name,
-        fulldocinfo_by_id_btree=FullDocBtree,
         instance_start_time=StartTime,
         committed_update_seq=CommittedUpdateSeq} = Db,
     {ok, Size} = couch_file:bytes(Fd),
-    {ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree),
+    {ok, {Count, DelCount}} = couch_btree:full_reduce(by_id_btree(Db)),
     InfoList = [
         {db_name, Name},
         {doc_count, Count},
@@ -270,8 +269,8 @@ get_db_info(Db) ->
     ],
     {ok, InfoList}.
 
-get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) ->
-    {ok,_, Docs} = couch_btree:fold(Btree,
+get_design_docs(Db) ->
+    {ok,_, Docs} = couch_btree:fold(by_id_btree(Db),
         fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) ->
             {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []),
             {ok, [Doc | AccDocs]};
@@ -662,7 +661,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
         DocErrors = [],
         DocBuckets3 = DocBuckets
     end,
-    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
+    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.updater_fd)
             || Doc <- Bucket] || Bucket <- DocBuckets3],
     {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
     {ok, DocErrors};
@@ -718,7 +717,7 @@ update_docs(Db, Docs, Options, interactive_edit) ->
         true -> []
     end ++ Options,
     DocBuckets3 = [[
            doc_flush_atts(set_new_att_revpos(
-                check_dup_atts(Doc)), Db#db.fd)
+                check_dup_atts(Doc)), Db#db.updater_fd)
            || Doc <- B] || B <- DocBuckets2],
     {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
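
[Note, not part of the patch: the hunks above and below all follow one rule,
appends go through the updater's private descriptor while reads may use the
shared one. A minimal sketch of that split, using couch_file calls the patch
already relies on; the function name is hypothetical.]

    %% Writes serialize through updater_fd; reads pread through the
    %% reader fd without queueing behind the updater's mailbox.
    write_then_read(Db, Body) ->
        {ok, Pos} = couch_file:append_term(Db#db.updater_fd, Body),
        {ok, Body} = couch_file:pread_term(Db#db.fd, Pos).
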
@@ -786,7 +785,10 @@ write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets,
         % This can happen if the db file we wrote to was swapped out by
         % compaction. Retry by reopening the db and writing to the current file
         {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
-        DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+        DocBuckets2 = [
+            [doc_flush_atts(Doc, Db2#db.updater_fd) || Doc <- Bucket] ||
+            Bucket <- DocBuckets
+        ],
         % We only retry once
         close(Db2),
         UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
@@ -954,25 +956,28 @@ changes_since(Db, Style, StartSeq, Fun, Options, Acc) ->
             end,
             Fun(DocInfo2, Acc2)
         end,
-    {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
+    {ok, _LastReduction, AccOut} = couch_btree:fold(by_seq_btree(Db),
         Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
     {ok, AccOut}.
 
 count_changes_since(Db, SinceSeq) ->
+    BTree = by_seq_btree(Db),
     {ok, Changes} =
-    couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree,
+    couch_btree:fold_reduce(BTree,
         fun(_SeqStart, PartialReds, 0) ->
-            {ok, couch_btree:final_reduce(Db#db.docinfo_by_seq_btree, PartialReds)}
+            {ok, couch_btree:final_reduce(BTree, PartialReds)}
         end,
         0, [{start_key, SinceSeq + 1}]),
     Changes.
 
 enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
-    {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
+    {ok, LastReduction, AccOut} = couch_btree:fold(
+        by_seq_btree(Db), InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
     {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
 
 enum_docs(Db, InFun, InAcc, Options) ->
-    {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.fulldocinfo_by_id_btree, InFun, InAcc, Options),
+    {ok, LastReduce, OutAcc} = couch_btree:fold(
+        by_id_btree(Db), InFun, InAcc, Options),
     {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
 
 % server functions
@@ -1072,7 +1077,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
         IdRevs, LookupResults).
 
 open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) ->
-    case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
+    case couch_btree:lookup(local_btree(Db), [Id]) of
     [{ok, {_, {Rev, BodyData}}}] ->
         {ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}};
     [not_found] ->
@@ -1151,7 +1156,7 @@ doc_to_tree_simple(Doc, [RevId | Rest]) ->
     [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
 
 
-make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{updater_fd = Fd} = Db, Id, Deleted, Bp, RevisionPath) ->
     {BodyData, Atts} =
     case Bp of
     nil ->
@@ -1207,7 +1212,19 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
     }.
 
 
-increment_stat(#db{is_sys_db = true}, _Stat) ->
-    ok;
-increment_stat(#db{}, Stat) ->
-    couch_stats_collector:increment(Stat).
+increment_stat(#db{options = Options}, Stat) ->
+    case lists:member(sys_db, Options) of
+    true ->
+        ok;
+    false ->
+        couch_stats_collector:increment(Stat)
+    end.
+
+local_btree(#db{local_docs_btree = BTree, fd = ReaderFd}) ->
+    BTree#btree{fd = ReaderFd}.
+
+by_seq_btree(#db{docinfo_by_seq_btree = BTree, fd = ReaderFd}) ->
+    BTree#btree{fd = ReaderFd}.
+
+by_id_btree(#db{fulldocinfo_by_id_btree = BTree, fd = ReaderFd}) ->
+    BTree#btree{fd = ReaderFd}.
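
[Note, not part of the patch: the three accessors above are why the #btree
record moves into couch_db.hrl in the next hunk. The btrees stored in a #db
still carry the updater's descriptor, so every read path first swaps in the
reader fd. A hypothetical caller would follow the same pattern.]

    %% Sketch: resolve the id btree through the reader fd, then reduce.
    count_docs(Db) ->
        {ok, {Count, _DelCount}} = couch_btree:full_reduce(by_id_btree(Db)),
        Count.
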
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 74d2c6301aa..c857b7ae725 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -158,6 +158,7 @@
     compactor_pid = nil,
     instance_start_time, % number of microsecs since jan 1 1970 as a binary string
     fd,
+    updater_fd,
     fd_ref_counter,
     header = #db_header{},
     committed_update_seq,
@@ -174,7 +175,7 @@
     waiting_delayed_commit = nil,
     revs_limit = 1000,
     fsync_options = [],
-    is_sys_db = false
+    options = []
     }).
 
 
@@ -297,3 +298,11 @@
     db_open_options = []
     }).
 
+-record(btree, {
+    fd,
+    root,
+    extract_kv = fun({_Key, _Value} = KV) -> KV end,
+    assemble_kv = fun(Key, Value) -> {Key, Value} end,
+    less = fun(A, B) -> A < B end,
+    reduce = nil
+}).
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index e5c6019a24f..55f2df7a6e8 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -42,13 +42,14 @@ init({MainPid, DbName, Filepath, Fd, Options}) ->
             file:delete(Filepath ++ ".compact")
         end
     end,
-
-    Db = init_db(DbName, Filepath, Fd, Header),
+    ReaderFd = open_reader_fd(Filepath, Options),
+    Db = init_db(DbName, Filepath, Fd, ReaderFd, Header, Options),
     Db2 = refresh_validate_doc_funs(Db),
-    {ok, Db2#db{main_pid = MainPid, is_sys_db = lists:member(sys_db, Options)}}.
+    {ok, Db2#db{main_pid = MainPid}}.
 
 
 terminate(_Reason, Db) ->
+    couch_file:close(Db#db.updater_fd),
     couch_file:close(Db#db.fd),
     couch_util:shutdown_sync(Db#db.compactor_pid),
     couch_util:shutdown_sync(Db#db.fd_ref_counter),
@@ -67,7 +68,7 @@ handle_call(increment_update_seq, _From, Db) ->
     {reply, {ok, Db2#db.update_seq}, Db2};
 
 handle_call({set_security, NewSec}, _From, Db) ->
-    {ok, Ptr} = couch_file:append_term(Db#db.fd, NewSec),
+    {ok, Ptr} = couch_file:append_term(Db#db.updater_fd, NewSec),
     Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
             update_seq=Db#db.update_seq+1}),
     ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
@@ -84,7 +85,7 @@ handle_call({purge_docs, _IdRevs}, _From,
     {reply, {error, purge_during_compaction}, Db};
 handle_call({purge_docs, IdRevs}, _From, Db) ->
     #db{
-        fd=Fd,
+        updater_fd = Fd,
         fulldocinfo_by_id_btree = DocInfoByIdBTree,
         docinfo_by_seq_btree = DocInfoBySeqBTree,
         update_seq = LastSeq,
@@ -161,9 +162,10 @@ handle_call(start_compact, _From, Db) ->
 
 handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
     {ok, NewFd} = couch_file:open(CompactFilepath),
+    ReaderFd = open_reader_fd(CompactFilepath, Db#db.options),
     {ok, NewHeader} = couch_file:read_header(NewFd),
     #db{update_seq=NewSeq} = NewDb =
-        init_db(Db#db.name, Filepath, NewFd, NewHeader),
+        init_db(Db#db.name, Filepath, NewFd, ReaderFd, NewHeader, Db#db.options),
     unlink(NewFd),
     case Db#db.update_seq == NewSeq of
     true ->
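
[Note, not part of the patch: keeping the whole options list in #db, rather
than a pre-computed is_sys_db boolean, is what lets compact_done above reopen
a reader fd with the right flags. Any later check stays a plain membership
test; a hypothetical helper.]

    %% Sketch: sys_db status is derived from the open-time options.
    is_sys_db(#db{options = Options}) ->
        lists:member(sys_db, Options).
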
@@ -362,7 +364,7 @@ simple_upgrade_record(Old, New) when tuple_size(Old) < tuple_size(New) ->
     list_to_tuple(tuple_to_list(Old) ++ NewValuesTail).
 
 
-init_db(DbName, Filepath, Fd, Header0) ->
+init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
     Header1 = simple_upgrade_record(Header0, #db_header{}),
     Header =
     case element(2, Header1) of
@@ -403,10 +405,11 @@ init_db(DbName, Filepath, Fd, Header0) ->
     {MegaSecs, Secs, MicroSecs} = now(),
     StartTime = ?l2b(io_lib:format("~p",
             [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
-    {ok, RefCntr} = couch_ref_counter:start([Fd]),
+    {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),
     #db{
         update_pid=self(),
-        fd=Fd,
+        fd = ReaderFd,
+        updater_fd = Fd,
         fd_ref_counter = RefCntr,
         header=Header,
         fulldocinfo_by_id_btree = IdBtree,
@@ -420,9 +423,19 @@ init_db(DbName, Filepath, Fd, Header0) ->
         security_ptr = SecurityPtr,
         instance_start_time = StartTime,
         revs_limit = Header#db_header.revs_limit,
-        fsync_options = FsyncOptions
+        fsync_options = FsyncOptions,
+        options = Options
         }.
 
+open_reader_fd(Filepath, Options) ->
+    {ok, Fd} = case lists:member(sys_db, Options) of
+    true ->
+        couch_file:open(Filepath, [read_only, sys_db]);
+    false ->
+        couch_file:open(Filepath, [read_only])
+    end,
+    unlink(Fd),
+    Fd.
 
 close_db(#db{fd_ref_counter = RefCntr}) ->
     couch_ref_counter:drop(RefCntr).
@@ -443,7 +456,7 @@ refresh_validate_doc_funs(Db) ->
 
 flush_trees(_Db, [], AccFlushedTrees) ->
     {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd,header=Header}=Db,
+flush_trees(#db{updater_fd = Fd, header = Header} = Db,
         [InfoUnflushed | RestUnflushed], AccFlushed) ->
     #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
     Flushed = couch_key_tree:map(
@@ -694,7 +707,7 @@ commit_data(Db, true) ->
     Db;
 commit_data(Db, _) ->
     #db{
-        fd = Fd,
+        updater_fd = Fd,
         filepath = Filepath,
         header = OldHeader,
         fsync_options = FsyncOptions,
@@ -723,7 +736,8 @@ commit_data(Db, _) ->
     end.
 
 
-copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
+copy_doc_attachments(#db{updater_fd = SrcFd} = SrcDb, {Pos,_RevId},
+        SrcSp, DestFd) ->
     {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp),
     % copy the bin values
     NewBinInfos = lists:map(
@@ -775,7 +789,7 @@ copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
         end, Tree).
 
 
-copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
+copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq, Retry) ->
     Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
     LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
 
@@ -851,7 +865,7 @@ copy_compact(Db, NewDb0, Retry) ->
 
     % copy misc header values
     if NewDb3#db.security /= Db#db.security ->
-        {ok, Ptr} = couch_file:append_term(NewDb3#db.fd, Db#db.security),
+        {ok, Ptr} = couch_file:append_term(NewDb3#db.updater_fd, Db#db.security),
         NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
     true ->
         NewDb4 = NewDb3
@@ -873,7 +887,8 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
         Retry = false,
         ok = couch_file:write_header(Fd, Header=#db_header{})
     end,
-    NewDb = init_db(Name, CompactFile, Fd, Header),
+    ReaderFd = open_reader_fd(CompactFile, Db#db.options),
+    NewDb = init_db(Name, CompactFile, Fd, ReaderFd, Header, Db#db.options),
     unlink(Fd),
     NewDb2 = copy_compact(Db, NewDb, Retry),
     close_db(NewDb2),
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 9d2c42ca191..f3b60a7e2ff 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -19,8 +19,7 @@
 
 -record(file, {
     fd,
-    tail_append_begin = 0, % 09 UPGRADE CODE
-    eof = 0
+    tail_append_begin = 0 % 09 UPGRADE CODE
     }).
 
 -export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
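
[Note, not part of the patch: dropping the cached eof field is what makes a
second descriptor on the same file safe, since only the OS knows the true end
of file once another process appends. An illustration with plain OTP file
calls; the path is made up.]

    %% A read-only descriptor observes appends made through the writer
    %% because eof is asked of the OS rather than cached in state.
    {ok, W} = file:open("example.couch", [read, append, raw, binary]),
    {ok, R} = file:open("example.couch", [read, raw, binary]),
    ok = file:write(W, <<"data">>),
    {ok, Eof} = file:position(R, eof).  % reflects the write above
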
@@ -227,10 +226,11 @@ init_status_error(ReturnPid, Ref, Error) ->
 
 init({Filepath, Options, ReturnPid, Ref}) ->
     process_flag(trap_exit, true),
+    OpenOptions = file_open_options(Options),
     case lists:member(create, Options) of
     true ->
         filelib:ensure_dir(Filepath),
-        case file:open(Filepath, [read, append, raw, binary]) of
+        case file:open(Filepath, OpenOptions) of
         {ok, Fd} ->
             {ok, Length} = file:position(Fd, eof),
             case Length > 0 of
@@ -244,14 +244,14 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                 ok = file:truncate(Fd),
                 ok = file:sync(Fd),
                 maybe_track_open_os_files(Options),
-                {ok, #file{fd=Fd}};
+                {ok, #file{fd = Fd}};
             false ->
                 ok = file:close(Fd),
                 init_status_error(ReturnPid, Ref, file_exists)
             end;
         false ->
             maybe_track_open_os_files(Options),
-            {ok, #file{fd=Fd}}
+            {ok, #file{fd = Fd}}
         end;
     Error ->
         init_status_error(ReturnPid, Ref, Error)
@@ -260,16 +260,23 @@ init({Filepath, Options, ReturnPid, Ref}) ->
     % open in read mode first, so we don't create the file if it doesn't exist.
     case file:open(Filepath, [read, raw]) of
     {ok, Fd_Read} ->
-        {ok, Fd} = file:open(Filepath, [read, append, raw, binary]),
+        {ok, Fd} = file:open(Filepath, OpenOptions),
         ok = file:close(Fd_Read),
         maybe_track_open_os_files(Options),
-        {ok, Length} = file:position(Fd, eof),
-        {ok, #file{fd=Fd, eof=Length}};
+        {ok, #file{fd = Fd}};
     Error ->
         init_status_error(ReturnPid, Ref, Error)
     end
     end.
 
+file_open_options(Options) ->
+    [read, raw, binary] ++ case lists:member(read_only, Options) of
+    true ->
+        [];
+    false ->
+        [append]
+    end.
+
 maybe_track_open_os_files(FileOptions) ->
     case lists:member(sys_db, FileOptions) of
     true ->
@@ -309,28 +316,25 @@ handle_call({pread_iolist, Pos}, _From, File) ->
 handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
     {ok, Bin} = file:pread(Fd, Pos, Bytes),
     {reply, {ok, Bin, Pos >= TailAppendBegin}, File};
-handle_call(bytes, _From, #file{eof=Length}=File) ->
-    {reply, {ok, Length}, File};
+handle_call(bytes, _From, #file{fd = Fd} = File) ->
+    {reply, file:position(Fd, eof), File};
 handle_call(sync, _From, #file{fd=Fd}=File) ->
     {reply, file:sync(Fd), File};
 handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
     {ok, Pos} = file:position(Fd, Pos),
-    case file:truncate(Fd) of
-    ok ->
-        {reply, ok, File#file{eof=Pos}};
-    Error ->
-        {reply, Error, File}
-    end;
-handle_call({append_bin, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
+    {reply, file:truncate(Fd), File};
+handle_call({append_bin, Bin}, _From, #file{fd = Fd} = File) ->
+    {ok, Pos} = file:position(Fd, eof),
     Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
     case file:write(Fd, Blocks) of
     ok ->
-        {reply, {ok, Pos}, File#file{eof=Pos+iolist_size(Blocks)}};
+        {reply, {ok, Pos}, File};
     Error ->
         {reply, Error, File}
     end;
-handle_call({write_header, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
-    BinSize = size(Bin),
+handle_call({write_header, Bin}, _From, #file{fd = Fd} = File) ->
+    {ok, Pos} = file:position(Fd, eof),
+    BinSize = byte_size(Bin),
     case Pos rem ?SIZE_BLOCK of
     0 ->
         Padding = <<>>;
@@ -338,18 +342,12 @@ handle_call({write_header, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
     BlockOffset ->
         Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
     end,
     FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
-    case file:write(Fd, FinalBin) of
-    ok ->
-        {reply, ok, File#file{eof=Pos+iolist_size(FinalBin)}};
-    Error ->
-        {reply, Error, File}
-    end;
-
+    {reply, file:write(Fd, FinalBin), File};
 handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
     case (catch read_old_header(Fd, Prefix)) of
     {ok, Header} ->
-        TailAppendBegin = File#file.eof,
+        {ok, TailAppendBegin} = file:position(Fd, eof),
         Bin = term_to_binary(Header),
         Md5 = couch_util:md5(Bin),
         % now we assemble the final header binary and write to disk
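
[Note, not part of the patch: write_header above pads to the next ?SIZE_BLOCK
boundary so a header always starts block-aligned, then writes a one-byte
marker and the 32-bit header size. The padding arithmetic spelled out,
assuming the usual ?SIZE_BLOCK of 4096.]

    %% Sketch of the padding computed in write_header.
    header_padding(Pos) ->
        case Pos rem 4096 of
        0 -> <<>>;
        BlockOffset -> <<0:(8 * (4096 - BlockOffset))>>
        end.
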
@@ -367,7 +365,8 @@ handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
     end;
 
 
-handle_call(find_header, _From, #file{fd=Fd, eof=Pos}=File) ->
+handle_call(find_header, _From, #file{fd = Fd} = File) ->
+    {ok, Pos} = file:position(Fd, eof),
     {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
 
 % 09 UPGRADE CODE
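
[Note, not part of the patch: find_header/2, unchanged by this patch, is why
the eof position is still queried at open time. It scans backwards from the
last block boundary until a block holding a valid header is found. A rough
sketch of that loop, with load_header/2 standing in for the real
block-parsing code.]

    %% Sketch: walk block boundaries from the end of file downward.
    find_header(_Fd, -1) ->
        no_valid_header;
    find_header(Fd, Block) ->
        case (catch load_header(Fd, Block)) of
        {ok, Bin} ->
            {ok, Bin};
        _Error ->
            find_header(Fd, Block - 1)
        end.
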