From 13746c2cdf94b7f1e1e4f10a3f1cd60aa13c173d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 19 Jun 2023 17:56:06 -0300 Subject: [PATCH] fix(resource): check status when (re)starting a resource Fixes https://emqx.atlassian.net/browse/EMQX-10290 --- .../src/emqx_resource_manager.erl | 3 +- .../test/emqx_connector_demo.erl | 3 ++ .../test/emqx_resource_SUITE.erl | 28 +++++++++++++++++-- changes/ce/fix-11094.en.md | 1 + 4 files changed, 31 insertions(+), 4 deletions(-) create mode 100644 changes/ce/fix-11094.en.md diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c1adb8ecd9..2e4822a2f6 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -223,8 +223,7 @@ restart(ResId, Opts) when is_binary(ResId) -> start(ResId, Opts) -> case safe_call(ResId, start, ?T_OPERATION) of ok -> - _ = wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)), - ok; + wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)); {error, _Reason} = Error -> Error end. diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 96e22c6b68..b95d8c8bff 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -246,6 +246,9 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid} on_get_status(_InstId, #{health_check_error := true}) -> ?tp(connector_demo_health_check_error, #{}), disconnected; +on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) -> + ?tp(connector_demo_health_check_error, #{}), + {disconnected, State, Message}; on_get_status(_InstId, #{pid := Pid}) -> timer:sleep(300), case is_process_alive(Pid) of diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 5883614aa7..934a978298 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -40,6 +40,7 @@ groups() -> init_per_testcase(_, Config) -> ct:timetrap({seconds, 30}), emqx_connector_demo:set_callback_mode(always_sync), + snabbkaffe:start_trace(), Config. end_per_testcase(_, _Config) -> @@ -1145,10 +1146,33 @@ t_auto_retry(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, create_error => true}, - #{auto_retry_interval => 100} + #{health_check_interval => 100} ), ?assertEqual(ok, Res). +%% tests resources that have an asynchronous start: they are created +%% without problems, but later some issue is found when calling the +%% health check. +t_start_throw_error(_Config) -> + Message = "something went wrong", + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, health_check_error => {msg, Message}}, + #{health_check_interval => 100} + ), + #{?snk_kind := connector_demo_health_check_error}, + 1_000 + ) + ), + %% Now, if we try to "reconnect" (restart) it, we should get the error + ?assertMatch({error, Message}, emqx_resource:start(?ID, _Opts = #{})), + ok. + t_health_check_disconnected(_) -> ?check_trace( begin @@ -1157,7 +1181,7 @@ t_health_check_disconnected(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, create_error => true}, - #{auto_retry_interval => 100} + #{health_check_interval => 100} ), ?assertEqual( {ok, disconnected}, diff --git a/changes/ce/fix-11094.en.md b/changes/ce/fix-11094.en.md new file mode 100644 index 0000000000..e73a8635f6 --- /dev/null +++ b/changes/ce/fix-11094.en.md @@ -0,0 +1 @@ +Fixed an issue where connection errors in Kafka Producer would not be reported when reconnecting the bridge.