Skip to content

Commit

Permalink
Merge pull request #11094 from thalesmg/fix-check-health-on-start-master
Browse files Browse the repository at this point in the history
fix(resource): check status when (re)starting a resource
  • Loading branch information
thalesmg committed Jun 20, 2023
2 parents b990db6 + 13746c2 commit 9c5d4f1
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 4 deletions.
3 changes: 1 addition & 2 deletions apps/emqx_resource/src/emqx_resource_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
start(ResId, Opts) ->
case safe_call(ResId, start, ?T_OPERATION) of
ok ->
_ = wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)),
ok;
wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000));
{error, _Reason} = Error ->
Error
end.
Expand Down
3 changes: 3 additions & 0 deletions apps/emqx_resource/test/emqx_connector_demo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}
on_get_status(_InstId, #{health_check_error := true}) ->
?tp(connector_demo_health_check_error, #{}),
disconnected;
on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) ->
?tp(connector_demo_health_check_error, #{}),
{disconnected, State, Message};
on_get_status(_InstId, #{pid := Pid}) ->
timer:sleep(300),
case is_process_alive(Pid) of
Expand Down
28 changes: 26 additions & 2 deletions apps/emqx_resource/test/emqx_resource_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ groups() ->
init_per_testcase(_, Config) ->
ct:timetrap({seconds, 30}),
emqx_connector_demo:set_callback_mode(always_sync),
snabbkaffe:start_trace(),
Config.

end_per_testcase(_, _Config) ->
Expand Down Expand Up @@ -1145,10 +1146,33 @@ t_auto_retry(_) ->
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, create_error => true},
#{auto_retry_interval => 100}
#{health_check_interval => 100}
),
?assertEqual(ok, Res).

%% tests resources that have an asynchronous start: they are created
%% without problems, but later some issue is found when calling the
%% health check.
t_start_throw_error(_Config) ->
Message = "something went wrong",
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, health_check_error => {msg, Message}},
#{health_check_interval => 100}
),
#{?snk_kind := connector_demo_health_check_error},
1_000
)
),
%% Now, if we try to "reconnect" (restart) it, we should get the error
?assertMatch({error, Message}, emqx_resource:start(?ID, _Opts = #{})),
ok.

t_health_check_disconnected(_) ->
?check_trace(
begin
Expand All @@ -1157,7 +1181,7 @@ t_health_check_disconnected(_) ->
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, create_error => true},
#{auto_retry_interval => 100}
#{health_check_interval => 100}
),
?assertEqual(
{ok, disconnected},
Expand Down
1 change: 1 addition & 0 deletions changes/ce/fix-11094.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed an issue where connection errors in Kafka Producer would not be reported when reconnecting the bridge.

0 comments on commit 9c5d4f1

Please sign in to comment.