Skip to content

Commit

Permalink
Merge pull request #10910 from thalesmg/unify-restart-interval-v50
Browse files Browse the repository at this point in the history
feat(resource): deprecate `auto_restart_interval` in favor of `health_check_interval`
  • Loading branch information
thalesmg committed Jun 2, 2023
2 parents 90d862c + 940353c commit 33aa879
Show file tree
Hide file tree
Showing 49 changed files with 101 additions and 245 deletions.
4 changes: 2 additions & 2 deletions apps/emqx_bridge/src/emqx_bridge.erl
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,9 @@ send_message(BridgeType, BridgeName, ResId, Message) ->
end.

query_opts(Config) ->
case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of
case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of
Timeout when is_integer(Timeout) orelse Timeout =:= infinity ->
%% request_timeout is configured
%% request_ttl is configured
#{timeout => Timeout};
_ ->
%% emqx_resource has a default value (15s)
Expand Down
2 changes: 0 additions & 2 deletions apps/emqx_bridge/src/emqx_bridge_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ info_example_basic(webhook) ->
resource_opts => #{
worker_pool_size => 1,
health_check_interval => 15000,
auto_restart_interval => 15000,
query_mode => async,
inflight_window => 100,
max_buffer_bytes => 100 * 1024 * 1024
Expand All @@ -244,7 +243,6 @@ mqtt_main_example() ->
max_inflight => 100,
resource_opts => #{
health_check_interval => <<"15s">>,
auto_restart_interval => <<"60s">>,
query_mode => sync,
max_buffer_bytes => 100 * 1024 * 1024
},
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_bridge/src/emqx_bridge_resource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ parse_confs(
Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end,
RequestTimeout = emqx_utils_maps:deep_get(
[resource_opts, request_timeout],
RequestTTL = emqx_utils_maps:deep_get(
[resource_opts, request_ttl],
Conf
),
Conf#{
Expand All @@ -339,7 +339,7 @@ parse_confs(
method => Method,
body => maps:get(body, Conf, undefined),
headers => Headers,
request_timeout => RequestTimeout,
request_ttl => RequestTTL,
max_retries => Retry
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ default_ssl() ->
default_resource_opts() ->
#{
<<"inflight_window">> => 100,
<<"auto_restart_interval">> => <<"60s">>,
<<"health_check_interval">> => <<"15s">>,
<<"max_buffer_bytes">> => <<"1GB">>,
<<"query_mode">> => <<"sync">>,
Expand Down
98 changes: 7 additions & 91 deletions apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ groups() ->
SingleOnlyTests = [
t_broken_bpapi_vsn,
t_old_bpapi_vsn,
t_bridges_probe,
t_auto_restart_interval
t_bridges_probe
],
ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
[
Expand Down Expand Up @@ -559,89 +558,6 @@ t_http_crud_apis(Config) ->

{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config).

t_auto_restart_interval(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),

meck:new(emqx_resource, [passthrough]),
meck:expect(emqx_resource, call_start, fun(_, _, _) -> {error, fake_error} end),

%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = ?BRIDGE_NAME,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
BridgeParams = ?HTTP_BRIDGE(URL1, Name)#{
<<"resource_opts">> => #{<<"auto_restart_interval">> => "1s"}
},
?check_trace(
begin
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
}},
request_json(
post,
uri(["bridges"]),
BridgeParams,
Config
)
),
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
{ok, _} = ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500)
end,
fun(Trace0) ->
Trace = ?of_kind(resource_auto_reconnect, Trace0),
?assertMatch([#{}], Trace),
ok
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),

%% auto_retry_interval=infinity
BridgeParams1 = BridgeParams#{
<<"resource_opts">> => #{<<"auto_restart_interval">> => "infinity"}
},
?check_trace(
begin
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
}},
request_json(
post,
uri(["bridges"]),
BridgeParams1,
Config
)
),
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
?assertEqual(timeout, ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500))
end,
fun(Trace0) ->
Trace = ?of_kind(resource_auto_reconnect, Trace0),
?assertMatch([], Trace),
ok
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
meck:unload(emqx_resource).

t_http_bridges_local_topic(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
Expand Down Expand Up @@ -1384,7 +1300,7 @@ t_metrics(Config) ->
),
ok.

%% request_timeout in bridge root should match request_timeout in
%% request_timeout in bridge root should match request_ttl in
%% resource_opts.
t_inconsistent_webhook_request_timeouts(Config) ->
Port = ?config(port, Config),
Expand All @@ -1395,7 +1311,7 @@ t_inconsistent_webhook_request_timeouts(Config) ->
?HTTP_BRIDGE(URL1, Name),
#{
<<"request_timeout">> => <<"1s">>,
<<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
<<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>}
}
),
%% root request_timeout is deprecated for bridge.
Expand All @@ -1410,8 +1326,8 @@ t_inconsistent_webhook_request_timeouts(Config) ->
Config
),
?assertNot(maps:is_key(<<"request_timeout">>, Response)),
?assertMatch(#{<<"request_timeout">> := <<"2s">>}, ResourceOpts),
validate_resource_request_timeout(proplists:get_value(group, Config), 2000, Name),
?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts),
validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name),
ok.

t_cluster_later_join_metrics(Config) ->
Expand Down Expand Up @@ -1452,7 +1368,7 @@ t_cluster_later_join_metrics(Config) ->
),
ok.

validate_resource_request_timeout(single, Timeout, Name) ->
validate_resource_request_ttl(single, Timeout, Name) ->
SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
Expand All @@ -1472,7 +1388,7 @@ validate_resource_request_timeout(single, Timeout, Name) ->
ok
end
);
validate_resource_request_timeout(_Cluster, _Timeout, _Name) ->
validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
ignore.

%%
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ webhook_config_test() ->
}
}
} = check(Conf3),
?assertMatch(#{<<"request_timeout">> := infinity}, ResourceOpts),
?assertMatch(#{<<"request_ttl">> := infinity}, ResourceOpts),
ok.

up(#{<<"bridges">> := Bridges0} = Conf0) ->
Expand Down
13 changes: 6 additions & 7 deletions apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ bridge_async_config(#{port := Port} = Config) ->
ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000),
ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"),
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
Expand All @@ -182,11 +182,10 @@ bridge_async_config(#{port := Port} = Config) ->
" body = \"${id}\""
" resource_opts {\n"
" inflight_window = 100\n"
" auto_restart_interval = \"60s\"\n"
" health_check_interval = \"15s\"\n"
" max_buffer_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n"
" request_timeout = \"~p\"\n"
" request_ttl = \"~p\"\n"
" resume_interval = \"~s\"\n"
" start_after_created = \"true\"\n"
" start_timeout = \"5s\"\n"
Expand All @@ -204,7 +203,7 @@ bridge_async_config(#{port := Port} = Config) ->
PoolSize,
RequestTimeout,
QueryMode,
ResourceRequestTimeout,
ResourceRequestTTL,
ResumeInterval
]
),
Expand Down Expand Up @@ -247,7 +246,7 @@ t_send_async_connection_timeout(_Config) ->
query_mode => "async",
connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000,
resource_request_timeout => "infinity"
resource_request_ttl => "infinity"
}),
NumberOfMessagesToSend = 10,
[
Expand All @@ -269,7 +268,7 @@ t_async_free_retries(_Config) ->
query_mode => "sync",
connect_timeout => 1_000,
request_timeout => 10_000,
resource_request_timeout => "10000s"
resource_request_ttl => "10000s"
}),
%% Fail 5 times then succeed.
Context = #{error_attempts => 5},
Expand All @@ -295,7 +294,7 @@ t_async_common_retries(_Config) ->
resume_interval => "100ms",
connect_timeout => 1_000,
request_timeout => 10_000,
resource_request_timeout => "10000s"
resource_request_ttl => "10000s"
}),
%% Keeps failing until connector gives up.
Context = #{error_attempts => infinity},
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ values(_Method, Type) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ cassa_config(BridgeType, Config) ->
" password = ~p\n"
" cql = ~p\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"
Expand Down Expand Up @@ -511,7 +511,6 @@ t_write_failure(Config) ->
#{
<<"resource_opts">> =>
#{
<<"auto_restart_interval">> => <<"100ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}
Expand Down Expand Up @@ -636,7 +635,7 @@ t_bad_sql_parameter(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"request_timeout">> => 500,
<<"request_ttl">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
}
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ values(_Method, Type) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ values(_Method) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync,
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ dynamo_config(BridgeType, Config) ->
" aws_access_key_id = ~p\n"
" aws_secret_access_key = ~p\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,
Expand Down

0 comments on commit 33aa879

Please sign in to comment.