Skip to content

Commit

Permalink
feat: added channel health check functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
kjellwinblad committed Sep 29, 2023
1 parent 02cbbcd commit 0953a8a
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 9 deletions.
21 changes: 20 additions & 1 deletion apps/emqx_bridge/src/emqx_bridge_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
config_key_path/0,
disable_enable/3,
create/3,
remove/2
remove/2,
health_check/2
]).

%% Config Update Handler API
Expand Down Expand Up @@ -191,6 +192,24 @@ send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
Error
end.

%% Run a health check for the channel that backs the given bridge.
%%
%% The bridge is looked up first:
%%  * disabled bridge  -> `{error, bridge_stopped}'
%%  * enabled bridge   -> the connector's resource id and the bridge's
%%                        channel id are derived and the check is delegated
%%                        to emqx_resource_manager:channel_health_check/2
%%  * anything else    -> the lookup result is handed back unchanged
health_check(BridgeType, BridgeName) ->
    case lookup(BridgeType, BridgeName) of
        #{enable := false} ->
            {error, bridge_stopped};
        #{enable := true, connector := ConnectorName} ->
            ConnectorType = bridge_v2_type_to_connector_type(BridgeType),
            ConnectorResId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
            ChannelId = id(BridgeType, BridgeName, ConnectorName),
            emqx_resource_manager:channel_health_check(ConnectorResId, ChannelId);
        LookupResult ->
            LookupResult
    end.

% do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config) ->
% BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName),
% ConnectorResourceId = emqx_bridge_v2:extract_connector_id_from_bridge_v2_id(BridgeV2Id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1
on_get_channels/1,
on_get_channel_status/3
]).

-export([
Expand Down Expand Up @@ -466,6 +467,10 @@ on_get_status(
connecting
end.

%% Channel status callback. This is currently a stub: every channel is
%% reported as `connected' and none of the arguments are inspected.
on_get_channel_status(_ConnectorResId, _ChanId, _ConnectorState) ->
    %% TODO add actual channel status
    connected.

% check_if_healthy_leaders(Client, KafkaTopic) when is_pid(Client) ->
% Leaders =
% case wolff_client:get_leader_connections(Client, KafkaTopic) of
Expand Down
1 change: 1 addition & 0 deletions apps/emqx_resource/include/emqx_resource.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
%%--------------------------------------------------------------------
-type resource_type() :: module().
-type resource_id() :: binary().
-type channel_id() :: binary().
-type raw_resource_config() :: binary() | raw_term_resource_config().
-type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()].
-type resource_config() :: term().
Expand Down
32 changes: 27 additions & 5 deletions apps/emqx_resource/src/emqx_resource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
]).

%% Calls to the callback module with current resource state
%% They also save the state after the call finished (except query/2,3).
%% They also save the state after the call finished (except call_get_channel_config/3).

-export([
start/1,
Expand All @@ -76,6 +76,7 @@
restart/2,
%% verify if the resource is working normally
health_check/1,
channel_health_check/2,
%% set resource status to connecting
set_resource_status_connecting/1,
%% stop the instance
Expand All @@ -91,7 +92,9 @@
has_allocated_resources/1,
get_allocated_resources/1,
get_allocated_resources_list/1,
forget_allocated_resources/1
forget_allocated_resources/1,
%% Get channel config from resource
call_get_channel_config/3
]).

%% Direct calls to the callback module
Expand All @@ -103,6 +106,8 @@
call_start/3,
%% verify if the resource is working normally
call_health_check/3,
%% verify if the resource channel is working normally
call_channel_health_check/4,
%% stop the instance
call_stop/3,
%% get the query mode of the resource
Expand All @@ -112,9 +117,7 @@
%% Remove channel from resource
call_remove_channel/4,
%% Get channels from resource
call_get_channels/2,
%% Get channel config from resource
call_get_channel_config/3
call_get_channels/2
]).

%% list all the instances, id only.
Expand All @@ -137,6 +140,7 @@
-export_type([
query_mode/0,
resource_id/0,
channel_id/0,
resource_data/0,
resource_status/0
]).
Expand All @@ -147,6 +151,7 @@
on_query_async/4,
on_batch_query_async/4,
on_get_status/2,
on_get_channel_status/3,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
Expand Down Expand Up @@ -191,6 +196,10 @@
| {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()}.

%% Reports the status of a single channel of a resource. It receives the
%% resource id, the channel id and the current resource state, and is invoked
%% through call_channel_health_check/4.
%% NOTE(review): the meaning of the second tuple element is not visible here;
%% presumably an error/detail term accompanying the status -- confirm.
-callback on_get_channel_status(resource_id(), channel_id(), resource_state()) ->
resource_status()
| {resource_status(), term()}.

-callback query_mode(Config :: term()) -> query_mode().

%% This callback handles the installation of a specified Bridge V2 resource.
Expand Down Expand Up @@ -398,6 +407,11 @@ stop(ResId) ->
%% Health-check the given resource by delegating to
%% emqx_resource_manager:health_check/1; its result is returned untouched.
health_check(ResourceId) ->
    emqx_resource_manager:health_check(ResourceId).

%% Health-check one channel of a resource. Thin wrapper that forwards both
%% ids to emqx_resource_manager:channel_health_check/2 and returns its result.
-spec channel_health_check(resource_id(), channel_id()) ->
    {ok, resource_status()} | {error, term()}.
channel_health_check(ResourceId, ChanId) ->
    emqx_resource_manager:channel_health_check(ResourceId, ChanId).

%% Forwards to emqx_resource_manager:set_resource_status_connecting/1 for the
%% given resource id and returns whatever the manager returns.
set_resource_status_connecting(ResourceId) ->
    emqx_resource_manager:set_resource_status_connecting(ResourceId).

Expand Down Expand Up @@ -472,6 +486,14 @@ call_start(ResId, Mod, Config) ->
%% Direct call into the callback module: asks `CallbackMod' for the resource
%% status via on_get_status/2, wrapped in ?SAFE_CALL.
%% NOTE(review): ?SAFE_CALL is defined outside this view; presumably it turns
%% exceptions into an error tuple -- confirm.
call_health_check(ResourceId, CallbackMod, State) ->
    ?SAFE_CALL(CallbackMod:on_get_status(ResourceId, State)).

%% Direct call into the callback module: asks `Mod' for the status of a single
%% channel via on_get_channel_status/3, wrapped in ?SAFE_CALL.
%%
%% The success part of the return type mirrors the on_get_channel_status/3
%% callback declaration (`resource_status() | {resource_status(), term()}').
%% A one-element tuple is not a possible callback result and is therefore not
%% part of the spec.
%% NOTE(review): `{error, term()}' is presumably what ?SAFE_CALL produces when
%% the callback raises; the macro is defined outside this view -- confirm.
-spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) ->
    resource_status()
    | {resource_status(), term()}
    | {error, term()}.
call_channel_health_check(ResId, ChannelId, Mod, ResourceState) ->
    ?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)).

call_add_channel(ResId, Mod, ResourceState, ChannelId, ChannelConfig) ->
%% Check if on_add_channel/4 is exported
case erlang:function_exported(Mod, on_add_channel, 4) of
Expand Down
72 changes: 70 additions & 2 deletions apps/emqx_resource/src/emqx_resource_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
start/2,
stop/1,
health_check/1,
channel_health_check/2,
add_channel/3,
remove_channel/2
]).
Expand Down Expand Up @@ -275,6 +276,11 @@ list_group(Group) ->
%% Ask the manager process of the given resource to run a health check.
%% The request is sent as the `health_check' call via safe_call/3 with the
%% ?T_OPERATION timeout.
health_check(ResourceId) ->
    safe_call(ResourceId, health_check, ?T_OPERATION).

%% Ask the manager process of the given resource to health-check one of its
%% channels. The request is sent as `{channel_health_check, ChannelId}' via
%% safe_call/3 with the ?T_OPERATION timeout.
-spec channel_health_check(resource_id(), channel_id()) ->
    {ok, resource_status()} | {error, term()}.
channel_health_check(ResourceId, ChanId) ->
    Request = {channel_health_check, ChanId},
    safe_call(ResourceId, Request, ?T_OPERATION).

add_channel(ResId, ChannelId, Config) ->
%% Use cache to avoid doing inter process communication on every call
Data = read_cache(ResId),
Expand Down Expand Up @@ -372,8 +378,13 @@ handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
handle_event({call, From}, health_check, stopped, _Data) ->
Actions = [{reply, From, {error, resource_is_stopped}}],
{keep_state_and_data, Actions};
handle_event({call, From}, {channel_health_check, _}, stopped, _Data) ->
Actions = [{reply, From, {error, resource_is_stopped}}],
{keep_state_and_data, Actions};
handle_event({call, From}, health_check, _State, Data) ->
handle_manually_health_check(From, Data);
handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) ->
handle_manually_channel_health_check(From, Data, ChannelId);
% State: CONNECTING
handle_event(enter, _OldState, connecting = State, Data) ->
ok = log_state_consistency(State, Data),
Expand Down Expand Up @@ -607,6 +618,14 @@ handle_add_channel(From, Data, ChannelId, ChannelConfig) ->
end.

%% Add a channel on behalf of a caller and answer it.
%%
%% Delegates the actual work to add_channel_need_insert_update_data/3:
%%  * `{ok, NewData}'   -> keep the current state, store `NewData', reply `ok'
%%  * `{error, Reason}' -> keep state and data, reply with the error tuple
%%
%% Note that `Data' appears twice in the head (2nd and 4th argument), so this
%% clause only matches when both arguments are equal.
handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) ->
    Outcome = add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig),
    case Outcome of
        {error, _} = Failure ->
            {keep_state_and_data, [{reply, From, Failure}]};
        {ok, DataWithChannel} ->
            {keep_state, DataWithChannel, [{reply, From, ok}]}
    end.

add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) ->
case
emqx_resource:call_add_channel(
Data#data.id, Data#data.mod, Data#data.state, ChannelId, ChannelConfig
Expand All @@ -620,7 +639,7 @@ handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) ->
added_channels = NewAddedChannelsMap
},
update_state(UpdatedData, Data),
{keep_state, UpdatedData, [{reply, From, ok}]};
{ok, UpdatedData};
{error, Reason} = Error ->
%% Log the error as a warning
?SLOG(warning, #{
Expand All @@ -629,7 +648,7 @@ handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) ->
channel_id => ChannelId,
reason => Reason
}),
{keep_state_and_data, [{reply, From, Error}]}
Error
end.

handle_remove_channel(From, ChannelId, Data) ->
Expand Down Expand Up @@ -678,6 +697,55 @@ handle_manually_health_check(From, Data) ->
end
).

%% Serve a manually requested health check for a single channel. The caller
%% is always answered and the state machine never changes state.
%%
%%  * Resource state is `undefined': reply `{ok, disconnected}'.
%%  * Channel is a key of `added_channels': reply with the result of
%%    get_channel_status_channel_added/2.
%%  * Otherwise the channel is added first: its config is fetched from the
%%    callback module and handed to add_channel_need_insert_update_data/3.
%%    On success the updated data is stored and the channel status is
%%    replied; if either step returns `{error, Reason}' a warning is logged
%%    and that error tuple is replied.
handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
    NotStarted = {ok, disconnected},
    {keep_state_and_data, [{reply, From, NotStarted}]};
handle_manually_channel_health_check(
    From, #data{added_channels = AddedChannels} = Data, ChannelId
) when is_map_key(ChannelId, AddedChannels) ->
    ChannelStatus = get_channel_status_channel_added(Data, ChannelId),
    {keep_state_and_data, [{reply, From, ChannelStatus}]};
handle_manually_channel_health_check(From, Data, ChannelId) ->
    %% The channel is not installed yet, so install it before checking it.
    ResId = Data#data.id,
    CallbackMod = Data#data.mod,
    ConfigResult = emqx_resource:call_get_channel_config(ResId, ChannelId, CallbackMod),
    case ConfigResult of
        {error, ConfigReason} = ConfigError ->
            %% Log the error as a warning
            ?SLOG(warning, #{
                msg => get_channel_config_failed_when_doing_status_check,
                id => ResId,
                channel_id => ChannelId,
                reason => ConfigReason
            }),
            {keep_state_and_data, [{reply, From, ConfigError}]};
        ChannelConfig when is_map(ChannelConfig) ->
            case add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) of
                {ok, DataWithChannel} ->
                    ChannelStatus = get_channel_status_channel_added(DataWithChannel, ChannelId),
                    {keep_state, DataWithChannel, [{reply, From, ChannelStatus}]};
                {error, AddReason} = AddError ->
                    %% Log the error as a warning
                    ?SLOG(warning, #{
                        msg => add_channel_failed_when_doing_status_check,
                        id => ResId,
                        channel_id => ChannelId,
                        reason => AddReason
                    }),
                    {keep_state_and_data, [{reply, From, AddError}]}
            end
    end.

%% Query the callback module for the status of an already added channel and
%% normalise the answer to the reply shape declared by channel_health_check/2:
%% `{ok, resource_status()} | {error, term()}'.
%%
%% The raw result of emqx_resource:call_channel_health_check/4 is either a
%% bare status, a `{Status, Term}' pair or an `{error, Term}' tuple. Returning
%% it as is would give callers `connected' in one case and `{ok, disconnected}'
%% (the reply used when the resource state is undefined) in another.
%%
%%  * `{error, Term}'   -> returned unchanged
%%  * `Status'          -> `{ok, Status}'
%%  * `{Status, _Term}' -> `{ok, Status}' (the reply type has no slot for the
%%                         extra term, so it is dropped)
%%  * anything else     -> `{error, {invalid_channel_status, Other}}'; the
%%                         callback result crosses a module boundary, so an
%%                         unexpected shape is reported instead of crashing
%%                         the resource manager process
get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) ->
    case emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State) of
        {error, _Reason} = Error ->
            Error;
        {Status, _Detail} when is_atom(Status) ->
            {ok, Status};
        Status when is_atom(Status) ->
            {ok, Status};
        Other ->
            {error, {invalid_channel_status, Other}}
    end.

handle_connecting_health_check(Data) ->
with_health_check(
Data,
Expand Down

0 comments on commit 0953a8a

Please sign in to comment.