From d2901afd1b4f1734dfdc9479c7b3a6f682e66ad9 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 16 Nov 2023 09:41:56 +0100 Subject: [PATCH 1/4] fix(emqx_bridge_kafka): match example in api schema --- .../src/emqx_bridge_kafka.erl | 83 ++++++++++--------- 1 file changed, 42 insertions(+), 41 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 5b3e3ca015..93515b5db7 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -112,16 +112,15 @@ values({put, connector}) -> values({put, KafkaType}) -> maps:merge(values(common_config), values(KafkaType)); values(bridge_v2_producer) -> - maps:merge( - #{ - enable => true, - connector => <<"my_kafka_producer_connector">>, - resource_opts => #{ - health_check_interval => "32s" - } - }, - values(producer) - ); + #{ + enable => true, + connector => <<"my_kafka_producer_connector">>, + parameters => values(producer_values), + local_topic => <<"mqtt/local/topic">>, + resource_opts => #{ + health_check_interval => "32s" + } + }; values(common_config) -> #{ authentication => #{ @@ -143,39 +142,41 @@ values(common_config) -> }; values(producer) -> #{ - kafka => #{ - topic => <<"kafka-topic">>, - message => #{ - key => <<"${.clientid}">>, - value => <<"${.}">>, - timestamp => <<"${.timestamp}">> + kafka => values(producer_values), + local_topic => <<"mqtt/local/topic">> + }; +values(producer_values) -> + #{ + topic => <<"kafka-topic">>, + message => #{ + key => <<"${.clientid}">>, + value => <<"${.}">>, + timestamp => <<"${.timestamp}">> + }, + max_batch_bytes => <<"896KB">>, + compression => <<"no_compression">>, + partition_strategy => <<"random">>, + required_acks => <<"all_isr">>, + partition_count_refresh_interval => <<"60s">>, + kafka_headers => <<"${pub_props}">>, + kafka_ext_headers => [ + #{ + kafka_ext_header_key => <<"clientid">>, + kafka_ext_header_value => <<"${clientid}">> }, - max_batch_bytes => <<"896KB">>, - compression => <<"no_compression">>, - partition_strategy => <<"random">>, - required_acks => <<"all_isr">>, - partition_count_refresh_interval => <<"60s">>, - kafka_headers => <<"${pub_props}">>, - kafka_ext_headers => [ - #{ - kafka_ext_header_key => <<"clientid">>, - kafka_ext_header_value => <<"${clientid}">> - }, - #{ - kafka_ext_header_key => <<"topic">>, - kafka_ext_header_value => <<"${topic}">> - } - ], - kafka_header_value_encode_mode => none, - max_inflight => 10, - buffer => #{ - mode => <<"hybrid">>, - per_partition_limit => <<"2GB">>, - segment_bytes => <<"100MB">>, - memory_overload_protection => true + #{ + kafka_ext_header_key => <<"topic">>, + kafka_ext_header_value => <<"${topic}">> } - }, - local_topic => <<"mqtt/local/topic">> + ], + kafka_header_value_encode_mode => none, + max_inflight => 10, + buffer => #{ + mode => <<"hybrid">>, + per_partition_limit => <<"2GB">>, + segment_bytes => <<"100MB">>, + memory_overload_protection => true + } }; values(consumer) -> #{ From 9ebbc9bbea763db9021904630ee2bcca88311f7c Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 16 Nov 2023 13:57:51 +0100 Subject: [PATCH 2/4] refactor(emqx_bridge): use more simplistic function to validate connector --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 6f296c63c4..ba557d0b1c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -410,10 +410,10 @@ uninstall_bridge_v2( CreationOpts = emqx_resource:fetch_creation_opts(Config), ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts), ok = emqx_resource:clear_metrics(BridgeV2Id), - case combine_connector_and_bridge_v2_config(BridgeV2Type, BridgeName, Config) of + case validate_referenced_connectors(BridgeV2Type, ConnectorName, BridgeName) of {error, _} -> ok; - _CombinedConfig -> + ok -> %% Deinstall from connector ConnectorId = emqx_connector_resource:resource_id( connector_type(BridgeV2Type), ConnectorName From 4e077c951b10bf363bc6934386b2e91f100ebbc3 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 15 Nov 2023 15:49:56 +0100 Subject: [PATCH 3/4] feat(emqx_bridge_mongodb): port mongodb to shared connector and actions --- apps/emqx_bridge/src/emqx_action_info.erl | 12 +- apps/emqx_bridge/src/emqx_bridge.erl | 18 +- apps/emqx_bridge/src/emqx_bridge_lib.erl | 14 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 10 +- .../test/emqx_bridge_v2_testlib.erl | 34 +-- .../src/emqx_bridge_mongodb.app.src | 2 +- .../src/emqx_bridge_mongodb.erl | 211 +++++++++++++--- .../src/emqx_bridge_mongodb_action_info.erl | 95 +++++++ .../src/emqx_bridge_mongodb_connector.erl | 113 ++++++--- .../test/emqx_bridge_mongodb_SUITE.erl | 42 +++- .../test/emqx_bridge_v2_mongodb_SUITE.erl | 232 ++++++++++++++++++ .../src/schema/emqx_connector_ee_schema.erl | 16 +- .../src/schema/emqx_connector_schema.erl | 12 +- apps/emqx_mongodb/src/emqx_mongodb.erl | 82 ++++--- rel/i18n/emqx_bridge_mongodb.hocon | 21 ++ 15 files changed, 763 insertions(+), 151 deletions(-) create mode 100644 apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl create mode 100644 apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 12988b163d..129142f246 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -77,6 +77,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_confluent_producer_action_info, emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, + emqx_bridge_mongodb_action_info, emqx_bridge_syskeeper_action_info ]. -else. @@ -116,14 +117,17 @@ bridge_v1_type_to_action_type(Type) -> action_type_to_bridge_v1_type(Bin, Conf) when is_binary(Bin) -> action_type_to_bridge_v1_type(binary_to_existing_atom(Bin), Conf); -action_type_to_bridge_v1_type(ActionType, Conf) -> +action_type_to_bridge_v1_type(ActionType, ActionConf) -> ActionInfoMap = info_map(), ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap), case maps:get(ActionType, ActionTypeToBridgeV1Type, undefined) of undefined -> ActionType; BridgeV1TypeFun when is_function(BridgeV1TypeFun) -> - BridgeV1TypeFun(get_confs(ActionType, Conf)); + case get_confs(ActionType, ActionConf) of + {ConnectorConfig, ActionConfig} -> BridgeV1TypeFun({ConnectorConfig, ActionConfig}); + undefined -> ActionType + end; BridgeV1Type -> BridgeV1Type end. @@ -131,7 +135,9 @@ action_type_to_bridge_v1_type(ActionType, Conf) -> get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) -> ConnectorType = action_type_to_connector_type(ActionType), ConnectorConfig = emqx_conf:get_raw([connectors, ConnectorType, ConnectorName]), - {ActionConfig, ConnectorConfig}. + {ConnectorConfig, ActionConfig}; +get_confs(_, _) -> + undefined. %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 0e116589bb..f557210ed3 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -237,9 +237,15 @@ send_to_matched_egress_bridges_loop(Topic, Msg, [Id | Ids]) -> send_to_matched_egress_bridges_loop(Topic, Msg, Ids). send_message(BridgeId, Message) -> - {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId), - ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), - send_message(BridgeType, BridgeName, ResId, Message, #{}). + {BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId), + case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of + true -> + BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), + emqx_bridge_v2:send_message(BridgeV2Type, BridgeName, Message, #{}); + false -> + ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName), + send_message(BridgeV1Type, BridgeName, ResId, Message, #{}) + end. send_message(BridgeType, BridgeName, ResId, Message, QueryOpts0) -> case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of @@ -377,8 +383,8 @@ disable_enable(Action, BridgeType0, BridgeName) when ) end. -create(BridgeType0, BridgeName, RawConf) -> - BridgeType = upgrade_type(BridgeType0), +create(BridgeV1Type, BridgeName, RawConf) -> + BridgeType = upgrade_type(BridgeV1Type), ?SLOG(debug, #{ bridge_action => create, bridge_type => BridgeType, @@ -387,7 +393,7 @@ create(BridgeType0, BridgeName, RawConf) -> }), case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of true -> - emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeType, BridgeName, RawConf); + emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf); false -> emqx_conf:update( emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], diff --git a/apps/emqx_bridge/src/emqx_bridge_lib.erl b/apps/emqx_bridge/src/emqx_bridge_lib.erl index 04b3378ce0..ed8e918fac 100644 --- a/apps/emqx_bridge/src/emqx_bridge_lib.erl +++ b/apps/emqx_bridge/src/emqx_bridge_lib.erl @@ -78,6 +78,14 @@ external_ids(Type, Name) -> [external_id(Type0, Name), external_id(Type, Name)] end. +get_conf(BridgeType, BridgeName) -> + case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of + true -> + emqx_conf:get_raw([actions, BridgeType, BridgeName]); + false -> + undefined + end. + %% Creates the external id for the bridge_v2 that is used by the rule actions %% to refer to the bridge_v2 external_id(BridgeType, BridgeName) -> @@ -87,9 +95,3 @@ external_id(BridgeType, BridgeName) -> bin(Bin) when is_binary(Bin) -> Bin; bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). - -get_conf(BridgeType, BridgeName) -> - case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of - true -> emqx_conf:get_raw([actions, BridgeType, BridgeName]); - false -> emqx_conf:get_raw([bridges, BridgeType, BridgeName]) - end. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index ba557d0b1c..54ccf1b243 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1053,8 +1053,8 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) -> bridge_v1_type_to_bridge_v2_type(Type) -> emqx_action_info:bridge_v1_type_to_action_type(Type). -bridge_v2_type_to_bridge_v1_type(Type, Conf) -> - emqx_action_info:action_type_to_bridge_v1_type(Type, Conf). +bridge_v2_type_to_bridge_v1_type(ActionType, ActionConf) -> + emqx_action_info:action_type_to_bridge_v1_type(ActionType, ActionConf). is_bridge_v2_type(Type) -> emqx_action_info:is_action_type(Type). @@ -1065,8 +1065,8 @@ bridge_v1_list_and_transform() -> bridge_v1_lookup_and_transform(ActionType, Name) -> case lookup(ActionType, Name) of - {ok, #{raw_config := #{<<"connector">> := ConnectorName}} = ActionConfig} -> - BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, ActionConfig), + {ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} -> + BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig), case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of true -> ConnectorType = connector_type(ActionType), @@ -1244,6 +1244,8 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR #{bin(BridgeV2Type) => #{bin(BridgeName) => PreviousRawConf}}, PreviousRawConf =/= undefined ), + %% [FIXME] this will loop through all connector types, instead pass the + %% connector type and just do it for that one Output = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2( FakeGlobalConfig ), diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 5cb9b043f3..1ed0eb31b3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -552,18 +552,24 @@ t_on_get_status(Config, Opts) -> _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - ct:sleep(500), - ?retry( - _Interval0 = 200, - _Attempts0 = 10, - ?assertEqual({ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId)) - ) - end), - %% Check that it recovers itself. - ?retry( - _Sleep = 1_000, - _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) - ), + case ProxyHost of + undefined -> + ok; + _ -> + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?retry( + _Interval0 = 100, + _Attempts0 = 20, + ?assertEqual( + {ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId) + ) + ) + end), + %% Check that it recovers itself. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ) + end, ok. diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src index 5545ac967e..f361d52769 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src @@ -9,7 +9,7 @@ emqx_resource, emqx_mongodb ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_mongodb_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl index b108f654f9..ac7aa6280d 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl @@ -12,7 +12,9 @@ %% emqx_bridge_enterprise "callbacks" -export([ - conn_bridge_examples/1 + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 ]). %% hocon_schema callbacks @@ -27,10 +29,13 @@ %% hocon_schema API %%================================================================================================= +%% [TODO] Namespace should be different depending on whether this is used for a +%% connector, an action or a legacy bridge type. namespace() -> "bridge_mongodb". roots() -> + %% ??? []. fields("config") -> @@ -44,6 +49,18 @@ fields("config") -> #{required => true, desc => ?DESC(emqx_resource_schema, "creation_opts")} )} ]; +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + [ + {parameters, + mk( + hoconsc:union([ + ref(emqx_mongodb, "connector_" ++ T) + || T <- ["single", "sharded", "rs"] + ]), + #{required => true, desc => ?DESC("mongodb_parameters")} + )} + ] ++ emqx_mongodb:fields(mongodb); fields("creation_opts") -> %% so far, mongodb connector does not support batching %% but we cannot delete this field due to compatibility reasons @@ -55,12 +72,47 @@ fields("creation_opts") -> desc => ?DESC("batch_size") }} ]); +fields(action) -> + {mongodb, + mk( + hoconsc:map(name, ref(?MODULE, mongodb_action)), + #{desc => <<"MongoDB Action Config">>, required => false} + )}; +fields(mongodb_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk(ref(?MODULE, action_parameters), #{ + required => true, desc => ?DESC(action_parameters) + }) + ); +fields(action_parameters) -> + [ + {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}, + {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})} + ]; +fields(resource_opts) -> + fields("creation_opts"); fields(mongodb_rs) -> emqx_mongodb:fields(rs) ++ fields("config"); fields(mongodb_sharded) -> emqx_mongodb:fields(sharded) ++ fields("config"); fields(mongodb_single) -> emqx_mongodb:fields(single) ++ fields("config"); +fields("post_connector") -> + type_and_name_fields(mongodb) ++ + fields("config_connector"); +fields("put_connector") -> + fields("config_connector"); +fields("get_connector") -> + emqx_bridge_schema:status_fields() ++ + fields("post_connector"); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ + fields("post_bridge_v2"); +fields("post_bridge_v2") -> + type_and_name_fields(mongodb) ++ + fields(mongodb_action); +fields("put_bridge_v2") -> + fields(mongodb_action); fields("post_rs") -> fields(mongodb_rs) ++ type_and_name_fields(mongodb_rs); fields("post_sharded") -> @@ -86,6 +138,16 @@ fields("get_single") -> fields(mongodb_single) ++ type_and_name_fields(mongodb_single). +bridge_v2_examples(Method) -> + [ + #{ + <<"mongodb">> => #{ + summary => <<"MongoDB Action">>, + value => action_values(Method) + } + } + ]. + conn_bridge_examples(Method) -> [ #{ @@ -108,16 +170,46 @@ conn_bridge_examples(Method) -> } ]. +connector_examples(Method) -> + [ + #{ + <<"mongodb_rs">> => #{ + summary => <<"MongoDB Replica Set Connector">>, + value => connector_values(mongodb_rs, Method) + } + }, + #{ + <<"mongodb_sharded">> => #{ + summary => <<"MongoDB Sharded Connector">>, + value => connector_values(mongodb_sharded, Method) + } + }, + #{ + <<"mongodb_single">> => #{ + summary => <<"MongoDB Standalone Connector">>, + value => connector_values(mongodb_single, Method) + } + } + ]. + +desc("config_connector") -> + ?DESC("desc_config"); desc("config") -> ?DESC("desc_config"); desc("creation_opts") -> ?DESC(emqx_resource_schema, "creation_opts"); +desc(resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(mongodb_rs) -> ?DESC(mongodb_rs_conf); desc(mongodb_sharded) -> ?DESC(mongodb_sharded_conf); desc(mongodb_single) -> ?DESC(mongodb_single_conf); +desc(mongodb_action) -> + ?DESC(mongodb_action); +desc(action_parameters) -> + ?DESC(action_parameters); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for MongoDB using `", string:to_upper(Method), "` method."]; desc(_) -> @@ -133,49 +225,102 @@ type_and_name_fields(MongoType) -> {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} ]. -values(mongodb_rs = MongoType, Method) -> - TypeOpts = #{ +connector_values(Type, Method) -> + lists:foldl( + fun(M1, M2) -> + maps:merge(M1, M2) + end, + #{ + description => <<"My example connector">>, + parameters => mongo_type_opts(Type) + }, + [ + common_values(), + method_values(mongodb, Method) + ] + ). + +action_values(Method) -> + maps:merge( + method_values(mongodb, Method), + #{ + description => <<"My example action">>, + enable => true, + connector => <<"my_mongodb_connector">>, + parameters => #{ + collection => <<"mycol">> + } + } + ). + +values(MongoType, Method) -> + maps:merge( + mongo_type_opts(MongoType), + bridge_values(MongoType, Method) + ). + +mongo_type_opts(mongodb_rs) -> + #{ + mongo_type => <<"rs">>, servers => <<"localhost:27017, localhost:27018">>, w_mode => <<"safe">>, r_mode => <<"safe">>, replica_set_name => <<"rs">> - }, - values(common, MongoType, Method, TypeOpts); -values(mongodb_sharded = MongoType, Method) -> - TypeOpts = #{ + }; +mongo_type_opts(mongodb_sharded) -> + #{ + mongo_type => <<"sharded">>, servers => <<"localhost:27017, localhost:27018">>, w_mode => <<"safe">> - }, - values(common, MongoType, Method, TypeOpts); -values(mongodb_single = MongoType, Method) -> - TypeOpts = #{ + }; +mongo_type_opts(mongodb_single) -> + #{ + mongo_type => <<"single">>, server => <<"localhost:27017">>, w_mode => <<"safe">> - }, - values(common, MongoType, Method, TypeOpts). - -values(common, MongoType, Method, TypeOpts) -> - MongoTypeBin = atom_to_binary(MongoType), - Common = #{ - name => <>, - type => MongoTypeBin, + }. + +bridge_values(Type, _Method) -> + %% [FIXME] _Method makes a difference since PUT doesn't allow name and type + %% for connectors. + TypeBin = atom_to_binary(Type), + maps:merge( + #{ + name => <>, + type => TypeBin, + collection => <<"mycol">> + }, + common_values() + ). + +common_values() -> + #{ enable => true, - collection => <<"mycol">>, database => <<"mqtt">>, srv_record => false, pool_size => 8, username => <<"myuser">>, password => <<"******">> - }, - MethodVals = method_values(MongoType, Method), - Vals0 = maps:merge(MethodVals, Common), - maps:merge(Vals0, TypeOpts). - -method_values(MongoType, _) -> - ConnectorType = - case MongoType of - mongodb_rs -> <<"rs">>; - mongodb_sharded -> <<"sharded">>; - mongodb_single -> <<"single">> - end, - #{mongo_type => ConnectorType}. + }. + +method_values(Type, post) -> + TypeBin = atom_to_binary(Type), + #{ + name => <>, + type => TypeBin + }; +method_values(Type, get) -> + maps:merge( + method_values(Type, post), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +method_values(_Type, put) -> + #{}. diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl new file mode 100644 index 0000000000..8bbe5ff3a3 --- /dev/null +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl @@ -0,0 +1,95 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mongodb_action_info). + +-behaviour(emqx_action_info). + +%% behaviour callbacks +-export([ + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + connector_action_config_to_bridge_v1_config/2, + action_type_name/0, + bridge_v1_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +%% dynamic callback +-export([ + bridge_v1_type_name_fun/1 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(SCHEMA_MODULE, emqx_bridge_mongodb). + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + fix_v1_type( + maps:merge( + maps:without( + [<<"connector">>], + map_unindent(<<"parameters">>, ActionConfig) + ), + map_unindent(<<"parameters">>, ConnectorConfig) + ) + ). + +fix_v1_type(#{<<"mongo_type">> := MongoType} = Conf) -> + Conf#{<<"type">> => v1_type(MongoType)}. + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(mongodb_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + ActionConfig#{<<"connector">> => ConnectorName}. + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(mongodb_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys("config_connector"), + ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys), + ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys, + make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config). + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + map_indent(<<"parameters">>, IndentKeys, Conf0). + +bridge_v1_type_name() -> + {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}. + +action_type_name() -> mongodb. + +connector_type_name() -> mongodb. + +schema_module() -> ?SCHEMA_MODULE. + +bridge_v1_type_names() -> [mongodb_rs, mongodb_sharded, mongodb_single]. + +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"mongo_type">> := MongoType}}, _}) -> + v1_type(MongoType). + +v1_type(<<"rs">>) -> mongodb_rs; +v1_type(<<"sharded">>) -> mongodb_sharded; +v1_type(<<"single">>) -> mongodb_single. + +map_unindent(Key, Map) -> + maps:merge( + maps:get(Key, Map), + maps:remove(Key, Map) + ). + +map_indent(IndentKey, PickKeys, Map) -> + maps:put( + IndentKey, + maps:with(PickKeys, Map), + maps:without(PickKeys, Map) + ). + +schema_keys(Name) -> + [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))]. diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl index 741db95509..d0ea93ebc6 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl @@ -6,16 +6,19 @@ -behaviour(emqx_resource). --include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% `emqx_resource' API -export([ + on_remove_channel/3, callback_mode/0, - on_start/2, - on_stop/2, + on_add_channel/4, + on_get_channel_status/3, + on_get_channels/1, + on_get_status/2, on_query/3, - on_get_status/2 + on_start/2, + on_stop/2 ]). %%======================================================================================== @@ -24,44 +27,94 @@ callback_mode() -> emqx_mongodb:callback_mode(). -on_start(InstanceId, Config) -> - case emqx_mongodb:on_start(InstanceId, Config) of - {ok, ConnectorState} -> - PayloadTemplate0 = maps:get(payload_template, Config, undefined), - PayloadTemplate = preprocess_template(PayloadTemplate0), - CollectionTemplateSource = maps:get(collection, Config), - CollectionTemplate = preprocess_template(CollectionTemplateSource), - State = #{ - payload_template => PayloadTemplate, - collection_template => CollectionTemplate, - connector_state => ConnectorState - }, - {ok, State}; - Error -> - Error +on_add_channel( + _InstanceId, + #{channels := Channels} = OldState, + ChannelId, + #{parameters := Parameters} = ChannelConfig0 +) -> + PayloadTemplate0 = maps:get(payload_template, Parameters, undefined), + PayloadTemplate = preprocess_template(PayloadTemplate0), + CollectionTemplateSource = maps:get(collection, Parameters), + CollectionTemplate = preprocess_template(CollectionTemplateSource), + ChannelConfig = maps:merge( + Parameters, + ChannelConfig0#{ + payload_template => PayloadTemplate, + collection_template => CollectionTemplate + } + ), + NewState = OldState#{channels => maps:put(ChannelId, ChannelConfig, Channels)}, + {ok, NewState}. + +on_get_channel_status(InstanceId, _ChannelId, State) -> + case on_get_status(InstanceId, State) of + connected -> + connected; + _ -> + connecting end. -on_stop(InstanceId, _State = #{connector_state := ConnectorState}) -> - emqx_mongodb:on_stop(InstanceId, ConnectorState). +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). -on_query(InstanceId, {send_message, Message0}, State) -> +on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) -> + emqx_mongodb:on_get_status(InstanceId, ConnectorState). + +on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_state := ConnectorState}) -> #{ payload_template := PayloadTemplate, - collection_template := CollectionTemplate, - connector_state := ConnectorState - } = State, - NewConnectorState = ConnectorState#{ + collection_template := CollectionTemplate + } = ChannelState0 = maps:get(Channel, Channels), + ChannelState = ChannelState0#{ collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0) }, Message = render_message(PayloadTemplate, Message0), - Res = emqx_mongodb:on_query(InstanceId, {send_message, Message}, NewConnectorState), - ?tp(mongo_bridge_connector_on_query_return, #{result => Res}), + Res = emqx_mongodb:on_query( + InstanceId, + {Channel, Message}, + maps:merge(ConnectorState, ChannelState) + ), + ?tp(mongo_bridge_connector_on_query_return, #{instance_id => InstanceId, result => Res}), Res; on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) -> emqx_mongodb:on_query(InstanceId, Request, ConnectorState). -on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) -> - emqx_mongodb:on_get_status(InstanceId, ConnectorState). +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> + NewState = State#{channels => maps:remove(ChannelId, Channels)}, + {ok, NewState}. + +on_start(InstanceId, Config0) -> + Config = config_transform(Config0), + case emqx_mongodb:on_start(InstanceId, Config) of + {ok, ConnectorState} -> + State = #{ + connector_state => ConnectorState, + channels => #{} + }, + {ok, State}; + Error -> + Error + end. + +config_transform(#{parameters := #{mongo_type := MongoType} = Parameters} = Config) -> + maps:put( + type, + connector_type(MongoType), + maps:merge( + maps:remove(parameters, Config), + Parameters + ) + ). + +connector_type(rs) -> mongodb_rs; +connector_type(sharded) -> mongodb_sharded; +connector_type(single) -> mongodb_single. + +on_stop(InstanceId, _State = #{connector_state := ConnectorState}) -> + ok = emqx_mongodb:on_stop(InstanceId, ConnectorState), + ?tp(mongodb_stopped, #{instance_id => InstanceId}), + ok. %%======================================================================================== %% Helper fns diff --git a/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl b/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl index cedb19b88e..d87e1665fa 100644 --- a/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl +++ b/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl @@ -132,7 +132,17 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_mongodb, emqx_bridge, emqx_rule_engine, emqx_conf]), + ok = emqx_common_test_helpers:stop_apps( + [ + emqx_management, + emqx_bridge_mongodb, + emqx_mongodb, + emqx_bridge, + emqx_connector, + emqx_rule_engine, + emqx_conf + ] + ), ok. init_per_testcase(_Testcase, Config) -> @@ -144,6 +154,7 @@ init_per_testcase(_Testcase, Config) -> end_per_testcase(_Testcase, Config) -> clear_db(Config), delete_bridge(Config), + [] = emqx_connector:list(), snabbkaffe:stop(), ok. @@ -157,9 +168,17 @@ start_apps() -> %% we want to make sure they are loaded before %% ekka start in emqx_common_test_helpers:start_apps/1 emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([ - emqx_conf, emqx_rule_engine, emqx_bridge, emqx_mongodb - ]). + ok = emqx_common_test_helpers:start_apps( + [ + emqx_conf, + emqx_rule_engine, + emqx_connector, + emqx_bridge, + emqx_mongodb, + emqx_bridge_mongodb, + emqx_management + ] + ). ensure_loaded() -> _ = application:load(emqtt), @@ -198,6 +217,7 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) -> "\n w_mode = safe" "\n use_legacy_protocol = auto" "\n database = mqtt" + "\n mongo_type = rs" "\n resource_opts = {" "\n query_mode = ~s" "\n worker_pool_size = 1" @@ -224,6 +244,7 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) -> "\n w_mode = safe" "\n use_legacy_protocol = auto" "\n database = mqtt" + "\n mongo_type = sharded" "\n resource_opts = {" "\n query_mode = ~s" "\n worker_pool_size = 1" @@ -253,6 +274,7 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) -> "\n auth_source = ~s" "\n username = ~s" "\n password = \"file://~s\"" + "\n mongo_type = single" "\n resource_opts = {" "\n query_mode = ~s" "\n worker_pool_size = 1" @@ -290,13 +312,17 @@ create_bridge(Config, Overrides) -> delete_bridge(Config) -> Type = mongo_type_bin(?config(mongo_type, Config)), Name = ?config(mongo_name, Config), - emqx_bridge:remove(Type, Name). + emqx_bridge:check_deps_and_remove(Type, Name, [connector, rule_actions]). create_bridge_http(Params) -> Path = emqx_mgmt_api_test_util:api_path(["bridges"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of - {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + case + emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, #{ + return_all => true + }) + of + {ok, {{_, 201, _}, _, Body}} -> {ok, emqx_utils_json:decode(Body, [return_maps])}; Error -> Error end. @@ -564,8 +590,8 @@ t_get_status_server_selection_too_short(Config) -> ok. t_use_legacy_protocol_option(Config) -> - ResourceID = resource_id(Config), {ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"true">>}), + ResourceID = resource_id(Config), ?retry( _Interval0 = 200, _NAttempts0 = 20, diff --git a/apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl b/apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl new file mode 100644 index 0000000000..9fd13c50b4 --- /dev/null +++ b/apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl @@ -0,0 +1,232 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_v2_mongodb_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(BRIDGE_TYPE, mongodb). +-define(BRIDGE_TYPE_BIN, <<"mongodb">>). +-define(CONNECTOR_TYPE, mongodb). +-define(CONNECTOR_TYPE_BIN, <<"mongodb">>). + +-import(emqx_common_test_helpers, [on_exit/1]). +-import(emqx_utils_conv, [bin/1]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + MongoHost = os:getenv("MONGO_SINGLE_HOST", "mongo"), + MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")), + case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of + true -> + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_bridge_mongodb, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, Api} = emqx_common_test_http:create_default_app(), + [ + {apps, Apps}, + {api, Api}, + {mongo_host, MongoHost}, + {mongo_port, MongoPort} + | Config + ]; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_mongo); + _ -> + {skip, no_mongo} + end + end. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), + ok. + +init_per_testcase(TestCase, Config) -> + common_init_per_testcase(TestCase, Config). + +common_init_per_testcase(TestCase, Config) -> + ct:timetrap(timer:seconds(60)), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_config:delete_override_conf_files(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]), + AuthSource = bin(os:getenv("MONGO_AUTHSOURCE", "admin")), + Username = bin(os:getenv("MONGO_USERNAME", "")), + Password = bin(os:getenv("MONGO_PASSWORD", "")), + Passfile = filename:join(?config(priv_dir, Config), "passfile"), + ok = file:write_file(Passfile, Password), + NConfig = [ + {mongo_authsource, AuthSource}, + {mongo_username, Username}, + {mongo_password, Password}, + {mongo_passfile, Passfile} + | Config + ], + ConnectorConfig = connector_config(Name, NConfig), + BridgeConfig = bridge_config(Name, Name), + ok = snabbkaffe:start_trace(), + [ + {connector_type, ?CONNECTOR_TYPE}, + {connector_name, Name}, + {connector_config, ConnectorConfig}, + {bridge_type, ?BRIDGE_TYPE}, + {bridge_name, Name}, + {bridge_config, BridgeConfig} + | NConfig + ]. + +end_per_testcase(_Testcase, Config) -> + case proplists:get_bool(skip_does_not_apply, Config) of + true -> + ok; + false -> + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + ok + end. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +connector_config(Name, Config) -> + MongoHost = ?config(mongo_host, Config), + MongoPort = ?config(mongo_port, Config), + AuthSource = ?config(mongo_authsource, Config), + Username = ?config(mongo_username, Config), + PassFile = ?config(mongo_passfile, Config), + InnerConfigMap0 = + #{ + <<"enable">> => true, + <<"database">> => <<"mqtt">>, + <<"parameters">> => + #{ + <<"mongo_type">> => <<"single">>, + <<"server">> => iolist_to_binary([MongoHost, ":", integer_to_binary(MongoPort)]), + <<"w_mode">> => <<"safe">> + }, + <<"pool_size">> => 8, + <<"srv_record">> => false, + <<"username">> => Username, + <<"password">> => iolist_to_binary(["file://", PassFile]), + <<"auth_source">> => AuthSource + }, + InnerConfigMap = serde_roundtrip(InnerConfigMap0), + parse_and_check_connector_config(InnerConfigMap, Name). + +parse_and_check_connector_config(InnerConfigMap, Name) -> + TypeBin = ?CONNECTOR_TYPE_BIN, + RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}}, + #{<<"connectors">> := #{TypeBin := #{Name := Config}}} = + hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{ + required => false, atom_key => false + }), + ct:pal("parsed config: ~p", [Config]), + InnerConfigMap. + +bridge_config(Name, ConnectorId) -> + InnerConfigMap0 = + #{ + <<"enable">> => true, + <<"connector">> => ConnectorId, + <<"parameters">> => + #{}, + <<"local_topic">> => <<"t/aeh">> + %%, + }, + InnerConfigMap = serde_roundtrip(InnerConfigMap0), + parse_and_check_bridge_config(InnerConfigMap, Name). + +%% check it serializes correctly +serde_roundtrip(InnerConfigMap0) -> + IOList = hocon_pp:do(InnerConfigMap0, #{}), + {ok, InnerConfigMap} = hocon:binary(IOList), + InnerConfigMap. + +parse_and_check_bridge_config(InnerConfigMap, Name) -> + TypeBin = ?BRIDGE_TYPE_BIN, + RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}}, + hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}), + InnerConfigMap. + +shared_secret_path() -> + os:getenv("CI_SHARED_SECRET_PATH", "/var/lib/secret"). + +shared_secret(client_keyfile) -> + filename:join([shared_secret_path(), "client.key"]); +shared_secret(client_certfile) -> + filename:join([shared_secret_path(), "client.crt"]); +shared_secret(client_cacertfile) -> + filename:join([shared_secret_path(), "ca.crt"]); +shared_secret(rig_keytab) -> + filename:join([shared_secret_path(), "rig.keytab"]). + +make_message() -> + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + #{ + clientid => BinTime, + payload => Payload, + timestamp => Time + }. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_start_stop(Config) -> + emqx_bridge_v2_testlib:t_start_stop(Config, mongodb_stopped), + ok. + +t_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config), + ok. + +t_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), + ok. + +t_sync_query(Config) -> + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, + fun make_message/0, + fun(Res) -> ?assertEqual(ok, Res) end, + mongo_bridge_connector_on_query_return + ), + ok. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 6c303dd7e5..535917e4ef 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -20,8 +20,8 @@ resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); -%% We use AEH's Kafka interface. resource_type(azure_event_hub_producer) -> + %% We use AEH's Kafka interface. emqx_bridge_kafka_impl_producer; resource_type(confluent_producer) -> emqx_bridge_kafka_impl_producer; @@ -29,6 +29,8 @@ resource_type(gcp_pubsub_producer) -> emqx_bridge_gcp_pubsub_impl_producer; resource_type(kafka_producer) -> emqx_bridge_kafka_impl_producer; +resource_type(mongodb) -> + emqx_bridge_mongodb_connector; resource_type(syskeeper_forwarder) -> emqx_bridge_syskeeper_connector; resource_type(syskeeper_proxy) -> @@ -83,6 +85,14 @@ connector_structs() -> required => false } )}, + {mongodb, + mk( + hoconsc:map(name, ref(emqx_bridge_mongodb, "config_connector")), + #{ + desc => <<"MongoDB Connector Config">>, + required => false + } + )}, {syskeeper_forwarder, mk( hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)), @@ -119,6 +129,7 @@ schema_modules() -> emqx_bridge_confluent_producer, emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_kafka, + emqx_bridge_mongodb, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy ]. @@ -133,12 +144,13 @@ api_schemas(Method) -> api_ref( emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector" ), - api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref( emqx_bridge_gcp_pubsub_producer_schema, <<"gcp_pubsub_producer">>, Method ++ "_connector" ), + api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), + api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method) ]. diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 2330e54919..765a693e20 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -68,8 +68,9 @@ enterprise_fields_connectors() -> []. connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; -connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub_producer]; +connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; +connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; connector_type_to_bridge_types(syskeeper_proxy) -> []. @@ -266,8 +267,9 @@ transform_old_style_bridges_to_connector_and_actions_of_type( RawConfigSoFar1 ), %% Add action + ActionType = emqx_action_info:bridge_v1_type_to_action_type(to_bin(BridgeType)), RawConfigSoFar3 = emqx_utils_maps:deep_put( - [actions_config_name(), to_bin(maybe_rename(BridgeType)), BridgeName], + [actions_config_name(), to_bin(ActionType), BridgeName], RawConfigSoFar2, ActionMap ), @@ -286,12 +288,6 @@ transform_bridges_v1_to_connectors_and_bridges_v2(RawConfig) -> ), NewRawConf. -%% v1 uses 'kafka' as bridge type v2 uses 'kafka_producer' -maybe_rename(kafka) -> - kafka_producer; -maybe_rename(Name) -> - Name. - %%====================================================================================== %% HOCON Schema Callbacks %%====================================================================================== diff --git a/apps/emqx_mongodb/src/emqx_mongodb.erl b/apps/emqx_mongodb/src/emqx_mongodb.erl index 3adf52e6d4..2c246e5065 100644 --- a/apps/emqx_mongodb/src/emqx_mongodb.erl +++ b/apps/emqx_mongodb/src/emqx_mongodb.erl @@ -68,19 +68,10 @@ roots() -> }} ]. -fields(single) -> - [ - {mongo_type, #{ - type => single, - default => single, - desc => ?DESC("single_mongo_type") - }}, - {server, server()}, - {w_mode, fun w_mode/1} - ] ++ mongo_fields(); -fields(rs) -> +fields("connector_rs") -> [ {mongo_type, #{ + required => true, type => rs, default => rs, desc => ?DESC("rs_mongo_type") @@ -89,17 +80,51 @@ fields(rs) -> {w_mode, fun w_mode/1}, {r_mode, fun r_mode/1}, {replica_set_name, fun replica_set_name/1} - ] ++ mongo_fields(); -fields(sharded) -> + ]; +fields("connector_sharded") -> [ {mongo_type, #{ + required => true, type => sharded, default => sharded, desc => ?DESC("sharded_mongo_type") }}, {servers, servers()}, {w_mode, fun w_mode/1} - ] ++ mongo_fields(); + ]; +fields("connector_single") -> + [ + {mongo_type, #{ + required => true, + type => single, + default => single, + desc => ?DESC("single_mongo_type") + }}, + {server, server()}, + {w_mode, fun w_mode/1} + ]; +fields(Type) when Type =:= rs; Type =:= single; Type =:= sharded -> + fields("connector_" ++ atom_to_list(Type)) ++ fields(mongodb); +fields(mongodb) -> + [ + {srv_record, fun srv_record/1}, + {pool_size, fun emqx_connector_schema_lib:pool_size/1}, + {username, fun emqx_connector_schema_lib:username/1}, + {password, emqx_connector_schema_lib:password_field()}, + {use_legacy_protocol, + hoconsc:mk(hoconsc:enum([auto, true, false]), #{ + default => auto, + desc => ?DESC("use_legacy_protocol") + })}, + {auth_source, #{ + type => binary(), + required => false, + desc => ?DESC("auth_source") + }}, + {database, fun emqx_connector_schema_lib:database/1}, + {topology, #{type => hoconsc:ref(?MODULE, topology), required => false}} + ] ++ + emqx_connector_schema_lib:ssl_fields(); fields(topology) -> [ {pool_size, @@ -129,6 +154,12 @@ fields(topology) -> {min_heartbeat_frequency_ms, duration("min_heartbeat_period")} ]. +desc("connector_single") -> + ?DESC("desc_single"); +desc("connector_rs") -> + ?DESC("desc_rs"); +desc("connector_sharded") -> + ?DESC("desc_sharded"); desc(single) -> ?DESC("desc_single"); desc(rs) -> @@ -140,27 +171,6 @@ desc(topology) -> desc(_) -> undefined. -mongo_fields() -> - [ - {srv_record, fun srv_record/1}, - {pool_size, fun emqx_connector_schema_lib:pool_size/1}, - {username, fun emqx_connector_schema_lib:username/1}, - {password, emqx_connector_schema_lib:password_field()}, - {use_legacy_protocol, - hoconsc:mk(hoconsc:enum([auto, true, false]), #{ - default => auto, - desc => ?DESC("use_legacy_protocol") - })}, - {auth_source, #{ - type => binary(), - required => false, - desc => ?DESC("auth_source") - }}, - {database, fun emqx_connector_schema_lib:database/1}, - {topology, #{type => hoconsc:ref(?MODULE, topology), required => false}} - ] ++ - emqx_connector_schema_lib:ssl_fields(). - %% =================================================================== callback_mode() -> always_sync. @@ -236,7 +246,7 @@ on_stop(InstId, _State) -> on_query( InstId, - {send_message, Document}, + {_ChannelId, Document}, #{pool_name := PoolName, collection := Collection} = State ) -> Request = {insert, Collection, Document}, diff --git a/rel/i18n/emqx_bridge_mongodb.hocon b/rel/i18n/emqx_bridge_mongodb.hocon index 4edd1182d1..d7c14588ba 100644 --- a/rel/i18n/emqx_bridge_mongodb.hocon +++ b/rel/i18n/emqx_bridge_mongodb.hocon @@ -48,6 +48,12 @@ mongodb_single_conf.desc: mongodb_single_conf.label: """MongoDB (Standalone) Configuration""" +mongodb_parameters.label: +"""MongoDB Type Specific Parameters""" + +mongodb_parameters.desc: +"""Set of parameters specific for the given type of this MongoDB connector, `mongo_type` can be one of `single` (Standalone), `sharded` (Sharded) or `rs` (Replica Set).""" + payload_template.desc: """The template for formatting the outgoing messages. If undefined, rule engine will use JSON format to serialize all visible inputs, such as clientid, topic, payload etc.""" @@ -59,4 +65,19 @@ batch_size.desc: batch_size.label: """Batch Size""" +action_parameters.label: +"""Action Parameters""" +action_parameters.desc: +"""Additional parameters specific to this action type""" + +mongodb_action.label: +"""MongoDB Action""" +mongodb_action.desc: +"""Action to interact with a MongoDB connector""" + +mqtt_topic.desc: +"""MQTT topic or topic filter as data source (bridge input). If rule action is used as data source, this config should be left empty, otherwise messages will be duplicated in MongoDB.""" +mqtt_topic.label: +"""Source MQTT Topic""" + } From c1ef773e770797807053ce95614d548dfdd47fdf Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 23 Nov 2023 13:25:03 +0100 Subject: [PATCH 4/4] fix: check for sane state after regular shutdown --- .../test/emqx_bridge_kafka_impl_producer_SUITE.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 2a8a42a09e..09d3f78aa7 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -483,11 +483,10 @@ t_failed_creation_then_fix(Config) -> {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), % %% TODO: refactor those into init/end per testcase - ok = ?PRODUCER:on_stop(ResourceId, State), - ?assertEqual([], supervisor:which_children(wolff_client_sup)), - ?assertEqual([], supervisor:which_children(wolff_producers_sup)), ok = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)), delete_all_bridges(), + ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ?assertEqual([], supervisor:which_children(wolff_producers_sup)), ok. t_custom_timestamp(_Config) ->