Skip to content

Commit

Permalink
fix(buffer_worker): avoid sending late reply messages to callers
Browse files Browse the repository at this point in the history
Fixes https://emqx.atlassian.net/browse/EMQX-9635

During a sync call from process `A` to a buffer worker `B`, the call
that `B` makes to the underlying resource `C` can be very slow.  In
those cases, `A` will receive a timeout response and expect no more
messages from `B` nor `C`.  However, prior to this fix, if `B` was stuck
in a long sync call to `C` and got its response only after `A` had timed
out, `B` would still send the late response to `A`, polluting its mailbox.
  • Loading branch information
thalesmg committed Apr 19, 2023
1 parent 8ccfbe9 commit cb995e2
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 6 deletions.
2 changes: 1 addition & 1 deletion apps/emqx_resource/src/emqx_resource.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_resource, [
{description, "Manager for all external resources"},
{vsn, "0.1.13"},
{vsn, "0.1.14"},
{registered, []},
{mod, {emqx_resource_app, []}},
{applications, [
Expand Down
17 changes: 13 additions & 4 deletions apps/emqx_resource/src/emqx_resource_buffer_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@

-export([queue_item_marshaller/1, estimate_size/1]).

-export([handle_async_reply/2, handle_async_batch_reply/2]).
-export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]).

-export([clear_disk_queue_dir/2]).

Expand Down Expand Up @@ -293,10 +293,8 @@ code_change(_OldVsn, State, _Extra) ->

pick_call(Id, Key, Query, Timeout) ->
?PICK(Id, Key, Pid, begin
Caller = self(),
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
From = {Caller, MRef},
ReplyTo = {fun gen_statem:reply/2, [From]},
ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
receive
{MRef, Response} ->
Expand Down Expand Up @@ -1703,6 +1701,17 @@ default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) ->
default_resume_interval(RequestTimeout, HealthCheckInterval) ->
max(1, min(HealthCheckInterval, RequestTimeout div 3)).

%% Reply function used for sync calls made through `pick_call/4'.
%%
%% `Alias' is the monitor reference that the caller created with the
%% `{alias, reply_demonitor}' option, so it doubles as a process alias.
%% The reply is addressed to that alias rather than to the caller's
%% pid: once the caller gives up (e.g. on timeout) and demonitors the
%% reference, the alias is deactivated and anything sent to it is
%% silently dropped.  A reply sent with
%% `gen_statem:reply({pid(), reference()}, _)' would instead still be
%% delivered late to the caller's mailbox.
-spec reply_call(reference(), term()) -> ok.
reply_call(Alias, Response) ->
    Alias ! {Alias, Response},
    ok.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
adjust_batch_time_test_() ->
Expand Down
6 changes: 5 additions & 1 deletion apps/emqx_resource/test/emqx_connector_demo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
Result
after 1000 ->
{error, timeout}
end.
end;
on_query(_InstId, {sync_sleep_before_reply, SleepFor}, _State) ->
%% Simulates a slow sync call: blocks the calling process for
%% `SleepFor' milliseconds and only then returns a successful result.
timer:sleep(SleepFor),
{ok, slept}.

on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) ->
Pid ! {block, ReplyFun},
Expand Down
45 changes: 45 additions & 0 deletions apps/emqx_resource/test/emqx_resource_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2751,6 +2751,51 @@ t_volatile_offload_mode(_Config) ->
end
).

%% Checks that a caller whose sync query has already returned a timeout
%% does not later find the (late) reply of that query in its mailbox.
t_late_call_reply(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
RequestTimeout = 500,
?assertMatch(
{ok, _},
emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
buffer_mode => memory_only,
request_timeout => RequestTimeout,
query_mode => sync
}
)
),
?check_trace(
begin
%% Make the resource sleep for longer than the request timeout, so
%% that the query below returns a timeout while the sync call to
%% the resource is still in progress.  The demo connector answers
%% `sync_sleep_before_reply' with `{ok, slept}' once it has slept
%% for `SleepFor' ms, i.e. only after we have already timed out.
SleepFor = RequestTimeout + 500,
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(
?ID,
{sync_sleep_before_reply, SleepFor},
#{timeout => RequestTimeout}
)
),
%% Our process shouldn't receive any late messages.  Waiting for
%% another `SleepFor' ms covers the moment at which the resource
%% finishes sleeping and its reply becomes available.
receive
LateReply ->
ct:fail("received late reply: ~p", [LateReply])
after SleepFor ->
ok
end,
ok
end,
[]
),
ok.

%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
Expand Down
9 changes: 9 additions & 0 deletions changes/ce/fix-10455.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Fixed an issue that could cause (otherwise harmless) noise in the logs.

During some particularly slow synchronous calls to bridges, late replies could be sent to connection processes that were no longer expecting a reply; those processes would then emit an error log like:

```
2023-04-19T18:24:35.350233+00:00 [error] msg: unexpected_info, mfa: emqx_channel:handle_info/2, line: 1278, peername: 172.22.0.1:36384, clientid: caribdis_bench_sub_1137967633_4788, info: {#Ref<0.408802983.1941504010.189402>,{ok,200,[{<<"cache-control">>,<<"max-age=0, ...">>}}
```

Those logs are harmless, but they could flood the log output and needlessly worry users.

0 comments on commit cb995e2

Please sign in to comment.