From 8a37c27864ea001514b8a1fb3acc389a6e20efb4 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Fri, 10 Mar 2017 14:24:01 -0500 Subject: [PATCH] Close idle dbs Previously idle dbs, especially sys_dbs like _replicator shards once opened for scanning would stay open forever. In a large cluster with many _replicator shards that can add up to a significant overhead, mostly in terms of number of active processes. This adds a mechanism to close dbs which have an idle db updater. Before hibernation was used to limit the memory pressure, however that is often not enough in practice. Idle timeout limit can be configured via `couchdb.idle_check_timeout` configuration value. The default is 60 seconds. "infinity" means restoring previous behavior where no idle closing would happen. Due to pending PSE (Pluggable Storage Engine) merge and in order to avoid modifying the #db{} record use process dictionary to store idle timer ref and also the latest configuration value. Configuration value is update from config application every timeout expiry period. (Original idea for this belongs to Paul Davis) COUCHDB-3323 --- src/couch_db_updater.erl | 99 ++++++++++++++++++++++++++++------------ src/couch_server.erl | 28 ++++++++++++ 2 files changed, 99 insertions(+), 28 deletions(-) diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl index 78726358..43d51216 100644 --- a/src/couch_db_updater.erl +++ b/src/couch_db_updater.erl @@ -21,6 +21,8 @@ -include_lib("couch/include/couch_db.hrl"). +-define(IDLE_LIMIT_DEFAULT, 60000). + -record(comp_header, { db_header, meta_state @@ -36,6 +38,7 @@ init({DbName, Filepath, Fd, Options}) -> erlang:put(io_priority, {db_update, DbName}), + update_idle_limit_from_config(), case lists:member(create, Options) of true -> % create a new header and writes it to the file @@ -70,7 +73,7 @@ init({DbName, Filepath, Fd, Options}) -> % we don't load validation funs here because the fabric query is liable to % race conditions. Instead see couch_db:validate_doc_update, which loads % them lazily - {ok, Db#db{main_pid = self()}}. + {ok, Db#db{main_pid = self()}, idle_limit()}. terminate(_Reason, Db) -> @@ -84,23 +87,23 @@ terminate(_Reason, Db) -> ok. handle_call(get_db, _From, Db) -> - {reply, {ok, Db}, Db}; + {reply, {ok, Db}, Db, idle_limit()}; handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) -> - {reply, ok, Db}; % no data waiting, return ok immediately + {reply, ok, Db, idle_limit()}; % no data waiting, return ok immediately handle_call(full_commit, _From, Db) -> - {reply, ok, commit_data(Db)}; + {reply, ok, commit_data(Db), idle_limit()}; handle_call({full_commit, RequiredSeq}, _From, Db) when RequiredSeq =< Db#db.committed_update_seq -> - {reply, ok, Db}; + {reply, ok, Db, idle_limit()}; handle_call({full_commit, _}, _, Db) -> - {reply, ok, commit_data(Db)}; % commit the data and return ok + {reply, ok, commit_data(Db), idle_limit()}; % commit the data and return ok handle_call(start_compact, _From, Db) -> - {noreply, NewDb} = handle_cast(start_compact, Db), - {reply, {ok, NewDb#db.compactor_pid}, NewDb}; + {noreply, NewDb, _Timeout} = handle_cast(start_compact, Db), + {reply, {ok, NewDb#db.compactor_pid}, NewDb, idle_limit()}; handle_call(compactor_pid, _From, #db{compactor_pid = Pid} = Db) -> - {reply, Pid, Db}; + {reply, Pid, Db, idle_limit()}; handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) -> - {reply, ok, Db}; + {reply, ok, Db, idle_limit()}; handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) -> unlink(Pid), exit(Pid, kill), @@ -108,12 +111,12 @@ handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) -> ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"), Db2 = Db#db{compactor_pid = nil}, ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {reply, ok, Db2}; + {reply, ok, Db2, idle_limit()}; handle_call(increment_update_seq, _From, Db) -> Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}), ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), couch_event:notify(Db#db.name, updated), - {reply, {ok, Db2#db.update_seq}, Db2}; + {reply, {ok, Db2#db.update_seq}, Db2, idle_limit()}; handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) -> {ok, Ptr, _} = couch_file:append_term( @@ -121,17 +124,17 @@ handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) -> Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr, update_seq=Db#db.update_seq+1}), ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {reply, ok, Db2}; + {reply, ok, Db2, idle_limit()}; handle_call({set_revs_limit, Limit}, _From, Db) -> Db2 = commit_data(Db#db{revs_limit=Limit, update_seq=Db#db.update_seq+1}), ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {reply, ok, Db2}; + {reply, ok, Db2, idle_limit()}; handle_call({purge_docs, _IdRevs}, _From, #db{compactor_pid=Pid}=Db) when Pid /= nil -> - {reply, {error, purge_during_compaction}, Db}; + {reply, {error, purge_during_compaction}, Db, idle_limit()}; handle_call({purge_docs, IdRevs}, _From, Db) -> #db{ fd = Fd, @@ -199,13 +202,13 @@ handle_call({purge_docs, IdRevs}, _From, Db) -> ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), couch_event:notify(Db#db.name, updated), - {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2}. + {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2, idle_limit()}. handle_cast({load_validation_funs, ValidationFuns}, Db) -> Db2 = Db#db{validate_doc_funs = ValidationFuns}, ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {noreply, Db2}; + {noreply, Db2, idle_limit()}; handle_cast(start_compact, Db) -> case Db#db.compactor_pid of nil -> @@ -213,10 +216,10 @@ handle_cast(start_compact, Db) -> Pid = spawn_link(fun() -> start_copy_compact(Db) end), Db2 = Db#db{compactor_pid=Pid}, ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {noreply, Db2}; + {noreply, Db2, idle_limit()}; _ -> % compact currently running, this is a no-op - {noreply, Db} + {noreply, Db, idle_limit()} end; handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> {ok, NewFd} = couch_file:open(CompactFilepath), @@ -256,7 +259,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity), couch_event:notify(NewDb3#db.name, compacted), couch_log:info("Compaction for db \"~s\" completed.", [Db#db.name]), - {noreply, NewDb3#db{compactor_pid=nil}}; + {noreply, NewDb3#db{compactor_pid=nil}, idle_limit()}; false -> couch_log:info("Compaction file still behind main file " "(update seq=~p. compact update seq=~p). Retrying.", @@ -265,9 +268,12 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> Pid = spawn_link(fun() -> start_copy_compact(Db) end), Db2 = Db#db{compactor_pid=Pid}, ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {noreply, Db2} + {noreply, Db2, idle_limit()} end; +handle_cast(wakeup, Db) -> + {noreply, Db, idle_limit()}; + handle_cast(Msg, #db{name = Name} = Db) -> couch_log:error("Database `~s` updater received unexpected cast: ~p", [Name, Msg]), @@ -314,30 +320,41 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, false -> Db2 end, - {noreply, Db3, hibernate} + {noreply, Db3, hibernate_if_no_idle_limit()} catch throw: retry -> [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients], - {noreply, Db, hibernate} + {noreply, Db, hibernate_if_no_idle_limit()} end; handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) -> %no outstanding delayed commits, ignore - {noreply, Db}; + {noreply, Db, idle_limit()}; handle_info(delayed_commit, Db) -> case commit_data(Db) of Db -> - {noreply, Db}; + {noreply, Db, idle_limit()}; Db2 -> ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {noreply, Db2} + {noreply, Db2, idle_limit()} end; handle_info({'EXIT', _Pid, normal}, Db) -> - {noreply, Db}; + {noreply, Db, idle_limit()}; handle_info({'EXIT', _Pid, Reason}, Db) -> {stop, Reason, Db}; handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) -> couch_log:error("DB ~s shutting down - Fd ~p", [Name, Reason]), - {stop, normal, Db#db{fd=undefined, fd_monitor=closed}}. + {stop, normal, Db#db{fd=undefined, fd_monitor=closed}}; +handle_info(timeout, #db{name = DbName} = Db) -> + case couch_db:is_idle(Db) of + true -> + ok = couch_server:close_db_if_idle(DbName); + false -> + ok + end, + update_idle_limit_from_config(), + gen_server:cast(self(), wakeup), + {noreply, Db, hibernate}. + code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -1454,3 +1471,29 @@ default_security_object(_DbName) -> "everyone" -> [] end. + +% These functions rely on using the process dictionary. This is usually frowned +% upon howver in this case it is done to avoid changing to a different server state +% record. Once PSE (Pluggable Storage Engine) code lands this should be moved to the +% #db{} record. + +update_idle_limit_from_config() -> + Default = integer_to_list(?IDLE_LIMIT_DEFAULT), + IdleLimit = case config:get("couchdb", "idle_check_timeout", Default) of + "infinity" -> + infinity; + Milliseconds -> + list_to_integer(Milliseconds) + end, + put(idle_limit, IdleLimit). + +idle_limit() -> + get(idle_limit). + +hibernate_if_no_idle_limit() -> + case idle_limit() of + infinity -> + hibernate; + Timeout when is_integer(Timeout) -> + Timeout + end. diff --git a/src/couch_server.erl b/src/couch_server.erl index 893b957d..78d9efa9 100644 --- a/src/couch_server.erl +++ b/src/couch_server.erl @@ -21,6 +21,7 @@ -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]). -export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]). -export([close_lru/0]). +-export([close_db_if_idle/1]). % config_listener api -export([handle_config_change/5, handle_config_terminate/3]). @@ -173,6 +174,15 @@ hash_admin_passwords(Persist) -> config:set("admins", User, ?b2l(HashedPassword), Persist) end, couch_passwords:get_unhashed_admins()). +close_db_if_idle(DbName) -> + case ets:lookup(couch_dbs, DbName) of + [#db{}] -> + gen_server:cast(couch_server, {close_db_if_idle, DbName}); + _ -> + ok + end. + + init([]) -> % read config and register for configuration changes @@ -506,6 +516,24 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} = {noreply, Server#server{lru = couch_lru:update(DbName, Lru)}}; handle_cast({update_lru, _DbName}, Server) -> {noreply, Server}; +handle_cast({close_db_if_idle, DbName}, Server) -> + case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of + true -> + [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName), + case couch_db:is_idle(Db) of + true -> + true = ets:delete(couch_dbs, DbName), + true = ets:delete(couch_dbs_pid_to_name, Pid), + exit(Pid, kill), + {noreply, db_closed(Server, Db#db.options)}; + false -> + true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}), + {noreply, Server} + end; + false -> + {noreply, Server} + end; + handle_cast(Msg, Server) -> {stop, {unknown_cast_message, Msg}, Server}.