Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(webhook): add async retries and evaluate reply callback in fresh process #10690

Merged
merged 4 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
110 changes: 106 additions & 4 deletions apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
-compile(export_all).

-import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-import(emqx_common_test_helpers, [on_exit/1]).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
Expand Down Expand Up @@ -52,6 +53,13 @@ end_per_suite(_Config) ->
%% Common Test suite-level info: applies a 60-second timetrap to the suite's
%% test cases.
suite() ->
[{timetrap, {seconds, 60}}].

%% Per-testcase setup hook: no setup is performed; Config is returned unchanged.
init_per_testcase(_TestCase, Config) ->
Config.

%% Per-testcase teardown hook: invokes the shared test janitor and returns ok.
%% NOTE(review): presumably this is what runs the cleanup callbacks registered
%% through on_exit/1 during the test case -- confirm against
%% emqx_common_test_helpers.
end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(),
ok.

%%------------------------------------------------------------------------------
%% HTTP server for testing
%% (Originally copied from emqx_bridge_api_SUITE)
Expand Down Expand Up @@ -158,7 +166,8 @@ bridge_async_config(#{port := Port} = Config) ->
QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000),
ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"),
ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"),
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
Expand All @@ -177,7 +186,8 @@ bridge_async_config(#{port := Port} = Config) ->
" health_check_interval = \"15s\"\n"
" max_buffer_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n"
" request_timeout = \"~s\"\n"
" request_timeout = \"~p\"\n"
" resume_interval = \"~s\"\n"
" start_after_created = \"true\"\n"
" start_timeout = \"5s\"\n"
" worker_pool_size = \"1\"\n"
Expand All @@ -194,7 +204,8 @@ bridge_async_config(#{port := Port} = Config) ->
PoolSize,
RequestTimeout,
QueryMode,
ResourceRequestTimeout
ResourceRequestTimeout,
ResumeInterval
]
),
ct:pal(ConfigString),
Expand Down Expand Up @@ -236,7 +247,7 @@ t_send_async_connection_timeout(_Config) ->
query_mode => "async",
connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000,
resouce_request_timeout => "infinity"
resource_request_timeout => "infinity"
}),
NumberOfMessagesToSend = 10,
[
Expand All @@ -250,6 +261,97 @@ t_send_async_connection_timeout(_Config) ->
stop_http_server(Server),
ok.

%% Checks that the `normal' and `{shutdown, normal}' errors are retried by the
%% connector until the request succeeds.
%%
%% The mocked reply delegator (see do_t_async_retries/3) injects the given
%% error for the first 5 invocations and lets the real result through on the
%% 6th. The message must still be delivered with an HTTP 200, and the delegator
%% must have been invoked exactly 6 times.
t_async_free_retries(_Config) ->
    #{port := Port} = Server = start_http_server(#{response_delay_ms => 0}),
    %% Make sure the test HTTP server is stopped even when an assertion below
    %% fails; otherwise it would outlive this test case.
    on_exit(fun() -> stop_http_server(Server) end),
    BridgeID = make_bridge(#{
        port => Port,
        pool_size => 1,
        query_mode => "sync",
        connect_timeout => 1_000,
        request_timeout => 10_000,
        resource_request_timeout => "10000s"
    }),
    %% Fail 5 times then succeed.
    Context = #{error_attempts => 5},
    ExpectedAttempts = 6,
    Fn = fun(Get, Error) ->
        ?assertMatch(
            {ok, 200, _, _},
            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
            #{error => Error}
        ),
        ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
    end,
    do_t_async_retries(Context, {error, normal}, Fn),
    do_t_async_retries(Context, {error, {shutdown, normal}}, Fn),
    ok.

%% Checks the behaviour of retries that count against the connector's attempt
%% limit.
%%
%% With `error_attempts => infinity' the mocked reply delegator (see
%% do_t_async_retries/3) injects the given error on every invocation, so the
%% connector keeps retrying until it gives up. In every scenario the delegator
%% is expected to have been invoked exactly 3 times.
t_async_common_retries(_Config) ->
    #{port := Port} = Server = start_http_server(#{response_delay_ms => 0}),
    %% Make sure the test HTTP server is stopped even when an assertion below
    %% fails; otherwise it would outlive this test case.
    on_exit(fun() -> stop_http_server(Server) end),
    BridgeID = make_bridge(#{
        port => Port,
        pool_size => 1,
        query_mode => "sync",
        resume_interval => "100ms",
        connect_timeout => 1_000,
        request_timeout => 10_000,
        resource_request_timeout => "10000s"
    }),
    %% Keeps failing until connector gives up.
    Context = #{error_attempts => infinity},
    ExpectedAttempts = 3,
    FnSucceed = fun(Get, Error) ->
        ?assertMatch(
            {ok, 200, _, _},
            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
            #{error => Error, attempts => Get()}
        ),
        ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
    end,
    FnFail = fun(Get, Error) ->
        %% `Error' is already bound, so this asserts that the exact injected
        %% error is what the caller receives.
        ?assertMatch(
            Error,
            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
            #{error => Error, attempts => Get()}
        ),
        ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
    end,
    %% These two succeed because they're further retried by the buffer
    %% worker synchronously, and we're not mocking that call.
    do_t_async_retries(Context, {error, {closed, "The connection was lost."}}, FnSucceed),
    do_t_async_retries(Context, {error, {shutdown, closed}}, FnSucceed),
    %% This fails because this error is treated as unrecoverable.
    do_t_async_retries(Context, {error, something_else}, FnFail),
    ok.

%% Runs `Fn(Get, Error)' while `emqx_connector_http:reply_delegator/3' is
%% mocked to inject failures.
%%
%% Every invocation of the mocked delegator bumps a counter kept in
%% persistent_term. While the counter has not exceeded `error_attempts' (taken
%% from TestContext), the real delegator is called with `Error' substituted for
%% the actual result; afterwards the actual result is passed through. When
%% `error_attempts' is the atom `infinity' the error is always injected, since
%% any integer compares lower than an atom in Erlang term order.
%%
%% `Get' is a zero-arity fun returning the current counter value.
do_t_async_retries(TestContext, Error, Fn) ->
    #{error_attempts := ErrorAttempts} = TestContext,
    CounterKey = {?MODULE, ?FUNCTION_NAME, attempts},
    persistent_term:put(CounterKey, 0),
    on_exit(fun() -> persistent_term:erase(CounterKey) end),
    Get = fun() -> persistent_term:get(CounterKey) end,
    Bump = fun() ->
        Next = persistent_term:get(CounterKey) + 1,
        persistent_term:put(CounterKey, Next),
        Next
    end,
    MockedDelegator = fun(Context, ReplyFunAndArgs, Result) ->
        Attempts = Bump(),
        {LogFormat, Outcome} =
            case Attempts =< ErrorAttempts of
                true -> {"failing ~p : ~p", Error};
                false -> {"succeeding ~p : ~p", Result}
            end,
        ct:pal(LogFormat, [Error, Attempts]),
        meck:passthrough([Context, ReplyFunAndArgs, Outcome])
    end,
    emqx_common_test_helpers:with_mock(
        emqx_connector_http,
        reply_delegator,
        MockedDelegator,
        fun() -> Fn(Get, Error) end
    ),
    ok.

receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 ->
ok;
receive_request_notifications(MessageIDs, ResponseDelay) ->
Expand Down
68 changes: 57 additions & 11 deletions apps/emqx_connector/src/emqx_connector_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
on_query/3,
on_query_async/4,
on_get_status/2,
reply_delegator/2
reply_delegator/3
]).

-type url() :: emqx_http_lib:uri_map().
Expand Down Expand Up @@ -267,10 +267,11 @@ on_query(InstId, {send_message, Msg}, State) ->
request_timeout := Timeout
} = process_request(Request, Msg),
%% bridge buffer worker has retry, do not let ehttpc retry
Retry = 0,
Retry = 2,
ClientId = maps:get(clientid, Msg, undefined),
on_query(
InstId,
{undefined, Method, {Path, Headers, Body}, Timeout, Retry},
{ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
State
)
end;
Expand Down Expand Up @@ -370,9 +371,10 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
headers := Headers,
request_timeout := Timeout
} = process_request(Request, Msg),
ClientId = maps:get(clientid, Msg, undefined),
on_query_async(
InstId,
{undefined, Method, {Path, Headers, Body}, Timeout},
{ClientId, Method, {Path, Headers, Body}, Timeout},
ReplyFunAndArgs,
State
)
Expand All @@ -394,12 +396,22 @@ on_query_async(
}
),
NRequest = formalize_request(Method, BasePath, Request),
MaxAttempts = maps:get(max_attempts, State, 3),
Context = #{
attempt => 1,
max_attempts => MaxAttempts,
state => State,
key_or_num => KeyOrNum,
method => Method,
request => NRequest,
timeout => Timeout
},
ok = ehttpc:request_async(
Worker,
Method,
NRequest,
Timeout,
{fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
{fun ?MODULE:reply_delegator/3, [Context, ReplyFunAndArgs]}
),
{ok, Worker}.

Expand Down Expand Up @@ -632,7 +644,10 @@ to_bin(Str) when is_list(Str) ->
to_bin(Atom) when is_atom(Atom) ->
atom_to_binary(Atom, utf8).

reply_delegator(ReplyFunAndArgs, Result) ->
%% Reply callback registered with ehttpc:request_async/5.
%%
%% Context is the retry bookkeeping map built in on_query_async/4 (attempt
%% counters plus everything needed to re-issue the request), ReplyFunAndArgs is
%% the caller's reply callback and Result is the outcome of the HTTP request.
%%
%% The result is not handled in the calling process: a fresh, unlinked process
%% is spawned to run maybe_retry/3. Returns the pid of that process.
reply_delegator(Context, ReplyFunAndArgs, Result) ->
spawn(fun() -> maybe_retry(Result, Context, ReplyFunAndArgs) end).

transform_result(Result) ->
case Result of
%% The normal reason happens when the HTTP connection times out before
%% the request has been fully processed
Expand All @@ -643,16 +658,47 @@ reply_delegator(ReplyFunAndArgs, Result) ->
Reason =:= {shutdown, normal};
Reason =:= {shutdown, closed}
->
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
{error, {recoverable_error, Reason}};
{error, {closed, _Message} = Reason} ->
%% _Message = "The connection was lost."
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
{error, {recoverable_error, Reason}};
_ ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
Result
end.

%% Decides whether the outcome of an async HTTP request is reported to the
%% caller or the request is issued again.
%%
%% * Once the attempt counter has reached `max_attempts', the result (success
%%   or error) is passed through transform_result/1 and handed to the reply
%%   callback.
%% * Any `{error, Reason}' before that point re-sends the same request through
%%   ehttpc with this module's reply_delegator/3 as the callback. The reasons
%%   `normal' and `{shutdown, normal}' are "free" retries: they leave the
%%   attempt counter untouched. Every other reason increments it.
%%   NOTE(review): free retries are therefore not bounded by `max_attempts'.
%% * Any other result is handed to the reply callback unchanged.
maybe_retry(Result, #{attempt := Attempt, max_attempts := MaxAttempts}, ReplyFunAndArgs) when
    Attempt >= MaxAttempts
->
    emqx_resource:apply_reply_fun(ReplyFunAndArgs, transform_result(Result));
maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
    #{
        state := State,
        attempt := Attempt,
        key_or_num := KeyOrNum,
        method := Method,
        request := Request,
        timeout := Timeout
    } = Context,
    %% TODO: reset the expiration time for free retries?
    NextContext =
        if
            Reason =:= normal; Reason =:= {shutdown, normal} ->
                %% Free retry: does not consume an attempt.
                Context;
            true ->
                Context#{attempt := Attempt + 1}
        end,
    ok = ehttpc:request_async(
        resolve_pool_worker(State, KeyOrNum),
        Method,
        Request,
        Timeout,
        {fun ?MODULE:reply_delegator/3, [NextContext, ReplyFunAndArgs]}
    ),
    ok;
maybe_retry(Result, _Context, ReplyFunAndArgs) ->
    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).

%% The HOCON schema system may generate sensitive keys with this format
is_sensitive_key([{str, StringKey}]) ->
is_sensitive_key(StringKey);
Expand Down
3 changes: 3 additions & 0 deletions changes/ce/perf-10690.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added a retry mechanism to the webhook bridge that attempts to improve throughput.

This optimization retries failed requests without blocking the buffering layer, which can improve throughput under high message rates.
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ defmodule EMQXUmbrella.MixProject do
{:erlcloud, github: "emqx/erlcloud", tag: "3.5.16-emqx-1", override: true},
# erlcloud's rebar.config requires rebar3 and does not support Mix,
# so it tries to fetch deps from git. We need to override this.
{:lhttpc, tag: "1.6.2", override: true},
{:lhttpc, "1.6.2", override: true},
{:eini, "1.2.9", override: true},
{:base16, "1.0.0", override: true},
# end of erlcloud's deps
Expand Down