Skip to content

Commit

Permalink
MB-6612: open databases synchronously
Browse files Browse the repository at this point in the history
Problem that we see in MB-6612 is that all_known_databases_with_prefix
is scanning couch_dbs_by_name ets table. Given that previously we also
stored dbs being opened that lead to condition where some vbuckets
would be thought as present while in fact they are simply being tried
to be opened and do not actually exist.

Given that we don't need async db opening in couchbase fork of couchdb
for quite some time it seems logical to just get rid of async open and
thus never have onopened dbs in ets table.

Change-Id: I5d1dad5f60c64d197e143cf4a7be1996a4fc4ea2
Reviewed-on: http://review.couchbase.org/20812
Reviewed-by: Damien Katz <damien@couchbase.com>
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
  • Loading branch information
Aliaksey Kandratsenka authored and Peter Wansch committed Sep 25, 2012
1 parent 7587aa0 commit fa5b6fe
Showing 1 changed file with 26 additions and 126 deletions.
152 changes: 26 additions & 126 deletions src/couchdb/couch_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -210,109 +210,48 @@ all_known_databases_with_prefix_loop(Prefix, PreLen, K, Acc) ->
Acc
end.

open_async(Server, Froms, DbName, Filepath, Options) ->
Parent = self(),
Opener = spawn_link(fun() ->
Res = couch_db:start_link(DbName, Filepath, Options),
gen_server:call(
Parent, {open_result, DbName, Res, Options}, infinity
),
unlink(Parent),
case Res of
{ok, DbReader} ->
unlink(DbReader);
_ ->
do_open_db(DbName, Server, Options, {FromPid, _}) ->
DbNameList = binary_to_list(DbName),
case check_dbname(Server, DbNameList) of
ok ->
Filepath = get_full_filename(Server, DbNameList),
case couch_db:start_link(DbName, Filepath, Options) of
{ok, DbPid} ->
true = ets:insert(couch_dbs_by_name, {DbName, {opened, DbPid}}),
true = ets:insert(couch_dbs_by_pid, {DbPid, DbName}),
case lists:member(create, Options) of
true ->
couch_db_update_notifier:notify({created, DbName});
false ->
ok
end
end),
true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, Froms}}),
true = ets:insert(couch_dbs_by_pid, {Opener, DbName}),
DbsOpen = case lists:member(sys_db, Options) of
true ->
true = ets:insert(couch_sys_dbs, {DbName, true}),
Server#server.dbs_open;
false ->
Server#server.dbs_open + 1
end,
Server#server{dbs_open = DbsOpen}.
end,
DbsOpen = Server#server.dbs_open + 1,
NewServer = Server#server{dbs_open = DbsOpen},
Reply = (catch couch_db:open_ref_counted(DbPid, FromPid)),
{reply, Reply, NewServer};
Error ->
{reply, Error, Server}
end;
Error ->
{reply, Error, Server}
end.

handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
handle_call(get_known_databases, _From, Server) ->
Names = ets:match(couch_dbs_by_name, {'$1', '_'}),
{reply, [N || [N] <- Names], Server};
handle_call({open_result, DbName, {ok, OpenedDbPid}, Options}, From, Server) ->
case ets:lookup(couch_dbs_by_name, DbName) of
[{DbName, {opening, Opener, Froms}}] ->
link(OpenedDbPid),
lists:foreach(fun({FromPid, _} = From2) ->
gen_server:reply(From2,
catch couch_db:open_ref_counted(OpenedDbPid, FromPid))
end, Froms),
true = ets:insert(couch_dbs_by_name,
{DbName, {opened, OpenedDbPid}}),
true = ets:delete(couch_dbs_by_pid, Opener),
true = ets:insert(couch_dbs_by_pid, {OpenedDbPid, DbName}),
case lists:member(create, Options) of
true ->
couch_db_update_notifier:notify({created, DbName});
false ->
ok
end,
{reply, ok, Server};
[] ->
{OpenerPid, _Ref} = From,
false = is_process_alive(OpenerPid),
% db file previously deleted
couch_util:shutdown_sync(OpenedDbPid),
{noreply, Server}
end;
handle_call({open_result, DbName, Error, Options}, From, Server) ->
case ets:lookup(couch_dbs_by_name, DbName) of
[{DbName, {opening, Opener, Froms}}] ->
% only notify the first openner, retry for all others since it's possible
% that an external writer created the file after the first open request,
% but before the subsequent open requests
{FromsNext, [FirstOpen]} = lists:split(length(Froms) - 1, Froms),
gen_server:reply(FirstOpen, Error),
true = ets:delete(couch_dbs_by_name, DbName),
true = ets:delete(couch_dbs_by_pid, Opener),
DbsOpen = case lists:member(sys_db, Options) of
true ->
true = ets:delete(couch_sys_dbs, DbName),
Server#server.dbs_open;
false ->
Server#server.dbs_open - 1
end,
Server2 = Server#server{dbs_open = DbsOpen},
case FromsNext of
[] ->
Server3 = Server2;
_ ->
% Retry
Filepath = get_full_filename(Server, binary_to_list(DbName)),
Server3 = open_async(Server2, FromsNext, DbName, Filepath, Options)
end,
{reply, ok, Server3};
[] ->
{OpenerPid, _Ref} = From,
false = is_process_alive(OpenerPid),
{noreply, Server}
end;
handle_call({open, DbName, Options}, {FromPid,_}=From, Server) ->
case ets:lookup(couch_dbs_by_name, DbName) of
[] ->
open_db(DbName, Server, Options, [From]);
[{_, {opening, Opener, Froms}}] ->
true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From|Froms]}}),
{noreply, Server};
do_open_db(DbName, Server, Options, From);
[{_, {opened, MainPid}}] ->
{reply, couch_db:open_ref_counted(MainPid, FromPid), Server}
end;
handle_call({create, DbName, Options}, From, Server) ->
case ets:lookup(couch_dbs_by_name, DbName) of
[] ->
open_db(DbName, Server, [create | Options], [From]);
do_open_db(DbName, Server, [create | Options], From);
[_AlreadyRunningDb] ->
{reply, file_exists, Server}
end;
Expand All @@ -332,12 +271,6 @@ handle_call({delete, DbName, _Options}, _From, Server) ->
UpdateState =
case ets:lookup(couch_dbs_by_name, DbName) of
[] -> false;
[{_, {opening, Pid, Froms}}] ->
couch_util:shutdown_sync(Pid),
true = ets:delete(couch_dbs_by_name, DbName),
true = ets:delete(couch_dbs_by_pid, Pid),
[gen_server:reply(F, not_found) || F <- Froms],
true;
[{_, {opened, Pid}}] ->
couch_util:shutdown_sync(Pid),
true = ets:delete(couch_dbs_by_name, DbName),
Expand Down Expand Up @@ -383,39 +316,6 @@ code_change(_OldVsn, State, _Extra) ->

handle_info({'EXIT', _Pid, config_change}, Server) ->
{stop, shutdown, Server};
handle_info({'EXIT', Pid, snappy_nif_not_loaded}, Server) ->
Server2 = case ets:lookup(couch_dbs_by_pid, Pid) of
[{Pid, Db}] ->
[{Db, {opening, Pid, Froms}}] = ets:lookup(couch_dbs_by_name, Db),
Msg = io_lib:format("To open the database `~s`, Apache CouchDB "
"must be built with Erlang OTP R13B04 or higher.", [Db]),
?LOG_ERROR(Msg, []),
lists:foreach(
fun(F) -> gen_server:reply(F, {bad_otp_release, Msg}) end,
Froms),
true = ets:delete(couch_dbs_by_name, Db),
true = ets:delete(couch_dbs_by_pid, Pid),
case ets:lookup(couch_sys_dbs, Db) of
[{Db, _}] ->
true = ets:delete(couch_sys_dbs, Db),
Server;
[] ->
Server#server{dbs_open = Server#server.dbs_open - 1}
end;
_ ->
Server
end,
{noreply, Server2};
handle_info(Error, _Server) ->
?LOG_ERROR("Unexpected message, restarting couch_server: ~p", [Error]),
exit(kill).

open_db(DbName, Server, Options, Froms) ->
DbNameList = binary_to_list(DbName),
case check_dbname(Server, DbNameList) of
ok ->
Filepath = get_full_filename(Server, DbNameList),
{noreply, open_async(Server, Froms, DbName, Filepath, Options)};
Error ->
{reply, Error, Server}
end.

0 comments on commit fa5b6fe

Please sign in to comment.