Skip to content

Commit

Permalink
Simpler and safer db open/closing in view group servers
Browse files Browse the repository at this point in the history
This makes the opening and closing of databases in the view
group server friendlier to the db reference counting system,
avoiding further potential db file leaks after compaction,
since we currently open a database in one process and use it in
another process (view compactor, view updater).

This significantly reduces the chances of failure when compacting
very large views as discussed in COUCHDB-994.

This relates to COUCHDB-926 and COUCHDB-994.




git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@1138796 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
fdmanana committed Jun 23, 2011
1 parent 0db6df9 commit c0de37d
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 55 deletions.
1 change: 0 additions & 1 deletion src/couchdb/couch_db.hrl
Expand Up @@ -225,7 +225,6 @@

-record(group, {
sig=nil,
db=nil,
fd=nil,
name,
def_lang,
Expand Down
8 changes: 4 additions & 4 deletions src/couchdb/couch_view_compactor.erl
Expand Up @@ -20,14 +20,14 @@
%% @doc Compacts the views. GroupId must not include the _design/ prefix
start_compact(DbName, GroupId) ->
Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>),
gen_server:cast(Pid, {start_compact, fun compact_group/2}).
gen_server:cast(Pid, {start_compact, fun compact_group/3}).

%%=============================================================================
%% internal functions
%%=============================================================================

%% @spec compact_group(Group, NewGroup) -> ok
compact_group(Group, EmptyGroup) ->
compact_group(Group, EmptyGroup, DbName) ->
#group{
current_seq = Seq,
id_btree = IdBtree,
Expand All @@ -36,16 +36,16 @@ compact_group(Group, EmptyGroup) ->
} = Group,

#group{
db = Db,
id_btree = EmptyIdBtree,
views = EmptyViews
} = EmptyGroup,

{ok, Db} = couch_db:open_int(DbName, []),
{ok, DbReduce} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
couch_db:close(Db),
Count = element(1, DbReduce),

<<"_design", ShortName/binary>> = GroupId,
DbName = couch_db:name(Db),
TaskName = <<DbName/binary, ShortName/binary>>,
couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>),
BufferSize = list_to_integer(
Expand Down
70 changes: 26 additions & 44 deletions src/couchdb/couch_view_group.erl
Expand Up @@ -78,7 +78,7 @@ start_link(InitArgs) ->
init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
try prepare_group(InitArgs, false) of
{ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} ->
{ok, Db, #group{fd=Fd, current_seq=Seq}=Group} ->
case Seq > couch_db:get_update_seq(Db) of
true ->
ReturnPid ! {Ref, self(), {error, invalid_view_seq}},
Expand All @@ -90,7 +90,7 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
{ok, #group_state{
db_name=DbName,
init_args=InitArgs,
group=Group#group{db=nil},
group=Group,
ref_counter=RefCounter}}
end;
Error ->
Expand Down Expand Up @@ -124,14 +124,11 @@ handle_call({request_group, RequestSeq}, From,
updater_pid=nil,
waiting_list=WaitList
}=State) when RequestSeq > Seq ->
{ok, Db} = couch_db:open_int(DbName, []),
Group2 = Group#group{db=Db},
Owner = self(),
Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end),
Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end),

{noreply, State#group_state{
updater_pid=Pid,
group=Group2,
waiting_list=[{From,RequestSeq}|WaitList]
}, infinity};

Expand Down Expand Up @@ -166,7 +163,8 @@ handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
{ok, Db} = couch_db:open_int(DbName, []),
{ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
NewGroup = reset_file(Db, Fd, DbName, Group),
Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
couch_db:close(Db),
Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end),
{noreply, State#group_state{compactor_pid = Pid}};
handle_cast({start_compact, _}, State) ->
%% compact already running, this is a no-op
Expand All @@ -176,7 +174,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
#group_state{group = #group{current_seq=OldSeq}} = State)
when NewSeq >= OldSeq ->
#group_state{
group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group,
group = #group{name=GroupId, fd=OldFd, sig=GroupSig},
init_args = {RootDir, DbName, _},
updater_pid = UpdaterPid,
compactor_pid = CompactorPid,
Expand All @@ -195,7 +193,7 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
unlink(UpdaterPid),
exit(UpdaterPid, view_compaction_complete),
Owner = self(),
spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end);
spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end);
true ->
nil
end,
Expand All @@ -206,19 +204,10 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
unlink(OldFd),
couch_ref_counter:drop(RefCounter),
{ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
case Group#group.db of
nil -> ok;
Else -> couch_db:close(Else)
end,

case NewGroup#group.db of
nil -> ok;
_ -> couch_db:close(NewGroup#group.db)
end,

self() ! delayed_commit,
{noreply, State#group_state{
group=NewGroup#group{db = nil},
group=NewGroup,
ref_counter=NewRefCounter,
compactor_pid=nil,
updater_pid=NewUpdaterPid
Expand All @@ -230,18 +219,15 @@ handle_cast({compact_done, NewGroup}, State) ->
} = State,
?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
"compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
couch_db:close(NewGroup#group.db),
Pid = spawn_link(fun() ->
{ok, Db} = couch_db:open_int(DbName, []),
{_,Ref} = erlang:spawn_monitor(fun() ->
couch_view_updater:update(nil, NewGroup#group{db = Db})
couch_view_updater:update(nil, NewGroup, DbName)
end),
receive
{'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
couch_db:close(Db),
#group{name=GroupId} = NewGroup2,
Pid2 = couch_view:get_group_server(DbName, GroupId),
gen_server:cast(Pid2, {compact_done, NewGroup2#group{db = nil}})
gen_server:cast(Pid2, {compact_done, NewGroup2})
end
end),
{noreply, State#group_state{compactor_pid = Pid}};
Expand Down Expand Up @@ -283,44 +269,40 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{noreply, State#group_state{waiting_commit=true}}
end;

handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
handle_info({'EXIT', FromPid, {new_group, Group}},
#group_state{db_name=DbName,
updater_pid=UpPid,
ref_counter=RefCounter,
waiting_list=WaitList,
waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
ok = couch_db:close(Db),
if not WaitingCommit ->
erlang:send_after(1000, self(), delayed_commit);
true -> ok
end,
case reply_with_group(Group, WaitList, [], RefCounter) of
[] ->
{noreply, State#group_state{waiting_commit=true, waiting_list=[],
group=Group#group{db=nil}, updater_pid=nil}};
group=Group, updater_pid=nil}};
StillWaiting ->
% we still have some waiters, reopen the database and reupdate the index
{ok, Db2} = couch_db:open_int(DbName, []),
Group2 = Group#group{db=Db2},
Owner = self(),
Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end),
Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end),
{noreply, State#group_state{waiting_commit=true,
waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
waiting_list=StillWaiting, updater_pid=Pid}}
end;
handle_info({'EXIT', _, {new_group, _}}, State) ->
%% message from an old (probably pre-compaction) updater; ignore
{noreply, State};

handle_info({'EXIT', FromPid, reset},
#group_state{
init_args=InitArgs,
updater_pid=UpPid,
group=Group}=State) when UpPid == FromPid ->
ok = couch_db:close(Group#group.db),
handle_info({'EXIT', UpPid, reset},
#group_state{init_args=InitArgs, updater_pid=UpPid} = State) ->
case prepare_group(InitArgs, true) of
{ok, ResetGroup} ->
{ok, Db, ResetGroup} ->
Owner = self(),
Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end),
couch_db:close(Db),
Pid = spawn_link(fun() ->
couch_view_updater:update(Owner, ResetGroup, Db#db.name)
end),
{noreply, State#group_state{
updater_pid=Pid,
group=ResetGroup}};
Expand Down Expand Up @@ -386,15 +368,15 @@ prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)->
{ok, Fd} ->
if ForceReset ->
% this can happen if we missed a purge
{ok, reset_file(Db, Fd, DbName, Group)};
{ok, Db, reset_file(Db, Fd, DbName, Group)};
true ->
case (catch couch_file:read_header(Fd)) of
{ok, {Sig, HeaderInfo}} ->
% sigs match!
{ok, init_group(Db, Fd, Group, HeaderInfo)};
{ok, Db, init_group(Db, Fd, Group, HeaderInfo)};
_ ->
% this happens on a new file
{ok, reset_file(Db, Fd, DbName, Group)}
{ok, Db, reset_file(Db, Fd, DbName, Group)}
end
end;
Error ->
Expand Down Expand Up @@ -598,7 +580,7 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->

reset_group(#group{views=Views}=Group) ->
Views2 = [View#view{btree=nil} || View <- Views],
Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
Group#group{fd=nil,query_server=nil,current_seq=0,
id_btree=nil,views=Views2}.

reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
Expand Down Expand Up @@ -656,5 +638,5 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
View#view{btree=Btree, update_seq=USeq, purge_seq=PSeq}
end,
ViewStates2, Views),
Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
id_btree=IdBtree, views=Views2}.
14 changes: 8 additions & 6 deletions src/couchdb/couch_view_updater.erl
Expand Up @@ -12,30 +12,31 @@

-module(couch_view_updater).

-export([update/2]).
-export([update/3]).

-include("couch_db.hrl").

-spec update(_, #group{}) -> no_return().
-spec update(_, #group{}, Dbname::binary()) -> no_return().

update(Owner, Group) ->
update(Owner, Group, DbName) ->
#group{
db = #db{name=DbName} = Db,
name = GroupName,
current_seq = Seq,
purge_seq = PurgeSeq
} = Group,
couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>),

{ok, Db} = couch_db:open_int(DbName, []),
DbPurgeSeq = couch_db:get_purge_seq(Db),
Group2 =
if DbPurgeSeq == PurgeSeq ->
Group;
DbPurgeSeq == PurgeSeq + 1 ->
couch_task_status:update(<<"Removing purged entries from view index.">>),
purge_index(Group);
purge_index(Group, Db);
true ->
couch_task_status:update(<<"Resetting view index due to lost purge entries.">>),
couch_db:close(Db),
exit(reset)
end,
{ok, MapQueue} = couch_work_queue:new(
Expand Down Expand Up @@ -73,13 +74,14 @@ update(Owner, Group) ->
couch_task_status:set_update_frequency(0),
couch_task_status:update("Finishing."),
couch_work_queue:close(MapQueue),
couch_db:close(Db),
receive {new_group, NewGroup} ->
exit({new_group,
NewGroup#group{current_seq=couch_db:get_update_seq(Db)}})
end.


purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
purge_index(#group{views=Views, id_btree=IdBtree}=Group, Db) ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
{ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
Expand Down

0 comments on commit c0de37d

Please sign in to comment.