Skip to content

Commit

Permalink
Avoid mem3_rpc:rexi_call selective receive
Browse files Browse the repository at this point in the history
We need to potentially extract the usage delta from the incoming RPC
message. Rather than pattern match on all possible message formats that
could potentially include usage deltas, we instead utilize
rexi_util:extract_delta which matches against tuples ending in
`{delta, Delta}`, and separates that out from the underlying message.

The subtlety here is that receiving the message to extract the delta
changes the behavior as this was previously doing a selective receive
keyed off of the Ref, and then ignoring any other messages that arrived.
I don't know if the selective receive was intended, but I don't think
it's appropriate to leave unexpected messages floating around,
especially given things like issue #4909.

Instead of utilizing a selective receive, this switches to extracting
the message and delta like we need to do, and then in the event it finds
unexpected messages they're logged and skipped.

This selective receive was masking the lack of unlink on the linked
rexi_mon pid in fix #4906. I've also noticed some rpc responses arriving
late as well, but I haven't tracked that down, so let's log when it does
happen.
  • Loading branch information
chewbranca committed Feb 12, 2024
1 parent 90514a1 commit 88d2d6e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/couch/include/couch_db.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
meta = []
}).

-define(LOG_UNEXPECTED_MSG(Msg), couch_log:warning("[~p:~p:~p/~p]{~p[~p]} Unexpected message: ~w", [?MODULE, ?LINE, ?FUNCTION_NAME, ?FUNCTION_ARITY, self(), element(2, process_info(self(), message_queue_len)), Msg])).

-record(user_ctx, {
name=null,
Expand Down
34 changes: 24 additions & 10 deletions src/mem3/src/mem3_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -378,20 +378,34 @@ rexi_call(Node, MFA, Timeout) ->
Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
Ref = rexi:cast(Node, self(), MFA, [sync]),
try
receive
{Ref, {ok, Reply}} ->
Reply;
{Ref, Error} ->
erlang:error(Error);
{rexi_DOWN, Mon, _, Reason} ->
erlang:error({rexi_DOWN, {Node, Reason}})
after Timeout ->
erlang:error(timeout)
end
wait_message(Node, Ref, Mon, Timeout)
after
rexi_monitor:stop(Mon)
end.

wait_message(Node, Ref, Mon, Timeout) ->
receive
Msg ->
process_raw_message(Msg, Node, Ref, Mon, Timeout)
after Timeout ->
erlang:error(timeout)
end.

process_raw_message(Msg0, Node, Ref, Mon, Timeout) ->
{Msg, Delta} = rexi_utils:extract_delta(Msg0),
couch_stats_resource_tracker:accumulate_delta(Delta),
case Msg of
{Ref, {ok, Reply}} ->
Reply;
{Ref, Error} ->
erlang:error(Error);
{rexi_DOWN, Mon, _, Reason} ->
erlang:error({rexi_DOWN, {Node, Reason}});
Other ->
?LOG_UNEXPECTED_MSG(Other),
wait_message(Node, Ref, Mon, Timeout)
end.

get_or_create_db(DbName, Options) ->
mem3_util:get_or_create_db_int(DbName, Options).

Expand Down

0 comments on commit 88d2d6e

Please sign in to comment.