Skip to content

Commit

Permalink
fix(buffer_worker): avoid sending late reply messages to callers
Browse files Browse the repository at this point in the history
Fixes https://emqx.atlassian.net/browse/EMQX-9635

During a sync call from process `A` to a buffer worker `B`, its call
to the underlying resource `C` can be very slow.  In those cases, `A`
will receive a timeout response and expect no more messages from `B`
nor `C`.  However, prior to this fix, if `B` is stuck in a long sync
call to `C` and then gets its response after `A` timed out, `B` would
still send the late response to `A`, polluting its mailbox.
  • Loading branch information
thalesmg committed Apr 26, 2023
1 parent 9da5331 commit c53741a
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 5 deletions.
17 changes: 13 additions & 4 deletions apps/emqx_resource/src/emqx_resource_buffer_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@

-export([queue_item_marshaller/1, estimate_size/1]).

-export([handle_async_reply/2, handle_async_batch_reply/2]).
-export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]).

-export([clear_disk_queue_dir/2]).

Expand Down Expand Up @@ -293,10 +293,8 @@ code_change(_OldVsn, State, _Extra) ->

pick_call(Id, Key, Query, Timeout) ->
?PICK(Id, Key, Pid, begin
Caller = self(),
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
From = {Caller, MRef},
ReplyTo = {fun gen_statem:reply/2, [From]},
ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
receive
{MRef, Response} ->
Expand Down Expand Up @@ -1703,6 +1701,17 @@ default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) ->
default_resume_interval(RequestTimeout, HealthCheckInterval) ->
max(1, min(HealthCheckInterval, RequestTimeout div 3)).

-spec reply_call(reference(), term()) -> ok.
reply_call(Alias, Response) ->
%% Since we use a reference created with `{alias,
%% reply_demonitor}', after we `demonitor' it in case of a
%% timeout, we won't send any more messages that the caller is not
%% expecting anymore. Using `gen_statem:reply({pid(),
%% reference()}, _)' would still send a late reply even after the
%% demonitor.
erlang:send(Alias, {Alias, Response}),
ok.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
adjust_batch_time_test_() ->
Expand Down
6 changes: 5 additions & 1 deletion apps/emqx_resource/test/emqx_connector_demo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
Result
after 1000 ->
{error, timeout}
end.
end;
on_query(_InstId, {sync_sleep_before_reply, SleepFor}, _State) ->
%% This simulates a slow sync call
timer:sleep(SleepFor),
{ok, slept}.

on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) ->
Pid ! {block, ReplyFun},
Expand Down
45 changes: 45 additions & 0 deletions apps/emqx_resource/test/emqx_resource_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2751,6 +2751,51 @@ t_volatile_offload_mode(_Config) ->
end
).

t_late_call_reply(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
RequestTimeout = 500,
?assertMatch(
{ok, _},
emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
buffer_mode => memory_only,
request_timeout => RequestTimeout,
query_mode => sync
}
)
),
?check_trace(
begin
%% Sleep for longer than the request timeout; the call reply will
%% have been already returned (a timeout), but the resource will
%% still send a message with the reply.
%% The demo connector will reply with `{error, timeout}' after 1 s.
SleepFor = RequestTimeout + 500,
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(
?ID,
{sync_sleep_before_reply, SleepFor},
#{timeout => RequestTimeout}
)
),
%% Our process shouldn't receive any late messages.
receive
LateReply ->
ct:fail("received late reply: ~p", [LateReply])
after SleepFor ->
ok
end,
ok
end,
[]
),
ok.

%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
Expand Down
9 changes: 9 additions & 0 deletions changes/ce/fix-10455.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Fixed an issue that could cause (otherwise harmless) noise in the logs.

During some particularly slow synchronous calls to bridges, some late replies could be sent to connections processes that were no longer expecting a reply, and then emit an error log like:

```
2023-04-19T18:24:35.350233+00:00 [error] msg: unexpected_info, mfa: emqx_channel:handle_info/2, line: 1278, peername: 172.22.0.1:36384, clientid: caribdis_bench_sub_1137967633_4788, info: {#Ref<0.408802983.1941504010.189402>,{ok,200,[{<<"cache-control">>,<<"max-age=0, ...">>}}
```

Those logs are harmless, but they could flood and worry the users without need.

0 comments on commit c53741a

Please sign in to comment.