Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(resource): deprecate auto_restart_interval in favor of health_check_interval #10910

Merged
merged 6 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/emqx_bridge/src/emqx_bridge.erl
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,9 @@ send_message(BridgeType, BridgeName, ResId, Message) ->
end.

query_opts(Config) ->
case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of
case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of
Timeout when is_integer(Timeout) orelse Timeout =:= infinity ->
%% request_timeout is configured
%% request_ttl is configured
#{timeout => Timeout};
_ ->
%% emqx_resource has a default value (15s)
Expand Down
2 changes: 0 additions & 2 deletions apps/emqx_bridge/src/emqx_bridge_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ info_example_basic(webhook) ->
resource_opts => #{
worker_pool_size => 1,
health_check_interval => 15000,
auto_restart_interval => 15000,
query_mode => async,
inflight_window => 100,
max_buffer_bytes => 100 * 1024 * 1024
Expand All @@ -244,7 +243,6 @@ mqtt_main_example() ->
max_inflight => 100,
resource_opts => #{
health_check_interval => <<"15s">>,
auto_restart_interval => <<"60s">>,
query_mode => sync,
max_buffer_bytes => 100 * 1024 * 1024
},
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_bridge/src/emqx_bridge_resource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ parse_confs(
Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end,
RequestTimeout = emqx_utils_maps:deep_get(
[resource_opts, request_timeout],
RequestTTL = emqx_utils_maps:deep_get(
[resource_opts, request_ttl],
Conf
),
Conf#{
Expand All @@ -339,7 +339,7 @@ parse_confs(
method => Method,
body => maps:get(body, Conf, undefined),
headers => Headers,
request_timeout => RequestTimeout,
request_ttl => RequestTTL,
max_retries => Retry
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ default_ssl() ->
default_resource_opts() ->
#{
<<"inflight_window">> => 100,
<<"auto_restart_interval">> => <<"60s">>,
<<"health_check_interval">> => <<"15s">>,
<<"max_buffer_bytes">> => <<"1GB">>,
<<"query_mode">> => <<"sync">>,
Expand Down
98 changes: 7 additions & 91 deletions apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ groups() ->
SingleOnlyTests = [
t_broken_bpapi_vsn,
t_old_bpapi_vsn,
t_bridges_probe,
t_auto_restart_interval
t_bridges_probe
],
ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
[
Expand Down Expand Up @@ -559,89 +558,6 @@ t_http_crud_apis(Config) ->

{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config).

t_auto_restart_interval(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),

meck:new(emqx_resource, [passthrough]),
meck:expect(emqx_resource, call_start, fun(_, _, _) -> {error, fake_error} end),

%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = ?BRIDGE_NAME,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
BridgeParams = ?HTTP_BRIDGE(URL1, Name)#{
<<"resource_opts">> => #{<<"auto_restart_interval">> => "1s"}
},
?check_trace(
begin
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
}},
request_json(
post,
uri(["bridges"]),
BridgeParams,
Config
)
),
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
{ok, _} = ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500)
end,
fun(Trace0) ->
Trace = ?of_kind(resource_auto_reconnect, Trace0),
?assertMatch([#{}], Trace),
ok
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),

%% auto_retry_interval=infinity
BridgeParams1 = BridgeParams#{
<<"resource_opts">> => #{<<"auto_restart_interval">> => "infinity"}
},
?check_trace(
begin
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
}},
request_json(
post,
uri(["bridges"]),
BridgeParams1,
Config
)
),
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
?assertEqual(timeout, ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500))
end,
fun(Trace0) ->
Trace = ?of_kind(resource_auto_reconnect, Trace0),
?assertMatch([], Trace),
ok
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
meck:unload(emqx_resource).

t_http_bridges_local_topic(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
Expand Down Expand Up @@ -1384,7 +1300,7 @@ t_metrics(Config) ->
),
ok.

%% request_timeout in bridge root should match request_timeout in
%% request_timeout in bridge root should match request_ttl in
%% resource_opts.
t_inconsistent_webhook_request_timeouts(Config) ->
Port = ?config(port, Config),
Expand All @@ -1395,7 +1311,7 @@ t_inconsistent_webhook_request_timeouts(Config) ->
?HTTP_BRIDGE(URL1, Name),
#{
<<"request_timeout">> => <<"1s">>,
<<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
<<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>}
}
),
%% root request_timeout is deprecated for bridge.
Expand All @@ -1410,8 +1326,8 @@ t_inconsistent_webhook_request_timeouts(Config) ->
Config
),
?assertNot(maps:is_key(<<"request_timeout">>, Response)),
?assertMatch(#{<<"request_timeout">> := <<"2s">>}, ResourceOpts),
validate_resource_request_timeout(proplists:get_value(group, Config), 2000, Name),
?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts),
validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name),
ok.

t_cluster_later_join_metrics(Config) ->
Expand Down Expand Up @@ -1452,7 +1368,7 @@ t_cluster_later_join_metrics(Config) ->
),
ok.

validate_resource_request_timeout(single, Timeout, Name) ->
validate_resource_request_ttl(single, Timeout, Name) ->
SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
Expand All @@ -1472,7 +1388,7 @@ validate_resource_request_timeout(single, Timeout, Name) ->
ok
end
);
validate_resource_request_timeout(_Cluster, _Timeout, _Name) ->
validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
ignore.

%%
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ webhook_config_test() ->
}
}
} = check(Conf3),
?assertMatch(#{<<"request_timeout">> := infinity}, ResourceOpts),
?assertMatch(#{<<"request_ttl">> := infinity}, ResourceOpts),
ok.

up(#{<<"bridges">> := Bridges0} = Conf0) ->
Expand Down
13 changes: 6 additions & 7 deletions apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ bridge_async_config(#{port := Port} = Config) ->
ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000),
ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"),
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
Expand All @@ -182,11 +182,10 @@ bridge_async_config(#{port := Port} = Config) ->
" body = \"${id}\""
" resource_opts {\n"
" inflight_window = 100\n"
" auto_restart_interval = \"60s\"\n"
" health_check_interval = \"15s\"\n"
" max_buffer_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n"
" request_timeout = \"~p\"\n"
" request_ttl = \"~p\"\n"
" resume_interval = \"~s\"\n"
" start_after_created = \"true\"\n"
" start_timeout = \"5s\"\n"
Expand All @@ -204,7 +203,7 @@ bridge_async_config(#{port := Port} = Config) ->
PoolSize,
RequestTimeout,
QueryMode,
ResourceRequestTimeout,
ResourceRequestTTL,
ResumeInterval
]
),
Expand Down Expand Up @@ -247,7 +246,7 @@ t_send_async_connection_timeout(_Config) ->
query_mode => "async",
connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000,
resource_request_timeout => "infinity"
resource_request_ttl => "infinity"
}),
NumberOfMessagesToSend = 10,
[
Expand All @@ -269,7 +268,7 @@ t_async_free_retries(_Config) ->
query_mode => "sync",
connect_timeout => 1_000,
request_timeout => 10_000,
resource_request_timeout => "10000s"
resource_request_ttl => "10000s"
}),
%% Fail 5 times then succeed.
Context = #{error_attempts => 5},
Expand All @@ -295,7 +294,7 @@ t_async_common_retries(_Config) ->
resume_interval => "100ms",
connect_timeout => 1_000,
request_timeout => 10_000,
resource_request_timeout => "10000s"
resource_request_ttl => "10000s"
}),
%% Keeps failing until connector gives up.
Context = #{error_attempts => infinity},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_bridge_cassandra, [
{description, "EMQX Enterprise Cassandra Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib, ecql]},
{env, []},
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ values(_Method, Type) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ cassa_config(BridgeType, Config) ->
" password = ~p\n"
" cql = ~p\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"
Expand Down Expand Up @@ -511,7 +511,6 @@ t_write_failure(Config) ->
#{
<<"resource_opts">> =>
#{
<<"auto_restart_interval">> => <<"100ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}
Expand Down Expand Up @@ -636,7 +635,7 @@ t_bad_sql_parameter(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"request_timeout">> => 500,
<<"request_ttl">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_bridge_clickhouse, [
{description, "EMQX Enterprise ClickHouse Bridge"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [kernel, stdlib, clickhouse, emqx_resource]},
{env, []},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ values(_Method, Type) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_bridge_dynamo, [
{description, "EMQX Enterprise Dynamo Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib, erlcloud]},
{env, []},
Expand Down
1 change: 0 additions & 1 deletion apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ values(_Method) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ dynamo_config(BridgeType, Config) ->
" aws_access_key_id = ~p\n"
" aws_secret_access_key = ~p\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,
Expand Down