Skip to content

Commit

Permalink
Merge pull request #10690 from thalesmg/perf-webhook-retry-async-repl…
Browse files Browse the repository at this point in the history
…y-v50

perf(webhook): add async retries and evaluate reply callback in fresh process
  • Loading branch information
thalesmg committed May 17, 2023
2 parents 397dce0 + bc7d0d5 commit b2afe4e
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 16 deletions.
110 changes: 106 additions & 4 deletions apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
-compile(export_all).

-import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-import(emqx_common_test_helpers, [on_exit/1]).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
Expand Down Expand Up @@ -52,6 +53,13 @@ end_per_suite(_Config) ->
suite() ->
[{timetrap, {seconds, 60}}].

init_per_testcase(_TestCase, Config) ->
Config.

end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(),
ok.

%%------------------------------------------------------------------------------
%% HTTP server for testing
%% (Orginally copied from emqx_bridge_api_SUITE)
Expand Down Expand Up @@ -158,7 +166,8 @@ bridge_async_config(#{port := Port} = Config) ->
QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000),
ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"),
ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"),
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
Expand All @@ -177,7 +186,8 @@ bridge_async_config(#{port := Port} = Config) ->
" health_check_interval = \"15s\"\n"
" max_buffer_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n"
" request_timeout = \"~s\"\n"
" request_timeout = \"~p\"\n"
" resume_interval = \"~s\"\n"
" start_after_created = \"true\"\n"
" start_timeout = \"5s\"\n"
" worker_pool_size = \"1\"\n"
Expand All @@ -194,7 +204,8 @@ bridge_async_config(#{port := Port} = Config) ->
PoolSize,
RequestTimeout,
QueryMode,
ResourceRequestTimeout
ResourceRequestTimeout,
ResumeInterval
]
),
ct:pal(ConfigString),
Expand Down Expand Up @@ -236,7 +247,7 @@ t_send_async_connection_timeout(_Config) ->
query_mode => "async",
connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000,
resouce_request_timeout => "infinity"
resource_request_timeout => "infinity"
}),
NumberOfMessagesToSend = 10,
[
Expand All @@ -250,6 +261,97 @@ t_send_async_connection_timeout(_Config) ->
stop_http_server(Server),
ok.

t_async_free_retries(_Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}),
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "sync",
connect_timeout => 1_000,
request_timeout => 10_000,
resource_request_timeout => "10000s"
}),
%% Fail 5 times then succeed.
Context = #{error_attempts => 5},
ExpectedAttempts = 6,
Fn = fun(Get, Error) ->
?assertMatch(
{ok, 200, _, _},
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
#{error => Error}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
end,
do_t_async_retries(Context, {error, normal}, Fn),
do_t_async_retries(Context, {error, {shutdown, normal}}, Fn),
ok.

t_async_common_retries(_Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}),
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "sync",
resume_interval => "100ms",
connect_timeout => 1_000,
request_timeout => 10_000,
resource_request_timeout => "10000s"
}),
%% Keeps failing until connector gives up.
Context = #{error_attempts => infinity},
ExpectedAttempts = 3,
FnSucceed = fun(Get, Error) ->
?assertMatch(
{ok, 200, _, _},
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
end,
FnFail = fun(Get, Error) ->
?assertMatch(
Error,
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
end,
%% These two succeed because they're further retried by the buffer
%% worker synchronously, and we're not mock that call.
do_t_async_retries(Context, {error, {closed, "The connection was lost."}}, FnSucceed),
do_t_async_retries(Context, {error, {shutdown, closed}}, FnSucceed),
%% This fails because this error is treated as unrecoverable.
do_t_async_retries(Context, {error, something_else}, FnFail),
ok.

do_t_async_retries(TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext,
persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0),
on_exit(fun() -> persistent_term:erase({?MODULE, ?FUNCTION_NAME, attempts}) end),
Get = fun() -> persistent_term:get({?MODULE, ?FUNCTION_NAME, attempts}) end,
GetAndBump = fun() ->
Attempts = persistent_term:get({?MODULE, ?FUNCTION_NAME, attempts}),
persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, Attempts + 1),
Attempts + 1
end,
emqx_common_test_helpers:with_mock(
emqx_connector_http,
reply_delegator,
fun(Context, ReplyFunAndArgs, Result) ->
Attempts = GetAndBump(),
case Attempts > ErrorAttempts of
true ->
ct:pal("succeeding ~p : ~p", [Error, Attempts]),
meck:passthrough([Context, ReplyFunAndArgs, Result]);
false ->
ct:pal("failing ~p : ~p", [Error, Attempts]),
meck:passthrough([Context, ReplyFunAndArgs, Error])
end
end,
fun() -> Fn(Get, Error) end
),
ok.

receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 ->
ok;
receive_request_notifications(MessageIDs, ResponseDelay) ->
Expand Down
68 changes: 57 additions & 11 deletions apps/emqx_connector/src/emqx_connector_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
on_query/3,
on_query_async/4,
on_get_status/2,
reply_delegator/2
reply_delegator/3
]).

-export([
Expand Down Expand Up @@ -245,10 +245,11 @@ on_query(InstId, {send_message, Msg}, State) ->
request_timeout := Timeout
} = process_request(Request, Msg),
%% bridge buffer worker has retry, do not let ehttpc retry
Retry = 0,
Retry = 2,
ClientId = maps:get(clientid, Msg, undefined),
on_query(
InstId,
{undefined, Method, {Path, Headers, Body}, Timeout, Retry},
{ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
State
)
end;
Expand Down Expand Up @@ -348,9 +349,10 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
headers := Headers,
request_timeout := Timeout
} = process_request(Request, Msg),
ClientId = maps:get(clientid, Msg, undefined),
on_query_async(
InstId,
{undefined, Method, {Path, Headers, Body}, Timeout},
{ClientId, Method, {Path, Headers, Body}, Timeout},
ReplyFunAndArgs,
State
)
Expand All @@ -372,12 +374,22 @@ on_query_async(
}
),
NRequest = formalize_request(Method, BasePath, Request),
MaxAttempts = maps:get(max_attempts, State, 3),
Context = #{
attempt => 1,
max_attempts => MaxAttempts,
state => State,
key_or_num => KeyOrNum,
method => Method,
request => NRequest,
timeout => Timeout
},
ok = ehttpc:request_async(
Worker,
Method,
NRequest,
Timeout,
{fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
{fun ?MODULE:reply_delegator/3, [Context, ReplyFunAndArgs]}
),
{ok, Worker}.

Expand Down Expand Up @@ -598,7 +610,10 @@ to_bin(Str) when is_list(Str) ->
to_bin(Atom) when is_atom(Atom) ->
atom_to_binary(Atom, utf8).

reply_delegator(ReplyFunAndArgs, Result) ->
reply_delegator(Context, ReplyFunAndArgs, Result) ->
spawn(fun() -> maybe_retry(Result, Context, ReplyFunAndArgs) end).

transform_result(Result) ->
case Result of
%% The normal reason happens when the HTTP connection times out before
%% the request has been fully processed
Expand All @@ -609,16 +624,47 @@ reply_delegator(ReplyFunAndArgs, Result) ->
Reason =:= {shutdown, normal};
Reason =:= {shutdown, closed}
->
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
{error, {recoverable_error, Reason}};
{error, {closed, _Message} = Reason} ->
%% _Message = "The connection was lost."
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
{error, {recoverable_error, Reason}};
_ ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
Result
end.

maybe_retry(Result0, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when
N >= Max
->
Result = transform_result(Result0),
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
#{
state := State,
attempt := Attempt,
key_or_num := KeyOrNum,
method := Method,
request := Request,
timeout := Timeout
} = Context,
%% TODO: reset the expiration time for free retries?
IsFreeRetry = Reason =:= normal orelse Reason =:= {shutdown, normal},
NContext =
case IsFreeRetry of
true -> Context;
false -> Context#{attempt := Attempt + 1}
end,
Worker = resolve_pool_worker(State, KeyOrNum),
ok = ehttpc:request_async(
Worker,
Method,
Request,
Timeout,
{fun ?MODULE:reply_delegator/3, [NContext, ReplyFunAndArgs]}
),
ok;
maybe_retry(Result, _Context, ReplyFunAndArgs) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).

%% The HOCON schema system may generate sensitive keys with this format
is_sensitive_key([{str, StringKey}]) ->
is_sensitive_key(StringKey);
Expand Down
3 changes: 3 additions & 0 deletions changes/ce/perf-10690.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added a retry mechanism to webhook bridge that attempts to improve throughput.

This optimization retries request failures without blocking the buffering layer, which can improve throughput in situations of high messaging rate.
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ defmodule EMQXUmbrella.MixProject do
{:erlcloud, github: "emqx/erlcloud", tag: "3.5.16-emqx-1", override: true},
# erlcloud's rebar.config requires rebar3 and does not support Mix,
# so it tries to fetch deps from git. We need to override this.
{:lhttpc, tag: "1.6.2", override: true},
{:lhttpc, "1.6.2", override: true},
{:eini, "1.2.9", override: true},
{:base16, "1.0.0", override: true},
# end of erlcloud's deps
Expand Down

0 comments on commit b2afe4e

Please sign in to comment.