Skip to content

Commit

Permalink
Merge pull request #11963 from sstrigler/EMQX-11156-bridge-v-2-mongo-…
Browse files Browse the repository at this point in the history
…db-support

EMQX 11156 bridge v2 mongo db support
  • Loading branch information
sstrigler committed Nov 24, 2023
2 parents 8f548f4 + c1ef773 commit f8f8cf9
Show file tree
Hide file tree
Showing 17 changed files with 809 additions and 197 deletions.
12 changes: 9 additions & 3 deletions apps/emqx_bridge/src/emqx_action_info.erl
Expand Up @@ -77,6 +77,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_confluent_producer_action_info,
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_syskeeper_action_info
].
-else.
Expand Down Expand Up @@ -116,22 +117,27 @@ bridge_v1_type_to_action_type(Type) ->

action_type_to_bridge_v1_type(Bin, Conf) when is_binary(Bin) ->
action_type_to_bridge_v1_type(binary_to_existing_atom(Bin), Conf);
action_type_to_bridge_v1_type(ActionType, Conf) ->
action_type_to_bridge_v1_type(ActionType, ActionConf) ->
ActionInfoMap = info_map(),
ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap),
case maps:get(ActionType, ActionTypeToBridgeV1Type, undefined) of
undefined ->
ActionType;
BridgeV1TypeFun when is_function(BridgeV1TypeFun) ->
BridgeV1TypeFun(get_confs(ActionType, Conf));
case get_confs(ActionType, ActionConf) of
{ConnectorConfig, ActionConfig} -> BridgeV1TypeFun({ConnectorConfig, ActionConfig});
undefined -> ActionType
end;
BridgeV1Type ->
BridgeV1Type
end.

get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) ->
ConnectorType = action_type_to_connector_type(ActionType),
ConnectorConfig = emqx_conf:get_raw([connectors, ConnectorType, ConnectorName]),
{ActionConfig, ConnectorConfig}.
{ConnectorConfig, ActionConfig};
get_confs(_, _) ->
undefined.

%% This function should return true for all inputs that are bridge V1 types for
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
Expand Down
18 changes: 12 additions & 6 deletions apps/emqx_bridge/src/emqx_bridge.erl
Expand Up @@ -237,9 +237,15 @@ send_to_matched_egress_bridges_loop(Topic, Msg, [Id | Ids]) ->
send_to_matched_egress_bridges_loop(Topic, Msg, Ids).

send_message(BridgeId, Message) ->
{BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
send_message(BridgeType, BridgeName, ResId, Message, #{}).
{BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of
true ->
BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
emqx_bridge_v2:send_message(BridgeV2Type, BridgeName, Message, #{});
false ->
ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName),
send_message(BridgeV1Type, BridgeName, ResId, Message, #{})
end.

send_message(BridgeType, BridgeName, ResId, Message, QueryOpts0) ->
case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
Expand Down Expand Up @@ -377,8 +383,8 @@ disable_enable(Action, BridgeType0, BridgeName) when
)
end.

create(BridgeType0, BridgeName, RawConf) ->
BridgeType = upgrade_type(BridgeType0),
create(BridgeV1Type, BridgeName, RawConf) ->
BridgeType = upgrade_type(BridgeV1Type),
?SLOG(debug, #{
bridge_action => create,
bridge_type => BridgeType,
Expand All @@ -387,7 +393,7 @@ create(BridgeType0, BridgeName, RawConf) ->
}),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeType, BridgeName, RawConf);
emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf);
false ->
emqx_conf:update(
emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
Expand Down
14 changes: 8 additions & 6 deletions apps/emqx_bridge/src/emqx_bridge_lib.erl
Expand Up @@ -78,6 +78,14 @@ external_ids(Type, Name) ->
[external_id(Type0, Name), external_id(Type, Name)]
end.

get_conf(BridgeType, BridgeName) ->
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_conf:get_raw([actions, BridgeType, BridgeName]);
false ->
undefined
end.

%% Creates the external id for the bridge_v2 that is used by the rule actions
%% to refer to the bridge_v2
external_id(BridgeType, BridgeName) ->
Expand All @@ -87,9 +95,3 @@ external_id(BridgeType, BridgeName) ->

bin(Bin) when is_binary(Bin) -> Bin;
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).

get_conf(BridgeType, BridgeName) ->
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true -> emqx_conf:get_raw([actions, BridgeType, BridgeName]);
false -> emqx_conf:get_raw([bridges, BridgeType, BridgeName])
end.
14 changes: 8 additions & 6 deletions apps/emqx_bridge/src/emqx_bridge_v2.erl
Expand Up @@ -410,10 +410,10 @@ uninstall_bridge_v2(
CreationOpts = emqx_resource:fetch_creation_opts(Config),
ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts),
ok = emqx_resource:clear_metrics(BridgeV2Id),
case combine_connector_and_bridge_v2_config(BridgeV2Type, BridgeName, Config) of
case validate_referenced_connectors(BridgeV2Type, ConnectorName, BridgeName) of
{error, _} ->
ok;
_CombinedConfig ->
ok ->
%% Deinstall from connector
ConnectorId = emqx_connector_resource:resource_id(
connector_type(BridgeV2Type), ConnectorName
Expand Down Expand Up @@ -1053,8 +1053,8 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
bridge_v1_type_to_bridge_v2_type(Type) ->
emqx_action_info:bridge_v1_type_to_action_type(Type).

bridge_v2_type_to_bridge_v1_type(Type, Conf) ->
emqx_action_info:action_type_to_bridge_v1_type(Type, Conf).
bridge_v2_type_to_bridge_v1_type(ActionType, ActionConf) ->
emqx_action_info:action_type_to_bridge_v1_type(ActionType, ActionConf).

is_bridge_v2_type(Type) ->
emqx_action_info:is_action_type(Type).
Expand All @@ -1065,8 +1065,8 @@ bridge_v1_list_and_transform() ->

bridge_v1_lookup_and_transform(ActionType, Name) ->
case lookup(ActionType, Name) of
{ok, #{raw_config := #{<<"connector">> := ConnectorName}} = ActionConfig} ->
BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, ActionConfig),
{ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} ->
BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig),
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
true ->
ConnectorType = connector_type(ActionType),
Expand Down Expand Up @@ -1244,6 +1244,8 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR
#{bin(BridgeV2Type) => #{bin(BridgeName) => PreviousRawConf}},
PreviousRawConf =/= undefined
),
%% [FIXME] this will loop through all connector types, instead pass the
%% connector type and just do it for that one
Output = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(
FakeGlobalConfig
),
Expand Down
34 changes: 20 additions & 14 deletions apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl
Expand Up @@ -552,18 +552,24 @@ t_on_get_status(Config, Opts) ->
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
ct:sleep(500),
?retry(
_Interval0 = 200,
_Attempts0 = 10,
?assertEqual({ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId))
)
end),
%% Check that it recovers itself.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
case ProxyHost of
undefined ->
ok;
_ ->
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
?retry(
_Interval0 = 100,
_Attempts0 = 20,
?assertEqual(
{ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId)
)
)
end),
%% Check that it recovers itself.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
)
end,
ok.
83 changes: 42 additions & 41 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
Expand Up @@ -112,16 +112,15 @@ values({put, connector}) ->
values({put, KafkaType}) ->
maps:merge(values(common_config), values(KafkaType));
values(bridge_v2_producer) ->
maps:merge(
#{
enable => true,
connector => <<"my_kafka_producer_connector">>,
resource_opts => #{
health_check_interval => "32s"
}
},
values(producer)
);
#{
enable => true,
connector => <<"my_kafka_producer_connector">>,
parameters => values(producer_values),
local_topic => <<"mqtt/local/topic">>,
resource_opts => #{
health_check_interval => "32s"
}
};
values(common_config) ->
#{
authentication => #{
Expand All @@ -143,39 +142,41 @@ values(common_config) ->
};
values(producer) ->
#{
kafka => #{
topic => <<"kafka-topic">>,
message => #{
key => <<"${.clientid}">>,
value => <<"${.}">>,
timestamp => <<"${.timestamp}">>
kafka => values(producer_values),
local_topic => <<"mqtt/local/topic">>
};
values(producer_values) ->
#{
topic => <<"kafka-topic">>,
message => #{
key => <<"${.clientid}">>,
value => <<"${.}">>,
timestamp => <<"${.timestamp}">>
},
max_batch_bytes => <<"896KB">>,
compression => <<"no_compression">>,
partition_strategy => <<"random">>,
required_acks => <<"all_isr">>,
partition_count_refresh_interval => <<"60s">>,
kafka_headers => <<"${pub_props}">>,
kafka_ext_headers => [
#{
kafka_ext_header_key => <<"clientid">>,
kafka_ext_header_value => <<"${clientid}">>
},
max_batch_bytes => <<"896KB">>,
compression => <<"no_compression">>,
partition_strategy => <<"random">>,
required_acks => <<"all_isr">>,
partition_count_refresh_interval => <<"60s">>,
kafka_headers => <<"${pub_props}">>,
kafka_ext_headers => [
#{
kafka_ext_header_key => <<"clientid">>,
kafka_ext_header_value => <<"${clientid}">>
},
#{
kafka_ext_header_key => <<"topic">>,
kafka_ext_header_value => <<"${topic}">>
}
],
kafka_header_value_encode_mode => none,
max_inflight => 10,
buffer => #{
mode => <<"hybrid">>,
per_partition_limit => <<"2GB">>,
segment_bytes => <<"100MB">>,
memory_overload_protection => true
#{
kafka_ext_header_key => <<"topic">>,
kafka_ext_header_value => <<"${topic}">>
}
},
local_topic => <<"mqtt/local/topic">>
],
kafka_header_value_encode_mode => none,
max_inflight => 10,
buffer => #{
mode => <<"hybrid">>,
per_partition_limit => <<"2GB">>,
segment_bytes => <<"100MB">>,
memory_overload_protection => true
}
};
values(consumer) ->
#{
Expand Down
Expand Up @@ -483,11 +483,10 @@ t_failed_creation_then_fix(Config) ->
{ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
% %% TODO: refactor those into init/end per testcase
ok = ?PRODUCER:on_stop(ResourceId, State),
?assertEqual([], supervisor:which_children(wolff_client_sup)),
?assertEqual([], supervisor:which_children(wolff_producers_sup)),
ok = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)),
delete_all_bridges(),
?assertEqual([], supervisor:which_children(wolff_client_sup)),
?assertEqual([], supervisor:which_children(wolff_producers_sup)),
ok.

t_custom_timestamp(_Config) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src
Expand Up @@ -9,7 +9,7 @@
emqx_resource,
emqx_mongodb
]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_mongodb_action_info]}]},
{modules, []},
{links, []}
]}.

0 comments on commit f8f8cf9

Please sign in to comment.