Skip to content

Commit

Permalink
Merge pull request #4491 from apache/couch_index_fixes
Browse files Browse the repository at this point in the history
Couch index fixes
  • Loading branch information
rnewson committed Mar 22, 2023
2 parents 189db65 + 224d676 commit 0073e76
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 63 deletions.
25 changes: 18 additions & 7 deletions src/couch_event/src/couch_event_listener_mfa.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,24 @@ terminate(_Reason, _MFA) ->
ok.

handle_event(DbName, Event, #st{mod = Mod, func = Func, state = State} = St) ->
case (catch Mod:Func(DbName, Event, State)) of
{ok, NewState} ->
{ok, St#st{state = NewState}};
stop ->
{stop, normal, St};
Else ->
erlang:error(Else)
try
case Mod:Func(DbName, Event, State) of
{ok, NewState} ->
{ok, St#st{state = NewState}};
stop ->
{stop, normal, St};
Else ->
couch_log:error("~p: else in handle_event for db ~p, event ~p, else ~p", [
?MODULE, DbName, Event, Else
]),
erlang:error(Else)
end
catch
Class:Reason:Stack ->
couch_log:error("~p: ~p in handle_event for db ~p, event ~p, reason ~p, stack ~p", [
?MODULE, Class, DbName, Event, Reason, Stack
]),
erlang:raise(Class, Reason, Stack)
end.

handle_cast(shutdown, St) ->
Expand Down
154 changes: 99 additions & 55 deletions src/couch_index/src/couch_index_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-export([handle_call/3, handle_cast/2, handle_info/2]).

% Sharding functions
-export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1]).
-export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1, openers/1]).
-export([aggregate_queue_len/0, names/0]).

% Exported for callbacks
Expand All @@ -41,7 +41,8 @@
server_name,
by_sig,
by_pid,
by_db
by_db,
openers
}).

start_link(N) ->
Expand Down Expand Up @@ -129,6 +130,7 @@ init([N]) ->
ets:new(by_sig(N), [protected, set, named_table]),
ets:new(by_pid(N), [private, set, named_table]),
ets:new(by_db(N), [protected, bag, named_table]),
ets:new(openers(N), [protected, set, named_table]),
RootDir = couch_index_util:root_dir(),
% We only need one of the index servers to nuke this on startup.
case N of
Expand All @@ -140,7 +142,8 @@ init([N]) ->
server_name = server_name(N),
by_sig = by_sig(N),
by_pid = by_pid(N),
by_db = by_db(N)
by_db = by_db(N),
openers = openers(N)
},
ok = config:listen_for_changes(?MODULE, St),
couch_event:link_listener(?MODULE, handle_db_event, St, [all_dbs]),
Expand All @@ -154,7 +157,8 @@ terminate(_Reason, State) ->
handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
case ets:lookup(State#st.by_sig, {DbName, Sig}) of
[] ->
spawn_link(fun() -> new_index(Args) end),
Pid = spawn_link(fun() -> new_index(Args) end),
ets:insert(State#st.openers, {Pid, {DbName, Sig}}),
ets:insert(State#st.by_sig, {{DbName, Sig}, [From]}),
{noreply, State};
[{_, Waiters}] when is_list(Waiters) ->
Expand All @@ -163,15 +167,17 @@ handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
[{_, Pid}] when is_pid(Pid) ->
{reply, {ok, Pid}, State}
end;
handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _}, State) ->
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, {ok, Pid}) || From <- Waiters],
link(Pid),
ets:delete(State#st.openers, OpenerPid),
add_to_ets(DbName, Sig, DDocId, Pid, State),
{reply, ok, State};
handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _}, State) ->
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- Waiters],
ets:delete(State#st.openers, OpenerPid),
ets:delete(State#st.by_sig, {DbName, Sig}),
{reply, ok, State};
handle_call({reset_indexes, DbName}, _From, State) ->
Expand All @@ -192,19 +198,31 @@ handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
{noreply, State};
handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}),
{noreply, State};
handle_cast({rem_from_ets, [DbName]}, State) ->
rem_from_ets(DbName, State),
{noreply, State}.

handle_info({'EXIT', Pid, Reason}, Server) ->
Cleanup = fun(DbName, Sig) ->
DDocIds = [
DDocId
|| {_, {DDocId, _}} <-
ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
],
rem_from_ets(DbName, Sig, DDocIds, Pid, Server)
end,
case ets:lookup(Server#st.by_pid, Pid) of
[{Pid, {DbName, Sig}}] ->
DDocIds = [
DDocId
|| {_, {DDocId, _}} <-
ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
],
rem_from_ets(DbName, Sig, DDocIds, Pid, Server);
Cleanup(DbName, Sig);
[] when Reason /= normal ->
exit(Reason);
case ets:lookup(Server#st.openers, Pid) of
[{Pid, {DbName, Sig}}] ->
ets:delete(Server#st.openers, Pid),
Cleanup(DbName, Sig);
[] ->
exit(Reason)
end;
_Else ->
ok
end,
Expand Down Expand Up @@ -298,55 +316,78 @@ rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
DDocIds
).

rem_from_ets(DbName, #st{} = State) ->
SigDDocIds = lists:foldl(
fun({_, {DDocId, Sig}}, DDict) ->
dict:append(Sig, DDocId, DDict)
end,
dict:new(),
ets:lookup(State#st.by_db, DbName)
),
Fun = fun({Sig, DDocIds}) ->
[{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
unlink(Pid),
receive
{'EXIT', Pid, _} ->
ok
after 0 ->
ok
end,
rem_from_ets(DbName, Sig, DDocIds, Pid, State)
end,
lists:foreach(Fun, dict:to_list(SigDDocIds)).

handle_db_event(DbName, created, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(DbName, deleted, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
DDocResult = couch_util:with_db(DbName, fun(Db) ->
couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
end),
LocalShards =
try
mem3:local_shards(mem3:dbname(DbName))
catch
Class:Msg ->
couch_log:warning(
"~p got ~p:~p when fetching local shards for ~p",
[?MODULE, Class, Msg, DbName]
),
[]
end,
DbShards = [mem3:name(Sh) || Sh <- LocalShards],
lists:foreach(
fun(DbShard) ->
lists:foreach(
fun({_DbShard, {_DDocId, Sig}}) ->
% check if there are other ddocs with the same Sig for the same db
SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}),
if
length(SigDDocs) > 1 ->
% remove records from by_db for this DDoc
Args = [DbShard, DDocId, Sig],
gen_server:cast(St#st.server_name, {rem_from_ets, Args});
true ->
% single DDoc with this Sig - close couch_index processes
case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
[{_, IndexPid}] ->
(catch gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
[] ->
[]
end
end
end,
ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
)
end,
DbShards
),
{ok, St};
%% this handle_db_event function must not crash (or it takes down the couch_index_server)
try
DDocResult = couch_util:with_db(DbName, fun(Db) ->
couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
end),
LocalShards = mem3:local_shards(mem3:dbname(DbName)),
DbShards = [mem3:name(Sh) || Sh <- LocalShards],
lists:foreach(
fun(DbShard) ->
lists:foreach(
fun({_DbShard, {_DDocId, Sig}}) ->
% check if there are other ddocs with the same Sig for the same db
SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}),
if
length(SigDDocs) > 1 ->
% remove records from by_db for this DDoc
Args = [DbShard, DDocId, Sig],
gen_server:cast(St#st.server_name, {rem_from_ets, Args});
true ->
% single DDoc with this Sig - close couch_index processes
case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
[{_, IndexPid}] ->
(catch gen_server:cast(
IndexPid, {ddoc_updated, DDocResult}
));
[] ->
[]
end
end
end,
ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
)
end,
DbShards
),
{ok, St}
catch
Class:Reason:Stack ->
couch_log:warning("~p: handle_db_event ~p for db ~p, reason ~p, stack ~p", [
?MODULE, Class, DbName, Reason, Stack
]),
gen_server:cast(St#st.server_name, {rem_from_ets, [DbName]}),
{ok, St}
end;
handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
lists:foreach(
fun({_DbName, {_DDocId, Sig}}) ->
Expand Down Expand Up @@ -378,6 +419,9 @@ by_pid(Arg) ->
by_db(Arg) ->
name("couchdb_indexes_by_db", Arg).

openers(Arg) ->
name("couchdb_indexes_openers", Arg).

name(BaseName, Arg) when is_list(Arg) ->
name(BaseName, ?l2b(Arg));
name(BaseName, Arg) when is_binary(Arg) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
couch_db:name(DbShard),
{ddoc_updated, DDocID},
{st, "", couch_index_server:server_name(I), couch_index_server:by_sig(I),
couch_index_server:by_pid(I), couch_index_server:by_db(I)}
couch_index_server:by_pid(I), couch_index_server:by_db(I),
couch_index_server:openers(I)}
)
end,
seq()
Expand Down

0 comments on commit 0073e76

Please sign in to comment.