Skip to content

Commit

Permalink
Merge pull request #12506 from keynslug/fix/EMQX-11830/recoverable
Browse files Browse the repository at this point in the history
fix(s3-bridge): handle recoverable AWS errors
  • Loading branch information
keynslug committed Feb 13, 2024
2 parents edd28be + a4eac75 commit 6fbb6f6
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 5 deletions.
4 changes: 3 additions & 1 deletion apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ run_simple_upload(

map_error({socket_error, _} = Reason) ->
{recoverable_error, Reason};
map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->
%% https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
{recoverable_error, Reason};
map_error(Reason) ->
%% TODO: Recoverable errors.
{unrecoverable_error, Reason}.

render_bucket(Template, Data) ->
Expand Down
39 changes: 35 additions & 4 deletions apps/emqx_bridge_s3/test/emqx_bridge_s3_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/test_macros.hrl").

-import(emqx_utils_conv, [bin/1]).

Expand Down Expand Up @@ -89,8 +90,8 @@ connector_config(Name, _Config) ->
<<"headers">> => #{
<<"content-type">> => <<?CONTENT_TYPE>>
},
<<"connect_timeout">> => 1000,
<<"request_timeout">> => 1000,
<<"connect_timeout">> => <<"500ms">>,
<<"request_timeout">> => <<"1s">>,
<<"pool_size">> => 4,
<<"max_retries">> => 0,
<<"enable_pipelining">> => 1
Expand All @@ -110,13 +111,13 @@ action_config(Name, ConnectorId) ->
<<"resource_opts">> => #{
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"5s">>,
<<"health_check_interval">> => <<"3s">>,
<<"inflight_window">> => 40,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"60s">>,
<<"resume_interval">> => <<"5s">>,
<<"resume_interval">> => <<"3s">>,
<<"worker_pool_size">> => <<"4">>
}
}).
Expand Down Expand Up @@ -165,6 +166,36 @@ t_sync_query(Config) ->
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
).

t_query_retry_recoverable(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
BridgeName = ?config(bridge_name, Config),
Bucket = emqx_s3_test_helpers:unique_bucket(),
Topic = "d/e/f",
Payload = rand:bytes(1024),
AwsConfig = emqx_s3_test_helpers:aws_config(tcp),
ok = erlcloud_s3:create_bucket(Bucket, AwsConfig),
%% Create a bridge with the sample configuration.
?assertMatch(
{ok, _Bridge},
emqx_bridge_v2_testlib:create_bridge(Config)
),
%% Simulate recoverable failure.
_ = emqx_common_test_helpers:enable_failure(timeout, ?PROXY_NAME, ProxyHost, ProxyPort),
_ = timer:apply_after(
_Timeout = 5000,
emqx_common_test_helpers,
heal_failure,
[timeout, ?PROXY_NAME, ProxyHost, ProxyPort]
),
Message = mk_message(Bucket, Topic, Payload),
%% Verify that the message is sent eventually.
ok = emqx_bridge_v2:send_message(?BRIDGE_TYPE, BridgeName, Message, #{}),
?assertMatch(
#{content := Payload},
maps:from_list(erlcloud_s3:get_object(Bucket, Topic, AwsConfig))
).

mk_message(ClientId, Topic, Payload) ->
Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
{Event, _} = emqx_rule_events:eventmsg_publish(Message),
Expand Down

0 comments on commit 6fbb6f6

Please sign in to comment.