Skip to content

Commit

Permalink
Merge pull request #12812 from thalesmg/async-res-manager-health-chec…
Browse files Browse the repository at this point in the history
…k-m-20240328

feat(resource manager): perform non-blocking health checks
  • Loading branch information
thalesmg committed Apr 4, 2024
2 parents a0ad4fa + bade09b commit 069cd4f
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 129 deletions.
43 changes: 34 additions & 9 deletions apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
Expand Up @@ -825,22 +825,47 @@ do_start_stop_bridges(Type, Config) ->
%% Connecting to this endpoint should always timeout
BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])),
BadName = <<"bad_", (atom_to_binary(Type))/binary>>,
CreateRes0 = request_json(
post,
uri(["bridges"]),
?MQTT_BRIDGE(BadServer, BadName),
Config
),
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_MQTT,
<<"name">> := BadName,
<<"enable">> := true,
<<"server">> := BadServer,
<<"status">> := <<"connecting">>,
<<"node_status">> := [_ | _]
<<"server">> := BadServer
}},
request_json(
post,
uri(["bridges"]),
?MQTT_BRIDGE(BadServer, BadName),
Config
)
CreateRes0
),
{ok, 201, CreateRes1} = CreateRes0,
case CreateRes1 of
#{
<<"node_status">> := [
#{
<<"status">> := <<"disconnected">>,
<<"status_reason">> := <<"connack_timeout">>
},
#{<<"status">> := <<"connecting">>}
| _
],
%% `inconsistent': one node is `?status_disconnected' (because it has already
%% timed out), the other node is `?status_connecting' (it started later and
%% hasn't timed out yet)
<<"status">> := <<"inconsistent">>,
<<"status_reason">> := <<"connack_timeout">>
} ->
ok;
#{
<<"node_status">> := [_],
<<"status">> := <<"connecting">>
} ->
ok;
_ ->
error({unexpected_result, CreateRes1})
end,
BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
?assertMatch(
%% request from product: return 400 on such errors
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_resource/src/emqx_resource.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_resource, [
{description, "Manager for all external resources"},
{vsn, "0.1.28"},
{vsn, "0.1.29"},
{registered, []},
{mod, {emqx_resource_app, []}},
{applications, [
Expand Down
235 changes: 160 additions & 75 deletions apps/emqx_resource/src/emqx_resource_manager.erl
Expand Up @@ -60,6 +60,9 @@
% Behaviour
-export([init/1, callback_mode/0, handle_event/4, terminate/3]).

%% Internal exports.
-export([worker_resource_health_check/1]).

% State record
-record(data, {
id,
Expand All @@ -73,7 +76,15 @@
state,
error,
pid,
added_channels,
added_channels = #{},
%% Reference to process performing resource health check.
hc_workers = #{resource => #{}, channel => #{}} :: #{
resource | channel := #{{pid(), reference()} => true}
},
%% Callers waiting on health check
hc_pending_callers = #{resource => [], channel => []} :: #{
resource | channel := [gen_server:from()]
},
extra
}).
-type data() :: #data{}.
Expand Down Expand Up @@ -153,13 +164,13 @@ create(ResId, Group, ResourceType, Config, Opts) ->
case SpawnBufferWorkers andalso lists:member(QueryMode, [sync, async]) of
true ->
%% start resource workers as the query type requires them
ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
true ->
wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
false ->
ok
end;
ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts);
false ->
ok
end,
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
true ->
wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
false ->
ok
end.
Expand Down Expand Up @@ -455,25 +466,25 @@ handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
Reply = {ok, Group, data_record_to_external_map(Data)},
{keep_state_and_data, [{reply, From, Reply}]};
% Called when doing a manually health check.
% Called when doing a manual health check.
handle_event({call, From}, health_check, ?state_stopped, _Data) ->
Actions = [{reply, From, {error, resource_is_stopped}}],
{keep_state_and_data, Actions};
handle_event({call, From}, {channel_health_check, _}, ?state_stopped, _Data) ->
Actions = [{reply, From, {error, resource_is_stopped}}],
{keep_state_and_data, Actions};
handle_event({call, From}, health_check, _State, Data) ->
handle_manually_health_check(From, Data);
handle_manual_resource_health_check(From, Data);
handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) ->
handle_manually_channel_health_check(From, Data, ChannelId);
handle_manual_channel_health_check(From, Data, ChannelId);
% State: CONNECTING
handle_event(enter, _OldState, ?state_connecting = State, Data) ->
ok = log_status_consistency(State, Data),
{keep_state_and_data, [{state_timeout, 0, health_check}]};
handle_event(internal, start_resource, ?state_connecting, Data) ->
start_resource(Data, undefined);
handle_event(state_timeout, health_check, ?state_connecting, Data) ->
handle_connecting_health_check(Data);
start_resource_health_check(Data);
handle_event(
{call, From}, {remove_channel, ChannelId}, ?state_connecting = _State, Data
) ->
Expand All @@ -487,7 +498,7 @@ handle_event(enter, _OldState, ?state_connected = State, Data) ->
?tp(resource_connected_enter, #{}),
{keep_state_and_data, health_check_actions(Data)};
handle_event(state_timeout, health_check, ?state_connected, Data) ->
handle_connected_health_check(Data);
start_resource_health_check(Data);
handle_event(
{call, From}, {add_channel, ChannelId, Config}, ?state_connected = _State, Data
) ->
Expand Down Expand Up @@ -523,6 +534,15 @@ handle_event(
) ->
Channels = emqx_resource:call_get_channels(Data#data.id, Data#data.mod),
{keep_state_and_data, {reply, From, {ok, Channels}}};
handle_event(
info,
{'DOWN', Ref, process, Pid, Res},
State0,
Data0 = #data{hc_workers = #{resource := HCWorkers}}
) when
is_map_key({Pid, Ref}, HCWorkers)
->
handle_resource_health_check_worker_down(State0, Data0, {Pid, Ref}, Res);
% Ignore all other events
handle_event(EventType, EventData, State, Data) ->
?SLOG(
Expand Down Expand Up @@ -835,26 +855,135 @@ handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) ->
_ = maybe_clear_alarm(ChannelId),
{keep_state, update_state(NewData, Data), [{reply, From, ok}]}.

%% Runs a health check requested explicitly by a caller (`From').
%% The continuation passed to `with_health_check/2' refreshes the channels for
%% the resulting status, moves the state machine to that status and replies
%% `{ok, Status}' to the caller.
handle_manually_health_check(From, Data) ->
    ReplyWithStatus = fun(Status, UpdatedData) ->
        NewData = channels_health_check(Status, UpdatedData),
        {next_state, Status, NewData, [{reply, From, {ok, Status}}]}
    end,
    with_health_check(Data, ReplyWithStatus).
%% Handles a caller-requested resource health check without blocking.
%% The caller is always queued in `hc_pending_callers'; the queued callers are
%% answered by `reply_pending_health_check_callers/3'.
%% - If a resource health check worker is already registered, only the queue is
%%   updated.
%% - Otherwise a new health check is started via `start_resource_health_check/1'.
handle_manual_resource_health_check(From, #data{hc_workers = #{resource := Workers}} = Data) when
    map_size(Workers) > 0
->
    %% ongoing health check: just remember the caller
    {keep_state, enqueue_resource_health_check_caller(From, Data)};
handle_manual_resource_health_check(From, Data) ->
    start_resource_health_check(enqueue_resource_health_check_caller(From, Data)).

%% Prepends `From' to the list of callers waiting on the resource health check.
enqueue_resource_health_check_caller(From, Data) ->
    #data{hc_pending_callers = Callers = #{resource := Waiting}} = Data,
    Data#data{hc_pending_callers = Callers#{resource := [From | Waiting]}}.

%% Builds one `{reply, From, {ok, Status}}' action per caller waiting on the
%% resource health check and empties the `resource' waiting list.
%% Returns `{ReplyActions, UpdatedData}'; the actions must be returned to
%% `gen_statem' by the caller of this function for the replies to be sent.
reply_pending_health_check_callers(Status, resource, Data = #data{hc_pending_callers = Callers}) ->
    #{resource := Waiting} = Callers,
    Result = {ok, Status},
    Replies = [{reply, Caller, Result} || Caller <- Waiting],
    Drained = Callers#{resource := []},
    {Replies, Data#data{hc_pending_callers = Drained}}.

%% Starts a resource health check in a separate, monitored process.
%% Returns a `gen_statem' state-function result.
start_resource_health_check(#data{state = undefined} = Data) ->
    %% There is no resource state, so nothing can be probed: report
    %% `?status_disconnected' right away.  This can only be reached through a
    %% manual health check issued while the resource failed to initialize (or has
    %% not finished doing so); continue as if we were in `?status_connecting'.
    continue_resource_health_check_not_connected(?status_disconnected, Data);
start_resource_health_check(#data{hc_workers = #{resource := Running}}) when
    map_size(Running) > 0
->
    %% A worker is already registered; do not spawn a second one.
    keep_state_and_data;
start_resource_health_check(#data{} = Data) ->
    #data{hc_workers = AllWorkers = #{resource := Running}} = Data,
    %% The `{Pid, MonitorRef}' pair is the key used to recognise the worker's
    %% `DOWN' message later on.
    {_Pid, _MRef} = Worker = spawn_health_check_worker(Data),
    {keep_state, Data#data{hc_workers = AllWorkers#{resource := Running#{Worker => true}}}}.

%% Spawns and monitors a process that runs `worker_resource_health_check/1'
%% with the given data; returns the `{Pid, MonitorRef}' pair.
-spec spawn_health_check_worker(data()) -> {pid(), reference()}.
spawn_health_check_worker(#data{} = Data) ->
    Args = [Data],
    erlang:spawn_monitor(?MODULE, worker_resource_health_check, Args).

%% Body of the health check worker process.  Kept as a separate, exported
%% function so that it can carry a spec (and keep dialyzer quiet).
%% The health check result is handed back to the monitoring process through
%% the exit reason: `{ok, HCRes}'.
-spec worker_resource_health_check(data()) -> no_return().
worker_resource_health_check(Data) ->
    ResId = Data#data.id,
    Mod = Data#data.mod,
    ResState = Data#data.state,
    exit({ok, emqx_resource:call_health_check(ResId, Mod, ResState)}).

%% Called when the `DOWN' message of a resource health check worker arrives.
%% Unregisters the worker and continues the health check flow with the result
%% carried in the worker's exit reason.
handle_resource_health_check_worker_down(CurrentState, Data0, WorkerRef, ExitResult) ->
    #data{hc_workers = AllWorkers = #{resource := Running}} = Data0,
    Data = Data0#data{hc_workers = AllWorkers#{resource := maps:remove(WorkerRef, Running)}},
    HCRes =
        case ExitResult of
            {ok, Res} ->
                Res;
            _ ->
                %% Unexpected: `emqx_resource:call_health_check' catches all
                %% exceptions.  Treat any other exit reason as an error result.
                {error, ExitResult}
        end,
    continue_with_health_check(Data, CurrentState, HCRes).

handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
%% Applies a raw health check result to the resource data and dispatches to the
%% continuation matching the current state machine state.
%% Steps, in order: parse the result, raise/clear the alarm, possibly resume the
%% resource workers, store the new status/state/error, then continue.
continue_with_health_check(#data{id = ResId, error = PrevError} = Data0, CurrentState, HCRes) ->
    {Status, ResState, Error} = parse_health_check_result(HCRes, Data0),
    _ = maybe_alarm(Status, ResId, Error, PrevError),
    ok = maybe_resume_resource_workers(ResId, Status),
    Updated = Data0#data{state = ResState, status = Status, error = Error},
    Data = update_state(Updated, Data0),
    case CurrentState of
        ?state_connected ->
            continue_resource_health_check_connected(Status, Data);
        _ ->
            %% `?state_connecting' | `?state_disconnected' | `?state_stopped'
            continue_resource_health_check_not_connected(Status, Data)
    end.

%% Continuation to be used when the current resource state is `?state_connected'.
%% - Still connected: answer pending callers, refresh channels, stay in the
%%   current state and append the actions from `health_check_actions/1'.
%% - Anything else: log a warning, answer pending callers and move to the state
%%   named after the new status.
continue_resource_health_check_connected(?status_connected = Status, Data0) ->
    {Replies, Data1} = reply_pending_health_check_callers(Status, resource, Data0),
    Data2 = channels_health_check(?status_connected, Data1),
    Data = update_state(Data2, Data0),
    {keep_state, Data, Replies ++ health_check_actions(Data)};
continue_resource_health_check_connected(Status, Data0) ->
    ?SLOG(warning, #{
        msg => "health_check_failed",
        id => Data0#data.id,
        status => Status
    }),
    %% Note: works because, coincidentally, channel/resource status is a
    %% subset of resource manager state... But there should be a conversion
    %% between the two here, as resource manager also has `stopped', which is
    %% not a valid status at the time of writing.
    {Replies, Data} = reply_pending_health_check_callers(Status, resource, Data0),
    {next_state, Status, channels_health_check(Status, Data), Replies}.

%% Continuation to be used when the current resource state is not `?state_connected'.
%% Pending manual health check callers are answered with `{ok, NewStatus}' in
%% every branch; the channels are then refreshed for the new status and the
%% state machine moves to the state corresponding to that status.
%% Only the `connecting' branch appends `health_check_actions/1' (presumably
%% what schedules the next periodic check -- confirm against its definition).
continue_resource_health_check_not_connected(NewStatus, Data0) ->
    {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
    case NewStatus of
        ?status_connected ->
            {next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies};
        ?status_connecting ->
            Actions = Replies ++ health_check_actions(Data),
            %% Use the state macro (not the status macro) for the state name,
            %% like the other branches do.
            {next_state, ?state_connecting, channels_health_check(?status_connecting, Data),
                Actions};
        ?status_disconnected ->
            {next_state, ?state_disconnected, channels_health_check(?status_disconnected, Data),
                Replies}
    end.

handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
{keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]};
handle_manually_channel_health_check(
handle_manual_channel_health_check(
From,
#data{added_channels = Channels} = _Data,
ChannelId
) when
is_map_key(ChannelId, Channels)
->
{keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]};
handle_manually_channel_health_check(
handle_manual_channel_health_check(
From,
_Data,
_ChannelId
Expand All @@ -865,56 +994,6 @@ get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, Ch
RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
channel_status(RawStatus).

%% Periodic health check while in the connecting state.
%% The continuation maps the health check status to the next state:
%% connected -> `?state_connected', disconnected -> `?state_disconnected',
%% connecting -> stay, with the actions from `health_check_actions/1'.
handle_connecting_health_check(Data) ->
    Continue = fun
        (?status_connected, Updated) ->
            NewData = channels_health_check(?status_connected, Updated),
            {next_state, ?state_connected, NewData};
        (?status_connecting, Updated) ->
            NewData = channels_health_check(?status_connecting, Updated),
            {keep_state, NewData, health_check_actions(Updated)};
        (?status_disconnected, Updated) ->
            NewData = channels_health_check(?status_disconnected, Updated),
            {next_state, ?state_disconnected, NewData}
    end,
    with_health_check(Data, Continue).

%% Periodic health check while in the connected state.
%% Still connected: refresh channels and stay, with the actions from
%% `health_check_actions/1'.  Any other status: log a warning and move to the
%% state named after that status.
handle_connected_health_check(Data) ->
    Continue = fun
        (?status_connected, Updated0) ->
            Updated = channels_health_check(?status_connected, Updated0),
            {keep_state, Updated, health_check_actions(Updated)};
        (Status, Updated) ->
            ?SLOG(warning, #{
                msg => "health_check_failed",
                id => Data#data.id,
                status => Status
            }),
            %% Note: works because, coincidentally, channel/resource status is a
            %% subset of resource manager state... But there should be a conversion
            %% between the two here, as resource manager also has `stopped', which is
            %% not a valid status at the time of writing.
            {next_state, Status, channels_health_check(Status, Updated)}
    end,
    with_health_check(Data, Continue).

%% Runs the resource health check synchronously and hands the resulting status
%% and updated data to `Func'.
%% Without a resource state there is nothing to probe, so `Func' is called with
%% `disconnected' and the unchanged data.
with_health_check(#data{state = undefined} = Data, Func) ->
    Func(disconnected, Data);
with_health_check(#data{error = PrevError} = Data, Func) ->
    #data{id = ResId, mod = Mod, state = ResState} = Data,
    HCRes = emqx_resource:call_health_check(ResId, Mod, ResState),
    {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
    _ = maybe_alarm(Status, ResId, Err, PrevError),
    ok = maybe_resume_resource_workers(ResId, Status),
    Updated = update_state(Data#data{state = NewState, status = Status, error = Err}, Data),
    Func(Status, Updated).

-spec channels_health_check(resource_status(), data()) -> data().
channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
Channels = maps:to_list(Data0#data.added_channels),
Expand Down Expand Up @@ -1097,9 +1176,15 @@ update_state(Data) ->
%% Writes `Data' to the resource cache when it differs from the previous data,
%% and returns `Data' unchanged.
%% Only the copy stripped by `remove_runtime_data/1' is cached, so that
%% health check worker references and pending callers are not stored.
update_state(DataWas, DataWas) ->
    %% Nothing changed: skip the cache write.
    DataWas;
update_state(Data, _DataWas) ->
    _ = insert_cache(Data#data.id, remove_runtime_data(Data)),
    Data.

%% Resets the health check bookkeeping fields (`hc_workers' and
%% `hc_pending_callers') to their empty values.
remove_runtime_data(#data{} = Data0) ->
    NoWorkers = #{resource => #{}, channel => #{}},
    NoCallers = #{resource => [], channel => []},
    Data0#data{hc_workers = NoWorkers, hc_pending_callers = NoCallers}.

%% Returns the `health_check_interval' option, falling back to
%% `?HEALTHCHECK_INTERVAL' when the option is not set.
health_check_interval(Opts) ->
    case maps:find(health_check_interval, Opts) of
        {ok, Interval} -> Interval;
        error -> ?HEALTHCHECK_INTERVAL
    end.

Expand Down

0 comments on commit 069cd4f

Please sign in to comment.