Skip to content

Commit

Permalink
Merge pull request #12013 from kjellwinblad/kjell/postgresql_conn_act…
Browse files Browse the repository at this point in the history
…ion_3/EMQX-11155

split pgsql, matrix and timescale into connector action
  • Loading branch information
kjellwinblad committed Nov 25, 2023
2 parents f8f8cf9 + c85004b commit 7161f9d
Show file tree
Hide file tree
Showing 21 changed files with 859 additions and 128 deletions.
5 changes: 4 additions & 1 deletion apps/emqx_bridge/src/emqx_action_info.erl
Expand Up @@ -77,8 +77,11 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_confluent_producer_action_info,
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info,
emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_syskeeper_action_info
emqx_bridge_pgsql_action_info,
emqx_bridge_syskeeper_action_info,
emqx_bridge_timescale_action_info
].
-else.
hard_coded_action_info_modules_ee() ->
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_bridge/src/emqx_bridge.erl
Expand Up @@ -240,8 +240,8 @@ send_message(BridgeId, Message) ->
{BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of
true ->
BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
emqx_bridge_v2:send_message(BridgeV2Type, BridgeName, Message, #{});
ActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeV1Type),
emqx_bridge_v2:send_message(ActionType, BridgeName, Message, #{});
false ->
ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName),
send_message(BridgeV1Type, BridgeName, ResId, Message, #{})
Expand Down Expand Up @@ -414,7 +414,7 @@ remove(BridgeType0, BridgeName) ->
}),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:remove(BridgeType, BridgeName);
emqx_bridge_v2:bridge_v1_remove(BridgeType0, BridgeName);
false ->
remove_v1(BridgeType, BridgeName)
end.
Expand Down
84 changes: 66 additions & 18 deletions apps/emqx_bridge/src/emqx_bridge_v2.erl
Expand Up @@ -55,6 +55,7 @@
disable_enable/3,
health_check/2,
send_message/4,
query/4,
start/2,
reset_metrics/2,
create_dry_run/2,
Expand Down Expand Up @@ -116,7 +117,9 @@
bridge_v1_enable_disable/3,
bridge_v1_restart/2,
bridge_v1_stop/2,
bridge_v1_start/2
bridge_v1_start/2,
%% For test cases only
bridge_v1_remove/2
]).

%%====================================================================
Expand Down Expand Up @@ -547,25 +550,25 @@ get_query_mode(BridgeV2Type, Config) ->
ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType),
emqx_resource:query_mode(ResourceType, Config, CreationOpts).

-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
-spec query(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
term() | {error, term()}.
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
query(BridgeType, BridgeName, Message, QueryOpts0) ->
case lookup_conf(BridgeType, BridgeName) of
#{enable := true} = Config0 ->
Config = combine_connector_and_bridge_v2_config(BridgeType, BridgeName, Config0),
do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
do_query_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
#{enable := false} ->
{error, bridge_stopped};
_Error ->
{error, bridge_not_found}
end.

do_send_msg_with_enabled_config(
do_query_with_enabled_config(
_BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error
) ->
?SLOG(error, Reason),
Error;
do_send_msg_with_enabled_config(
do_query_with_enabled_config(
BridgeType, BridgeName, Message, QueryOpts0, Config
) ->
QueryMode = get_query_mode(BridgeType, Config),
Expand All @@ -579,7 +582,17 @@ do_send_msg_with_enabled_config(
}
),
BridgeV2Id = id(BridgeType, BridgeName),
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
case Message of
{send_message, Msg} ->
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Msg}, QueryOpts);
Msg ->
emqx_resource:query(BridgeV2Id, Msg, QueryOpts)
end.

-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
term() | {error, term()}.
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
query(BridgeType, BridgeName, {send_message, Message}, QueryOpts0).

-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
#{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
Expand Down Expand Up @@ -785,17 +798,24 @@ parse_id(Id) ->
end.

get_channels_for_connector(ConnectorId) ->
{ConnectorType, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})),
RelevantBridgeV2Types = [
Type
|| Type <- RootConf,
connector_type(Type) =:= ConnectorType
],
lists:flatten([
get_channels_for_connector(ConnectorName, BridgeV2Type)
|| BridgeV2Type <- RelevantBridgeV2Types
]).
try emqx_connector_resource:parse_connector_id(ConnectorId) of
{ConnectorType, ConnectorName} ->
RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})),
RelevantBridgeV2Types = [
Type
|| Type <- RootConf,
connector_type(Type) =:= ConnectorType
],
lists:flatten([
get_channels_for_connector(ConnectorName, BridgeV2Type)
|| BridgeV2Type <- RelevantBridgeV2Types
])
catch
_:_ ->
%% ConnectorId is not a valid connector id so we assume the connector
%% has no channels (e.g. it is a a connector for authn or authz)
[]
end.

get_channels_for_connector(ConnectorName, BridgeV2Type) ->
BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}),
Expand Down Expand Up @@ -1325,6 +1345,34 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf).

%% Only called by test cases (may create broken references)
bridge_v1_remove(BridgeV1Type, BridgeName) ->
ActionType = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
bridge_v1_remove(
ActionType,
BridgeName,
lookup_conf(ActionType, BridgeName)
).

bridge_v1_remove(
ActionType,
Name,
#{connector := ConnectorName}
) ->
case remove(ActionType, Name) of
ok ->
ConnectorType = connector_type(ActionType),
emqx_connector:remove(ConnectorType, ConnectorName);
Error ->
Error
end;
bridge_v1_remove(
_ActionType,
_Name,
Error
) ->
Error.

bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
bridge_v1_check_deps_and_remove(
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src
@@ -1,13 +1,13 @@
{application, emqx_bridge_matrix, [
{description, "EMQX Enterprise MatrixDB Bridge"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{registered, []},
{applications, [
kernel,
stdlib,
emqx_resource
]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_matrix_action_info]}]},
{modules, []},
{links, []}
]}.
57 changes: 56 additions & 1 deletion apps/emqx_bridge_matrix/src/emqx_bridge_matrix.erl
Expand Up @@ -3,6 +3,8 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_matrix).

-include_lib("hocon/include/hoconsc.hrl").

-export([
conn_bridge_examples/1
]).
Expand All @@ -14,6 +16,12 @@
desc/1
]).

%% Examples
-export([
bridge_v2_examples/1,
connector_examples/1
]).

%% -------------------------------------------------------------------------------------------------
%% api

Expand All @@ -22,7 +30,7 @@ conn_bridge_examples(Method) ->
#{
<<"matrix">> => #{
summary => <<"Matrix Bridge">>,
value => emqx_bridge_pgsql:values(Method, matrix)
value => emqx_bridge_pgsql:values_conn_bridge_examples(Method, matrix)
}
}
].
Expand All @@ -35,8 +43,55 @@ roots() -> [].

fields("post") ->
emqx_bridge_pgsql:fields("post", matrix);
fields("config_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields(action) ->
{matrix,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(emqx_bridge_pgsql, pgsql_action)),
#{
desc => <<"Matrix Action Config">>,
required => false
}
)};
fields("put_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("get_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("post_bridge_v2") ->
emqx_bridge_pgsql:fields(pgsql_action);
fields("put_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields("get_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields("post_connector") ->
emqx_bridge_pgsql:fields("config_connector");
fields(Method) ->
emqx_bridge_pgsql:fields(Method).

desc("config_connector") ->
?DESC(emqx_postgresql_connector_schema, "config_connector");
desc(_) ->
undefined.

%% Examples

connector_examples(Method) ->
[
#{
<<"matrix">> => #{
summary => <<"Matrix Connector">>,
value => emqx_postgresql_connector_schema:values({Method, <<"matrix">>})
}
}
].

bridge_v2_examples(Method) ->
[
#{
<<"matrix">> => #{
summary => <<"Matrix Action">>,
value => emqx_bridge_pgsql:values({Method, matrix})
}
}
].
22 changes: 22 additions & 0 deletions apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl
@@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

-module(emqx_bridge_matrix_action_info).

-behaviour(emqx_action_info).

-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).

bridge_v1_type_name() -> matrix.

action_type_name() -> matrix.

connector_type_name() -> matrix.

schema_module() -> emqx_bridge_matrix.
2 changes: 1 addition & 1 deletion apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src
Expand Up @@ -8,7 +8,7 @@
emqx_resource,
emqx_postgresql
]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_pgsql_action_info]}]},
{modules, []},
{links, []}
]}.

0 comments on commit 7161f9d

Please sign in to comment.