Skip to content
Browse files

Add an index keyed on LRU for faster candidate ID

Our current implementation for closing an LRU DB involves a full scan
of a public ets table.  This scan blocks all other activity in
couch_server and can become a serious bottleneck when the LRU cache hit
rate drops too low.  In the worst-case all_dbs_active scenario we end up
with O(N**2) algorithmic complexity.

This patch adds a new index keyed on LRU for faster access to the least
recently used databases.  It also moves the ets table to a dict on the
couch_server heap.  The downside is an increased message rate inbound on
the couch_server, as clients are no longer allowed to update the LRU
data structures without sending a message.

BugzID: 12879

Conflicts:
	apps/couch/src/couch_server.erl
  • Loading branch information...
1 parent 7967c2f commit 7bd071836a08722cb14479261507aec1d071e2d2 @kocolosk kocolosk committed with Robert Newson Feb 6, 2012
Showing with 71 additions and 59 deletions.
  1. +49 −0 apps/couch/src/couch_lru.erl
  2. +22 −59 apps/couch/src/couch_server.erl
View
49 apps/couch/src/couch_lru.erl
@@ -0,0 +1,49 @@
+-module(couch_lru).
+-export([new/0, insert/2, update/2, close/1]).
+
+-include("couch_db.hrl").
+
+new() ->
+ {gb_trees:empty(), dict:new()}.
+
+insert(DbName, {Tree0, Dict0}) ->
+ Lru = now(),
+ {gb_trees:insert(Lru, DbName, Tree0), dict:store(DbName, Lru, Dict0)}.
+
+update(DbName, {Tree0, Dict0}) ->
+ case dict:find(DbName, Dict0) of
+ {ok, Old} ->
+ New = now(),
+ Tree = gb_trees:insert(New, DbName, gb_trees:delete(Old, Tree0)),
+ Dict = dict:store(DbName, New, Dict0),
+ {Tree, Dict};
+ error ->
+ % We closed this database before processing the update. Ignore
+ {Tree0, Dict0}
+ end.
+
+close({Tree, _} = Cache) ->
+ close_int(gb_trees:next(gb_trees:iterator(Tree)), Cache).
+
+%% internals
+
+close_int(none, _) ->
+ erlang:error(all_dbs_active);
+close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) ->
+ case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of
+ true ->
+ [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
+ case couch_db:is_idle(Db) of true ->
+ true = ets:delete(couch_dbs, DbName),
+ exit(Pid, kill),
+ {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)};
+ false ->
+ true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}),
+ twig:log(warn, "~p old active ~s", [?MODULE, Db#db.name]),
+ close_int(gb_trees:next(Iter), update(DbName, Cache))
+ end;
+ false ->
+ NewTree = gb_trees:delete(Lru, Tree),
+ NewIter = gb_trees:iterator(NewTree),
+ close_int(gb_trees:next(NewIter), {NewTree, dict:erase(DbName, Dict)})
+ end.
View
81 apps/couch/src/couch_server.erl
@@ -26,7 +26,8 @@
dbname_regexp,
max_dbs_open=100,
dbs_open=0,
- start_time=""
+ start_time="",
+ lru = couch_lru:new()
}).
dev_start() ->
@@ -47,19 +48,22 @@ open(DbName, Options) ->
Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
case ets:lookup(couch_dbs, DbName) of
[#db{fd=Fd, fd_monitor=Lock} = Db] when Lock =/= locked ->
- ets:insert(couch_lru, {DbName, now()}),
+ update_lru(DbName),
{ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
_ ->
Timeout = couch_util:get_value(timeout, Options, infinity),
case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of
{ok, #db{fd=Fd} = Db} ->
- ets:insert(couch_lru, {DbName, now()}),
+ update_lru(DbName),
{ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
Error ->
Error
end
end.
+update_lru(DbName) ->
+ gen_server:cast(couch_server, {update_lru, DbName}).
+
close_lru() ->
gen_server:call(couch_server, close_lru).
@@ -134,7 +138,6 @@ init([]) ->
"(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end
),
ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}]),
- ets:new(couch_lru, [set, public, named_table]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir,
dbname_regexp=RegExp,
@@ -180,49 +183,11 @@ all_databases(Prefix) ->
maybe_close_lru_db(#server{dbs_open=NumOpen, max_dbs_open=MaxOpen}=Server)
when NumOpen < MaxOpen ->
{ok, Server};
-maybe_close_lru_db(#server{dbs_open=NumOpen}=Server) ->
- % must free up the lru db.
- case try_close_lru(now()) of
- ok ->
- {ok, Server#server{dbs_open=NumOpen - 1}};
- Error -> Error
- end.
-
-find_oldest_db({DbName, Lru}, Acc) ->
- erlang:min({Lru, DbName}, Acc).
-
-try_close_lru(StartTime) ->
- case ets:foldl(fun find_oldest_db/2, {StartTime, nil}, couch_lru) of
- {StartTime, nil} ->
- {error, all_dbs_active};
- {_, DbName} ->
- % There may exist an extremely small possibility of a race
- % condition here, if a process could lookup the DB before the lock,
- % but fail to monitor the fd before the is_idle check.
- %
- % If we do hit this race condition the behavior is that the process
- % grabbing the database will end up inserting a value into the
- % couch_lru table. Its possible that we end up picking that up
- % as the DbName above to close. So we here we'll just remove the
- % couch_lru entry and ignore it.
- case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of
- true ->
- [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
- case couch_db:is_idle(Db) of true ->
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_lru, DbName),
- exit(Pid, kill),
- ok;
- false ->
- Update = {#db.fd_monitor, nil},
- true = ets:update_element(couch_dbs, DbName, Update),
- true = ets:insert(couch_lru, {DbName, now()}),
- try_close_lru(StartTime)
- end;
- false ->
- true = ets:delete(couch_lru, DbName),
- try_close_lru(StartTime)
- end
+maybe_close_lru_db(#server{dbs_open=NumOpen, lru=Lru}=Server) ->
+ try
+ {ok, Server#server{dbs_open = NumOpen - 1, lru = couch_lru:close(Lru)}}
+ catch error:all_dbs_active ->
+ {error, all_dbs_active}
end.
open_async(Server, From, DbName, Filepath, Options) ->
@@ -241,12 +206,11 @@ open_async(Server, From, DbName, Filepath, Options) ->
}),
Server#server{dbs_open=Server#server.dbs_open + 1}.
-handle_call(close_lru, _From, #server{dbs_open=N} = Server) ->
- case try_close_lru(now()) of
- ok ->
- {reply, ok, Server#server{dbs_open = N-1}};
- Error ->
- {reply, Error, Server}
+handle_call(close_lru, _From, #server{dbs_open=N, lru=Lru} = Server) ->
+ try
+ {reply, ok, Server#server{dbs_open = N-1, lru = couch_lru:close(Lru)}}
+ catch error:all_dbs_active ->
+ {reply, {error, all_dbs_active}, Server}
end;
handle_call(open_dbs_count, _From, Server) ->
{reply, Server#server.dbs_open, Server};
@@ -262,14 +226,13 @@ handle_call({open_result, DbName, {ok, Db}, Options}, _From, Server) ->
[#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName),
[gen_server:reply(From, {ok, Db}) || From <- Froms],
true = ets:insert(couch_dbs, Db),
- true = ets:insert(couch_lru, {DbName, now()}),
case lists:member(create, Options) of
true ->
couch_db_update_notifier:notify({created, DbName});
false ->
ok
end,
- {reply, ok, Server};
+ {reply, ok, Server#server{lru = couch_lru:insert(DbName, Server#server.lru)}};
handle_call({open_result, DbName, Error, _Options}, _From, Server) ->
% icky hack of field values - compactor_pid used to store clients
[#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName),
@@ -357,12 +320,12 @@ handle_call({delete, DbName, _Options}, _From, Server) ->
Error ->
{reply, Error, Server}
end;
-handle_call({db_updated, #db{name = DbName} = Db}, _From, Server) ->
+handle_call({db_updated, #db{name = DbName} = Db}, _, #server{lru=Lru}=Server) ->
true = ets:insert(couch_dbs, Db),
- true = ets:insert(couch_lru, {DbName, now()}),
- {reply, ok, Server}.
-
+ {reply, ok, Server#server{lru = couch_lru:update(DbName, Lru)}}.
+handle_cast({update_lru, DbName}, #server{lru = Lru} = Server) ->
+ {noreply, Server#server{lru = couch_lru:update(DbName, Lru)}};
handle_cast(Msg, Server) ->
{stop, {unknown_cast_message, Msg}, Server}.

0 comments on commit 7bd0718

Please sign in to comment.
Something went wrong with that request. Please try again.