Skip to content

Commit

Permalink
View groups: only open the databases when needed
Browse files Browse the repository at this point in the history
View groups keep the databases open all the time. This is a problem
once the server reaches max_dbs_open open databases, as it prevents
the server from closing inactive databases via the LRU system.

Closes COUCHDB-1138.



git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@1096252 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
fdmanana committed Apr 23, 2011
1 parent 6bdfd36 commit 0ce2c54
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 38 deletions.
56 changes: 19 additions & 37 deletions src/couchdb/couch_view_group.erl
Expand Up @@ -32,8 +32,7 @@
compactor_pid=nil,
waiting_commit=false,
waiting_list=[],
ref_counter=nil,
db_update_notifier=nil
ref_counter=nil
}).

% api methods
Expand Down Expand Up @@ -85,20 +84,12 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
ReturnPid ! {Ref, self(), {error, invalid_view_seq}},
ignore;
_ ->
couch_db:monitor(Db),
couch_db:close(Db),
{ok, RefCounter} = couch_ref_counter:start([Fd]),
Server = self(),
{ok, Notifier} = couch_db_update_notifier:start_link(
fun({compacted, DbName1}) when DbName1 =:= DbName ->
ok = gen_server:cast(Server, reopen_db);
(_) ->
ok
end),
{ok, #group_state{
db_update_notifier=Notifier,
db_name=couch_db:name(Db),
db_name=DbName,
init_args=InitArgs,
group=Group,
group=Group#group{db=nil},
ref_counter=RefCounter}}
end;
Error ->
Expand Down Expand Up @@ -128,11 +119,11 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
handle_call({request_group, RequestSeq}, From,
#group_state{
db_name=DbName,
group=#group{current_seq=Seq, db=OldDb}=Group,
group=#group{current_seq=Seq}=Group,
updater_pid=nil,
waiting_list=WaitList
}=State) when RequestSeq > Seq ->
{ok, Db} = reopen_db(DbName, OldDb),
{ok, Db} = couch_db:open_int(DbName, []),
Group2 = Group#group{db=Db},
Owner = self(),
Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end),
Expand Down Expand Up @@ -167,11 +158,11 @@ handle_call(request_group_info, _From, State) ->
handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
= State) ->
#group_state{
group = #group{name = GroupId, sig = GroupSig, db = OldDb} = Group,
group = #group{name = GroupId, sig = GroupSig} = Group,
init_args = {RootDir, DbName, _}
} = State,
?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]),
{ok, Db} = reopen_db(DbName, OldDb),
{ok, Db} = couch_db:open_int(DbName, []),
{ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
NewGroup = reset_file(Db, Fd, DbName, Group),
Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
Expand Down Expand Up @@ -219,9 +210,14 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
Else -> couch_db:close(Else)
end,

case NewGroup#group.db of
nil -> ok;
_ -> couch_db:close(NewGroup#group.db)
end,

self() ! delayed_commit,
{noreply, State#group_state{
group=NewGroup,
group=NewGroup#group{db = nil},
ref_counter=NewRefCounter,
compactor_pid=nil,
updater_pid=NewUpdaterPid
Expand All @@ -244,7 +240,7 @@ handle_cast({compact_done, NewGroup}, State) ->
couch_db:close(Db),
#group{name=GroupId} = NewGroup2,
Pid2 = couch_view:get_group_server(DbName, GroupId),
gen_server:cast(Pid2, {compact_done, NewGroup2})
gen_server:cast(Pid2, {compact_done, NewGroup2#group{db = nil}})
end
end),
{noreply, State#group_state{compactor_pid = Pid}};
Expand All @@ -265,11 +261,7 @@ handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid}
{noreply, State#group_state{group=NewGroup, waiting_commit=true}};
handle_cast({partial_update, _, _}, State) ->
%% message from an old (probably pre-compaction) updater; ignore
{noreply, State};

handle_cast(reopen_db, #group_state{group = Group, db_name = DbName} = State) ->
{ok, Db} = reopen_db(DbName, Group#group.db),
{noreply, State#group_state{group = Group#group{db = Db}}}.
{noreply, State}.

handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{ok, Db} = couch_db:open_int(DbName, []),
Expand Down Expand Up @@ -347,15 +339,10 @@ handle_info({'EXIT', FromPid, {{nocatch, Reason}, _Trace}}, State) ->

handle_info({'EXIT', FromPid, Reason}, State) ->
?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]),
{stop, Reason, State};

handle_info({'DOWN',_,_,_,_}, State) ->
?LOG_INFO("Shutting down view group server, monitored db is closing.", []),
{stop, normal, reply_all(State, shutdown)}.
{stop, Reason, State}.


terminate(Reason, #group_state{updater_pid=Update, compactor_pid=Compact}=S) ->
couch_db_update_notifier:stop(S#group_state.db_update_notifier),
reply_all(S, Reason),
couch_util:shutdown_sync(Update),
couch_util:shutdown_sync(Compact),
Expand Down Expand Up @@ -387,8 +374,8 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->
[catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList],
State#group_state{waiting_list=[]}.

prepare_group({RootDir, DbName, #group{sig=Sig, db=OldDb}=Group}, ForceReset)->
case reopen_db(DbName, OldDb) of
prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)->
case couch_db:open_int(DbName, []) of
{ok, Db} ->
case open_index_file(RootDir, DbName, Sig) of
{ok, Fd} ->
Expand Down Expand Up @@ -664,8 +651,3 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
ViewStates2, Views),
Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
id_btree=IdBtree, views=Views2}.

reopen_db(DbName, nil) ->
couch_db:open_int(DbName, []);
reopen_db(_DbName, Db) ->
couch_db:reopen(Db).
242 changes: 242 additions & 0 deletions test/etap/200-view-group-no-db-leaks.t
@@ -0,0 +1,242 @@
#!/usr/bin/env escript
%% -*- erlang -*-

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-record(user_ctx, {
name = null,
roles = [],
handler
}).

-define(LATEST_DISK_VERSION, 6).

-record(db_header,
{disk_version = ?LATEST_DISK_VERSION,
update_seq = 0,
unused = 0,
fulldocinfo_by_id_btree_state = nil,
docinfo_by_seq_btree_state = nil,
local_docs_btree_state = nil,
purge_seq = 0,
purged_docs = nil,
security_ptr = nil,
revs_limit = 1000
}).

-record(db, {
main_pid = nil,
update_pid = nil,
compactor_pid = nil,
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
fd,
updater_fd,
fd_ref_counter,
header = #db_header{},
committed_update_seq,
fulldocinfo_by_id_btree,
docinfo_by_seq_btree,
local_docs_btree,
update_seq,
name,
filepath,
validate_doc_funs = [],
security = [],
security_ptr = nil,
user_ctx = #user_ctx{},
waiting_delayed_commit = nil,
revs_limit = 1000,
fsync_options = [],
options = []
}).

test_db_name() -> <<"couch_test_view_group_db_leaks">>.
ddoc_name() -> <<"foo">>.

main(_) ->
test_util:init_code_path(),

etap:plan(13),
case (catch test()) of
ok ->
etap:end_tests();
Other ->
etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
etap:bail(Other)
end,
ok.

test() ->
couch_server_sup:start_link(test_util:config_files()),
timer:sleep(1000),
put(addr, couch_config:get("httpd", "bind_address", "127.0.0.1")),
put(port, integer_to_list(mochiweb_socket_server:get(couch_httpd, port))),
application:start(inets),

delete_db(),
create_db(),

create_docs(),
create_design_doc(),
query_view(),
check_db_ref_count(),

create_new_doc(<<"doc1000">>),
query_view(),
check_db_ref_count(),

Ref1 = get_db_ref_counter(),
compact_db(),
check_db_ref_count(),
Ref2 = get_db_ref_counter(),
etap:isnt(Ref1, Ref2, "DB ref counter changed"),
etap:is(false, is_process_alive(Ref1), "old DB ref counter is not alive"),

compact_view_group(),
check_db_ref_count(),
Ref3 = get_db_ref_counter(),
etap:is(Ref3, Ref2, "DB ref counter didn't change"),

create_new_doc(<<"doc1001">>),
query_view(),
check_db_ref_count(),

ok = timer:sleep(1000),
delete_db(),
couch_server_sup:stop(),
ok.

admin_user_ctx() ->
{user_ctx, #user_ctx{roles=[<<"_admin">>]}}.

create_db() ->
{ok, #db{main_pid = Pid} = Db} = couch_db:create(
test_db_name(), [admin_user_ctx()]),
put(db_main_pid, Pid),
ok = couch_db:close(Db).

delete_db() ->
couch_server:delete(test_db_name(), [admin_user_ctx()]).

compact_db() ->
{ok, Db} = couch_db:open_int(test_db_name(), []),
ok = couch_db:start_compact(Db),
ok = couch_db:close(Db),
wait_db_compact_done(10).

wait_db_compact_done(0) ->
etap:is(true, false, "DB compaction didn't finish");
wait_db_compact_done(N) ->
{ok, Db} = couch_db:open_int(test_db_name(), []),
ok = couch_db:close(Db),
case is_pid(Db#db.compactor_pid) of
false ->
ok;
true ->
ok = timer:sleep(500),
wait_db_compact_done(N - 1)
end.

compact_view_group() ->
ok = couch_view_compactor:start_compact(test_db_name(), ddoc_name()),
wait_view_compact_done(10).

wait_view_compact_done(0) ->
etap:is(true, false, "view group compaction didn't finish");
wait_view_compact_done(N) ->
{ok, {{_, Code, _}, _Headers, Body}} = http:request(
get,
{db_url() ++ "/_design/" ++ binary_to_list(ddoc_name()) ++ "/_info", []},
[],
[{sync, true}]),
etap:is(Code, 200, "got view group info"),
{Info} = ejson:decode(Body),
{IndexInfo} = couch_util:get_value(<<"view_index">>, Info),
CompactRunning = couch_util:get_value(<<"compact_running">>, IndexInfo),
case CompactRunning of
false ->
ok;
true ->
ok = timer:sleep(500),
wait_view_compact_done(N - 1)
end.

get_db_ref_counter() ->
{ok, #db{fd_ref_counter = Ref} = Db} = couch_db:open_int(test_db_name(), []),
ok = couch_db:close(Db),
Ref.

check_db_ref_count() ->
{ok, #db{fd_ref_counter = Ref} = Db} = couch_db:open_int(test_db_name(), []),
ok = couch_db:close(Db),
etap:is(couch_ref_counter:count(Ref), 2,
"DB ref counter is only held by couch_db and couch_db_updater"),
ok.

create_docs() ->
{ok, Db} = couch_db:open(test_db_name(), [admin_user_ctx()]),
Doc1 = couch_doc:from_json_obj({[
{<<"_id">>, <<"doc1">>},
{<<"value">>, 1}
]}),
Doc2 = couch_doc:from_json_obj({[
{<<"_id">>, <<"doc2">>},
{<<"value">>, 2}

]}),
Doc3 = couch_doc:from_json_obj({[
{<<"_id">>, <<"doc3">>},
{<<"value">>, 3}
]}),
{ok, _} = couch_db:update_docs(Db, [Doc1, Doc2, Doc3]),
couch_db:ensure_full_commit(Db),
couch_db:close(Db).

create_design_doc() ->
{ok, Db} = couch_db:open(test_db_name(), [admin_user_ctx()]),
DDoc = couch_doc:from_json_obj({[
{<<"_id">>, <<"_design/", (ddoc_name())/binary>>},
{<<"language">>, <<"javascript">>},
{<<"views">>, {[
{<<"bar">>, {[
{<<"map">>, <<"function(doc) { emit(doc._id, null); }">>}
]}}
]}}
]}),
{ok, _} = couch_db:update_docs(Db, [DDoc]),
couch_db:ensure_full_commit(Db),
couch_db:close(Db).

create_new_doc(Id) ->
{ok, Db} = couch_db:open(test_db_name(), [admin_user_ctx()]),
Doc666 = couch_doc:from_json_obj({[
{<<"_id">>, Id},
{<<"value">>, 999}
]}),
{ok, _} = couch_db:update_docs(Db, [Doc666]),
couch_db:ensure_full_commit(Db),
couch_db:close(Db).

db_url() ->
"http://" ++ get(addr) ++ ":" ++ get(port) ++ "/" ++
binary_to_list(test_db_name()).

query_view() ->
{ok, {{_, Code, _}, _Headers, _Body}} = http:request(
get,
{db_url() ++ "/_design/" ++ binary_to_list(ddoc_name()) ++
"/_view/bar", []},
[],
[{sync, true}]),
etap:is(Code, 200, "got view response"),
ok.
3 changes: 2 additions & 1 deletion test/etap/Makefile.am
Expand Up @@ -81,4 +81,5 @@ EXTRA_DIST = \
173-os-daemon-cfg-register.t \
180-http-proxy.ini \
180-http-proxy.t \
190-json-stream-parse.t
190-json-stream-parse.t \
200-view-group-no-db-leaks.t

0 comments on commit 0ce2c54

Please sign in to comment.