Skip to content

Commit

Permalink
Prevent delayed opener error from crashing index servers
Browse files Browse the repository at this point in the history
Previously, an index and opener process dying could have resulted in the index
gen_server crashing. This was observed in the CI test as described in:
#4809

The process in more detail was as follows:

  * When an async opener result is handled in the index server, there is a
    period of time when the index server is linked to both the index and the
    opener process.

  * After we reply early to the waiting clients, a client may do something to
    cause the indexer to crash, which would crash the opener along with it.
    That would generate two `{'EXIT', Pid, _}` messages queued in the indexer
    process' mailbox.

  * The index gen_server, is still processing the async opener result callback,
    where it would remove the openener from the `openers` table, then it
    returns `ok` to the async opener.

  * Index gen_server continues processing queued `EXIT` messages in `handle_info`:
     - The one for the indexer Pid is handled appropriately
     - The one for the opener leads to an eexit(...)` clause since we ended
       with an unknown process exiting.

To avoid the race condition, and the extra opener `EXIT` message, unlink and
reply early to the opener, as soon we linked to the indexer or had received the
error. To avoid the small chance of still getting an `EXIT` message from the
opener, in case it crashed right before we unlinked, flush any exit messages
from it. We do a similar flushing in two other places so create small utility
function to avoid duplicating the code too much.

Fix: #4809
  • Loading branch information
nickva committed Oct 17, 2023
1 parent 6c2e503 commit 2310555
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions src/couch_index/src/couch_index_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,29 @@ handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
[{_, Pid}] when is_pid(Pid) ->
{reply, {ok, Pid}, State}
end;
handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _}, State) ->
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, {ok, Pid}) || From <- Waiters],
handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _} = FromOpener, State) ->
link(Pid),
% Once linked with the indexer, dismiss and ignore the opener.
unlink(OpenerPid),
ets:delete(State#st.openers, OpenerPid),
add_to_ets(DbName, Sig, DDocId, Pid, State),
{reply, ok, State};
handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _}, State) ->
gen_server:reply(FromOpener, ok),
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- Waiters],
add_to_ets(DbName, Sig, DDocId, Pid, State),
[gen_server:reply(From, {ok, Pid}) || From <- Waiters],
% Flush opener exit messages in case it died before we unlinked it
ok = flush_exit_messages_from(OpenerPid),
{noreply, State};
handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _} = FromOpener, State) ->
% Once opener reported the error, we can dismiss the opener
unlink(OpenerPid),
ets:delete(State#st.openers, OpenerPid),
gen_server:reply(FromOpener, ok),
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
ets:delete(State#st.by_sig, {DbName, Sig}),
{reply, ok, State};
[gen_server:reply(From, Error) || From <- Waiters],
% Flush opener exit messages in case it died before we unlinked it
ok = flush_exit_messages_from(OpenerPid),
{noreply, State};
handle_call({reset_indexes, DbName}, _From, State) ->
reset_indexes(DbName, State),
{reply, ok, State}.
Expand Down Expand Up @@ -283,12 +293,7 @@ reset_indexes(DbName, #st{} = State) ->
[{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
unlink(Pid),
gen_server:cast(Pid, delete),
receive
{'EXIT', Pid, _} ->
ok
after 0 ->
ok
end,
ok = flush_exit_messages_from(Pid),
rem_from_ets(DbName, Sig, DDocIds, Pid, State)
end,
lists:foreach(Fun, dict:to_list(SigDDocIds)),
Expand Down Expand Up @@ -327,12 +332,7 @@ rem_from_ets(DbName, #st{} = State) ->
Fun = fun({Sig, DDocIds}) ->
[{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
unlink(Pid),
receive
{'EXIT', Pid, _} ->
ok
after 0 ->
ok
end,
ok = flush_exit_messages_from(Pid),
rem_from_ets(DbName, Sig, DDocIds, Pid, State)
end,
lists:foreach(Fun, dict:to_list(SigDDocIds)).
Expand Down Expand Up @@ -442,3 +442,11 @@ aggregate_queue_len() ->
|| Name <- Names
],
lists:sum([X || {_, X} <- MQs]).

flush_exit_messages_from(Pid) ->
receive
{'EXIT', Pid, _} ->
ok
after 0 ->
ok
end.

0 comments on commit 2310555

Please sign in to comment.