-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
emqx_bridge_kafka_consumer_action_info.erl
105 lines (88 loc) · 3.86 KB
/
emqx_bridge_kafka_consumer_action_info.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_consumer_action_info).
-behaviour(emqx_action_info).
-export([
is_source/0,
is_action/0,
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
connector_action_config_to_bridge_v1_config/2,
bridge_v1_config_to_action_config/2
]).
is_source() -> true.
is_action() -> false.
bridge_v1_type_name() -> kafka_consumer.
action_type_name() -> kafka_consumer.
connector_type_name() -> kafka_consumer.
schema_module() -> emqx_bridge_kafka_consumer_schema.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
V1Config1 = maps:remove(<<"connector">>, ActionConfig),
V1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, V1Config1),
V1Config3 = maybe_fabricate_topic_mapping(V1Config2),
{Params1, V1Config4} = maps:take(<<"parameters">>, V1Config3),
TopLevelCfgKeys = [to_bin(K) || {K, _} <- emqx_bridge_kafka:fields(consumer_opts), K =/= kafka],
TopLevelCfg = maps:with(TopLevelCfgKeys, Params1),
%% `topic' is v2-only
Params = maps:without([<<"topic">> | TopLevelCfgKeys], Params1),
V1Config5 = emqx_utils_maps:deep_merge(V1Config4, TopLevelCfg),
V1Config = emqx_utils_maps:update_if_present(
<<"resource_opts">>,
%% Slightly different from default source resource opts...
fun(RO) -> maps:with(v1_fields(connector_resource_opts), RO) end,
V1Config5
),
maps:put(<<"kafka">>, Params, V1Config).
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, schema_module(), source_parameters
),
TopicMapping = maps:get(<<"topic_mapping">>, BridgeV1Conf, []),
Params0 = maps:get(<<"kafka">>, BridgeV1Conf, #{}),
Params1 = maps:with(source_parameters_field_keys(), Params0),
Params2 = emqx_utils_maps:put_if(
Params1, <<"topic_mapping">>, TopicMapping, TopicMapping =/= []
),
Params = maybe_set_kafka_topic(Params2),
{source, action_type_name(), maps:put(<<"parameters">>, Params, Config0)}.
%%------------------------------------------------------------------------------------------
%% Internal helper functions
%%------------------------------------------------------------------------------------------
%% The new schema has a single kafka topic, so we take it from topic mapping when
%% converting from v1.
maybe_set_kafka_topic(#{<<"topic_mapping">> := [#{<<"kafka_topic">> := Topic} | _]} = Params) ->
Params#{<<"topic">> => Topic};
maybe_set_kafka_topic(Params) ->
Params.
%% The old schema requires `topic_mapping', which is now hidden.
maybe_fabricate_topic_mapping(#{<<"parameters">> := Params0} = BridgeV1Config0) ->
#{<<"topic">> := Topic} = Params0,
case maps:get(<<"topic_mapping">>, Params0, undefined) of
[_ | _] ->
BridgeV1Config0;
_ ->
%% Have to fabricate an MQTT topic, unfortunately... QoS and payload already
%% have defaults.
FakeTopicMapping = #{
<<"kafka_topic">> => Topic,
<<"mqtt_topic">> => <<>>
},
Params = Params0#{<<"topic_mapping">> => [FakeTopicMapping]},
BridgeV1Config0#{<<"parameters">> := Params}
end.
v1_fields(StructName) ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_kafka:fields(StructName)
].
source_parameters_field_keys() ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_kafka_consumer_schema:fields(source_parameters)
].
to_bin(B) when is_binary(B) -> B;
to_bin(L) when is_list(L) -> list_to_binary(L);
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).