Add dedicated couch_file to the DB updater process.
fdmanana committed Dec 4, 2010
1 parent 03ede5b commit e4bdf2c
Showing 5 changed files with 106 additions and 74 deletions.
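
[Reader's summary, not part of the commit] Each #db record now carries two couch_file descriptors: updater_fd, used exclusively by the couch_db_updater process for writes, and fd, a read-only descriptor handed to concurrent readers. Read paths re-point the shared #btree records at the reader descriptor, so reads no longer queue behind writes in a single couch_file server. A minimal sketch of the pattern, using only calls that appear in this diff; the module itself is hypothetical:

    %% Illustrative sketch, not CouchDB source. Field names mirror the patch.
    -module(fd_split_sketch).
    -export([open/1, reader_btree/1]).

    -record(btree, {fd, root}).
    -record(db, {fd,            % read-only fd for concurrent readers
                 updater_fd,    % private fd of the db updater (writes)
                 fulldocinfo_by_id_btree}).

    open(Filepath) ->
        {ok, WriterFd} = couch_file:open(Filepath),
        {ok, ReaderFd} = couch_file:open(Filepath, [read_only]),
        #db{fd = ReaderFd,
            updater_fd = WriterFd,
            fulldocinfo_by_id_btree = #btree{fd = WriterFd}}.

    %% Readers receive a copy of the btree record re-pointed at the reader fd.
    reader_btree(#db{fulldocinfo_by_id_btree = BTree, fd = ReaderFd}) ->
        BTree#btree{fd = ReaderFd}.
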
10 changes: 1 addition & 9 deletions src/couchdb/couch_btree.erl
@@ -16,17 +16,9 @@
-export([fold/4, full_reduce/1, final_reduce/2, foldl/3, foldl/4]).
-export([fold_reduce/4, lookup/2, get_state/1, set_options/2]).

-include("couch_db.hrl").
-define(CHUNK_THRESHOLD, 16#4ff).

--record(btree,
-    {fd,
-    root,
-    extract_kv = fun({Key, Value}) -> {Key, Value} end,
-    assemble_kv = fun(Key, Value) -> {Key, Value} end,
-    less = fun(A, B) -> A < B end,
-    reduce = nil
-    }).

extract(#btree{extract_kv=Extract}, Value) ->
Extract(Value).

55 changes: 36 additions & 19 deletions src/couchdb/couch_db.erl
@@ -223,7 +223,7 @@ get_full_doc_info(Db, Id) ->
Result.

get_full_doc_infos(Db, Ids) ->
-    couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
+    couch_btree:lookup(by_id_btree(Db), Ids).

increment_update_seq(#db{update_pid=UpdatePid}) ->
gen_server:call(UpdatePid, increment_update_seq).
@@ -251,11 +251,10 @@ get_db_info(Db) ->
compactor_pid=Compactor,
update_seq=SeqNum,
name=Name,
-        fulldocinfo_by_id_btree=FullDocBtree,
instance_start_time=StartTime,
committed_update_seq=CommittedUpdateSeq} = Db,
{ok, Size} = couch_file:bytes(Fd),
-    {ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree),
+    {ok, {Count, DelCount}} = couch_btree:full_reduce(by_id_btree(Db)),
InfoList = [
{db_name, Name},
{doc_count, Count},
@@ -270,8 +269,8 @@ get_db_info(Db) ->
],
{ok, InfoList}.

-get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) ->
-    {ok,_, Docs} = couch_btree:fold(Btree,
+get_design_docs(Db) ->
+    {ok,_, Docs} = couch_btree:fold(by_id_btree(Db),
fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) ->
{ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []),
{ok, [Doc | AccDocs]};
@@ -662,7 +661,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
-    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
+    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.updater_fd)
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -718,7 +717,7 @@ update_docs(Db, Docs, Options, interactive_edit) ->
true -> [] end ++ Options,
DocBuckets3 = [[
doc_flush_atts(set_new_att_revpos(
-            check_dup_atts(Doc)), Db#db.fd)
+            check_dup_atts(Doc)), Db#db.updater_fd)
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),

@@ -786,7 +785,10 @@ write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets,
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
-        DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+        DocBuckets2 = [
+            [doc_flush_atts(Doc, Db2#db.updater_fd) || Doc <- Bucket] ||
+            Bucket <- DocBuckets
+        ],
% We only retry once
close(Db2),
UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
@@ -954,25 +956,28 @@ changes_since(Db, Style, StartSeq, Fun, Options, Acc) ->
end,
Fun(DocInfo2, Acc2)
end,
-    {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
+    {ok, _LastReduction, AccOut} = couch_btree:fold(by_seq_btree(Db),
Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
{ok, AccOut}.

count_changes_since(Db, SinceSeq) ->
+    BTree = by_seq_btree(Db),
{ok, Changes} =
-    couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree,
+    couch_btree:fold_reduce(BTree,
fun(_SeqStart, PartialReds, 0) ->
-        {ok, couch_btree:final_reduce(Db#db.docinfo_by_seq_btree, PartialReds)}
+        {ok, couch_btree:final_reduce(BTree, PartialReds)}
end,
0, [{start_key, SinceSeq + 1}]),
Changes.

enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
-    {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
+    {ok, LastReduction, AccOut} = couch_btree:fold(
+        by_seq_btree(Db), InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
{ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.

enum_docs(Db, InFun, InAcc, Options) ->
-    {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.fulldocinfo_by_id_btree, InFun, InAcc, Options),
+    {ok, LastReduce, OutAcc} = couch_btree:fold(
+        by_id_btree(Db), InFun, InAcc, Options),
{ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.

% server functions
@@ -1072,7 +1077,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
IdRevs, LookupResults).

open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) ->
-    case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
+    case couch_btree:lookup(local_btree(Db), [Id]) of
[{ok, {_, {Rev, BodyData}}}] ->
{ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}};
[not_found] ->
@@ -1151,7 +1156,7 @@ doc_to_tree_simple(Doc, [RevId | Rest]) ->
[{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].


-make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{updater_fd = Fd} = Db, Id, Deleted, Bp, RevisionPath) ->
{BodyData, Atts} =
case Bp of
nil ->
@@ -1207,7 +1212,19 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
}.


-increment_stat(#db{is_sys_db = true}, _Stat) ->
-    ok;
-increment_stat(#db{}, Stat) ->
-    couch_stats_collector:increment(Stat).
+increment_stat(#db{options = Options}, Stat) ->
+    case lists:member(sys_db, Options) of
+    true ->
+        ok;
+    false ->
+        couch_stats_collector:increment(Stat)
+    end.

+local_btree(#db{local_docs_btree = BTree, fd = ReaderFd}) ->
+    BTree#btree{fd = ReaderFd}.
+
+by_seq_btree(#db{docinfo_by_seq_btree = BTree, fd = ReaderFd}) ->
+    BTree#btree{fd = ReaderFd}.
+
+by_id_btree(#db{fulldocinfo_by_id_btree = BTree, fd = ReaderFd}) ->
+    BTree#btree{fd = ReaderFd}.
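
[Reader's note, not part of the commit] The three helpers above are cheap and safe because Erlang record update is copy-on-update: re-pointing fd for a read builds a new #btree tuple and never mutates the one stored in #db, which keeps pointing at updater_fd for the updater's own use. A tiny self-contained illustration, with atoms standing in for real couch_file pids:

    %% Hypothetical demo module, not CouchDB source.
    -module(btree_fd_demo).
    -export([demo/0]).

    -record(btree, {fd, root = nil}).

    demo() ->
        Shared = #btree{fd = updater_fd},
        ReadCopy = Shared#btree{fd = reader_fd},  % new copy; original untouched
        {Shared#btree.fd, ReadCopy#btree.fd}.     % => {updater_fd, reader_fd}
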
11 changes: 10 additions & 1 deletion src/couchdb/couch_db.hrl
@@ -158,6 +158,7 @@
compactor_pid = nil,
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
fd,
+    updater_fd,
fd_ref_counter,
header = #db_header{},
committed_update_seq,
@@ -174,7 +175,7 @@
waiting_delayed_commit = nil,
revs_limit = 1000,
fsync_options = [],
-    is_sys_db = false
+    options = []
}).


@@ -297,3 +298,11 @@
db_open_options = []
}).

+-record(btree, {
+    fd,
+    root,
+    extract_kv = fun({_Key, _Value} = KV) -> KV end,
+    assemble_kv = fun(Key, Value) -> {Key, Value} end,
+    less = fun(A, B) -> A < B end,
+    reduce = nil
+}).
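
[Reader's note, not part of the commit] Moving #btree out of couch_btree.erl into this shared header is what makes the fd swap in couch_db.erl possible: record syntax is resolved at compile time, so any module that writes BTree#btree{fd = ReaderFd} must see the record definition. A hedged sketch of a module relying on that visibility; the module name and function are illustrative:

    %% Hypothetical consumer of the now-shared record definition.
    -module(btree_record_user).
    -include("couch_db.hrl").  % brings #btree into scope, as this hunk enables
    -export([with_reader_fd/2]).

    with_reader_fd(#btree{} = BTree, ReaderFd) ->
        BTree#btree{fd = ReaderFd}.
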
47 changes: 31 additions & 16 deletions src/couchdb/couch_db_updater.erl
@@ -42,13 +42,14 @@ init({MainPid, DbName, Filepath, Fd, Options}) ->
file:delete(Filepath ++ ".compact")
end
end,

-    Db = init_db(DbName, Filepath, Fd, Header),
+    ReaderFd = open_reader_fd(Filepath, Options),
+    Db = init_db(DbName, Filepath, Fd, ReaderFd, Header, Options),
     Db2 = refresh_validate_doc_funs(Db),
-    {ok, Db2#db{main_pid = MainPid, is_sys_db = lists:member(sys_db, Options)}}.
+    {ok, Db2#db{main_pid = MainPid}}.


terminate(_Reason, Db) ->
+    couch_file:close(Db#db.updater_fd),
couch_file:close(Db#db.fd),
couch_util:shutdown_sync(Db#db.compactor_pid),
couch_util:shutdown_sync(Db#db.fd_ref_counter),
@@ -67,7 +68,7 @@ handle_call(increment_update_seq, _From, Db) ->
{reply, {ok, Db2#db.update_seq}, Db2};

handle_call({set_security, NewSec}, _From, Db) ->
-    {ok, Ptr} = couch_file:append_term(Db#db.fd, NewSec),
+    {ok, Ptr} = couch_file:append_term(Db#db.updater_fd, NewSec),
Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
update_seq=Db#db.update_seq+1}),
ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
@@ -84,7 +85,7 @@ handle_call({purge_docs, _IdRevs}, _From,
{reply, {error, purge_during_compaction}, Db};
handle_call({purge_docs, IdRevs}, _From, Db) ->
#db{
-        fd=Fd,
+        updater_fd = Fd,
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq,
@@ -161,9 +162,10 @@ handle_call(start_compact, _From, Db) ->

handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
+    ReaderFd = open_reader_fd(CompactFilepath, Db#db.options),
{ok, NewHeader} = couch_file:read_header(NewFd),
#db{update_seq=NewSeq} = NewDb =
-        init_db(Db#db.name, Filepath, NewFd, NewHeader),
+        init_db(Db#db.name, Filepath, NewFd, ReaderFd, NewHeader, Db#db.options),
unlink(NewFd),
case Db#db.update_seq == NewSeq of
true ->
@@ -362,7 +364,7 @@ simple_upgrade_record(Old, New) when tuple_size(Old) < tuple_size(New) ->
list_to_tuple(tuple_to_list(Old) ++ NewValuesTail).


-init_db(DbName, Filepath, Fd, Header0) ->
+init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
Header1 = simple_upgrade_record(Header0, #db_header{}),
Header =
case element(2, Header1) of
@@ -403,10 +405,11 @@ init_db(DbName, Filepath, Fd, Header0) ->
{MegaSecs, Secs, MicroSecs} = now(),
StartTime = ?l2b(io_lib:format("~p",
[(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
-    {ok, RefCntr} = couch_ref_counter:start([Fd]),
+    {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),
#db{
update_pid=self(),
-        fd=Fd,
+        fd = ReaderFd,
+        updater_fd = Fd,
fd_ref_counter = RefCntr,
header=Header,
fulldocinfo_by_id_btree = IdBtree,
@@ -420,9 +423,19 @@
security_ptr = SecurityPtr,
instance_start_time = StartTime,
revs_limit = Header#db_header.revs_limit,
-        fsync_options = FsyncOptions
+        fsync_options = FsyncOptions,
+        options = Options
}.

+open_reader_fd(Filepath, Options) ->
+    {ok, Fd} = case lists:member(sys_db, Options) of
+    true ->
+        couch_file:open(Filepath, [read_only, sys_db]);
+    false ->
+        couch_file:open(Filepath, [read_only])
+    end,
+    unlink(Fd),
+    Fd.

close_db(#db{fd_ref_counter = RefCntr}) ->
couch_ref_counter:drop(RefCntr).
@@ -443,7 +456,7 @@ refresh_validate_doc_funs(Db) ->

flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd,header=Header}=Db,
+flush_trees(#db{updater_fd = Fd, header = Header} = Db,
[InfoUnflushed | RestUnflushed], AccFlushed) ->
#full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
Flushed = couch_key_tree:map(
@@ -694,7 +707,7 @@ commit_data(Db, true) ->
Db;
commit_data(Db, _) ->
#db{
-        fd = Fd,
+        updater_fd = Fd,
filepath = Filepath,
header = OldHeader,
fsync_options = FsyncOptions,
@@ -723,7 +736,8 @@ commit_data(Db, _) ->
end.


-copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
+copy_doc_attachments(#db{updater_fd = SrcFd} = SrcDb, {Pos,_RevId},
+        SrcSp, DestFd) ->
{ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp),
% copy the bin values
NewBinInfos = lists:map(
@@ -775,7 +789,7 @@ copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
end, Tree).


-copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
+copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),

@@ -851,7 +865,7 @@ copy_compact(Db, NewDb0, Retry) ->

% copy misc header values
if NewDb3#db.security /= Db#db.security ->
-        {ok, Ptr} = couch_file:append_term(NewDb3#db.fd, Db#db.security),
+        {ok, Ptr} = couch_file:append_term(NewDb3#db.updater_fd, Db#db.security),
NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
true ->
NewDb4 = NewDb3
@@ -873,7 +887,8 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
Retry = false,
ok = couch_file:write_header(Fd, Header=#db_header{})
end,
-    NewDb = init_db(Name, CompactFile, Fd, Header),
+    ReaderFd = open_reader_fd(CompactFile, Db#db.options),
+    NewDb = init_db(Name, CompactFile, Fd, ReaderFd, Header, Db#db.options),
unlink(Fd),
NewDb2 = copy_compact(Db, NewDb, Retry),
close_db(NewDb2),
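
[Reader's note, not part of the commit] Lifetime handling ties the two descriptors together: init_db/6 registers both with a single couch_ref_counter, terminate/2 closes updater_fd and fd explicitly, and open_reader_fd/2 unlinks the reader fd so its lifetime is owned by the ref counter rather than by process links. A minimal sketch of that arrangement, using only calls visible in this diff; the wrapper module is hypothetical:

    %% Hypothetical helper mirroring init_db/6 and close_db/1 from the patch.
    -module(fd_lifetime_sketch).
    -export([open_both/1, release/1]).

    open_both(Filepath) ->
        {ok, Fd} = couch_file:open(Filepath),                     % updater fd
        {ok, ReaderFd} = couch_file:open(Filepath, [read_only]),  % reader fd
        unlink(ReaderFd),  % the ref counter, not a link, owns this fd's lifetime
        {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),
        {Fd, ReaderFd, RefCntr}.

    release(RefCntr) ->
        %% callers give up their reference, as close_db/1 does in the patch
        couch_ref_counter:drop(RefCntr).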
