diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 12fda5d51c..4204322ce8 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_gcp_pubsub_consumer_action_info, emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, + emqx_bridge_kafka_consumer_action_info, emqx_bridge_kinesis_action_info, emqx_bridge_hstreamdb_action_info, emqx_bridge_matrix_action_info, diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 56fe0029af..92f36e0d8e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -765,19 +765,26 @@ create_dry_run(ConfRootKey, Type, Conf0) -> {error, Reason1} end. -create_dry_run_helper(ConfRootKey, BridgeType, ConnectorRawConf, BridgeV2RawConf) -> +create_dry_run_helper(ConfRootKey, BridgeV2Type, ConnectorRawConf, BridgeV2RawConf) -> BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), - ConnectorType = connector_type(BridgeType), + ConnectorType = connector_type(BridgeV2Type), OnReadyCallback = fun(ConnectorId) -> {_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), - ChannelTestId = id(BridgeType, BridgeName, ConnectorName), - Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf), + ChannelTestId = id(BridgeV2Type, BridgeName, ConnectorName), + BridgeV2Conf0 = fill_defaults( + BridgeV2Type, + BridgeV2RawConf, + bin(ConfRootKey), + emqx_bridge_v2_schema, + #{make_serializable => false} + ), + BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2Conf0), AugmentedConf = augment_channel_config( ConfRootKey, - BridgeType, + BridgeV2Type, BridgeName, - Conf + BridgeV2Conf ), case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, AugmentedConf) of {error, Reason} -> @@ -1204,8 +1211,11 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], E perform_bridge_changes(Tasks, Errors). fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) -> + fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, _Opts = #{}). + +fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, Opts) -> PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf), - FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, #{}), + FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, Opts), unpack_bridge_conf(Type, FullConf, TopLevelConf). pack_bridge_conf(Type, RawConf, TopLevelConf) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index cd35e8eed0..92c0b43a02 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -775,7 +775,7 @@ handle_update(ConfRootKey, Id, Conf0) -> Id, case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of {ok, _} -> - RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}), + RawConf = emqx:get_raw_config([ConfRootKey, BridgeType, BridgeName], #{}), Conf = emqx_utils:deobfuscate(Conf1, RawConf), update_bridge(ConfRootKey, BridgeType, BridgeName, Conf); {error, not_found} -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 44d73a4bb1..9def284d98 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -89,6 +89,7 @@ end_per_testcase(_Testcase, Config) -> %% in CI, apparently this needs more time since the %% machines struggle with all the containers running... emqx_common_test_helpers:call_janitor(60_000), + delete_all_bridges_and_connectors(), ok = snabbkaffe:stop(), ok end. @@ -132,7 +133,13 @@ parse_and_check(Kind, Type, Name, InnerConfigMap0) -> TypeBin = emqx_utils_conv:bin(Type), RawConf = #{RootBin => #{TypeBin => #{Name => InnerConfigMap0}}}, #{RootBin := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain( - emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false} + emqx_bridge_v2_schema, + RawConf, + #{ + required => false, + atom_key => false, + make_serializable => true + } ), InnerConfigMap. @@ -140,7 +147,13 @@ parse_and_check_connector(Type, Name, InnerConfigMap0) -> TypeBin = emqx_utils_conv:bin(Type), RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap0}}}, #{<<"connectors">> := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain( - emqx_connector_schema, RawConf, #{required => false, atom_key => false} + emqx_connector_schema, + RawConf, + #{ + required => false, + atom_key => false, + make_serializable => true + } ), InnerConfigMap. @@ -282,20 +295,23 @@ list_bridges_api() -> ct:pal("list bridges result: ~p", [Res]), Res. +get_source_api(BridgeType, BridgeName) -> + get_bridge_api(source, BridgeType, BridgeName). + get_bridge_api(BridgeType, BridgeName) -> + get_bridge_api(action, BridgeType, BridgeName). + +get_bridge_api(BridgeKind, BridgeType, BridgeName) -> BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), Params = [], - Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - Opts = #{return_all => true}, - ct:pal("get bridge ~p (via http)", [{BridgeType, BridgeName}]), - Res = - case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of - {ok, {Status, Headers, Body0}} -> - {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; - Error -> - Error + Root = + case BridgeKind of + source -> "sources"; + action -> "actions" end, + Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]), + ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]), + Res = request(get, Path, Params), ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]), Res. @@ -672,7 +688,8 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> end, ok. -%% - `ProduceFn': produces a message in the remote system that shall be consumed. +%% - `ProduceFn': produces a message in the remote system that shall be consumed. May be +%% a `{function(), integer()}' tuple. %% - `Tracepoint': marks the end of consumed message processing. t_consume(Config, Opts) -> #{ @@ -683,14 +700,17 @@ t_consume(Config, Opts) -> } = Opts, ?check_trace( begin - ?assertMatch( - {{ok, _}, {ok, _}}, - snabbkaffe:wait_async_action( - fun() -> create_bridge_api(Config) end, - ConsumerReadyTPFn, - 15_000 - ) - ), + ConsumerReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000), + case ConsumerReadyTPFn of + {Predicate, NEvents} when is_function(Predicate) -> + {ok, SRef0} = snabbkaffe:subscribe(Predicate, NEvents, ConsumerReadyTimeout); + Predicate when is_function(Predicate) -> + {ok, SRef0} = snabbkaffe:subscribe( + Predicate, _NEvents = 1, ConsumerReadyTimeout + ) + end, + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)), ok = add_source_hookpoint(Config), ?retry( _Sleep = 200, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index a504a42d83..74ba582171 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -12,7 +12,12 @@ brod, brod_gssapi ]}, - {env, [{emqx_action_info_modules, [emqx_bridge_kafka_action_info]}]}, + {env, [ + {emqx_action_info_modules, [ + emqx_bridge_kafka_action_info, + emqx_bridge_kafka_consumer_action_info + ]} + ]}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_action_info.erl new file mode 100644 index 0000000000..5ea6451e83 --- /dev/null +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_action_info.erl @@ -0,0 +1,105 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_kafka_consumer_action_info). + +-behaviour(emqx_action_info). + +-export([ + is_source/0, + is_action/0, + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_config_to_action_config/2 +]). + +is_source() -> true. + +is_action() -> false. + +bridge_v1_type_name() -> kafka_consumer. + +action_type_name() -> kafka_consumer. + +connector_type_name() -> kafka_consumer. + +schema_module() -> emqx_bridge_kafka_consumer_schema. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + V1Config1 = maps:remove(<<"connector">>, ActionConfig), + V1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, V1Config1), + V1Config3 = maybe_fabricate_topic_mapping(V1Config2), + {Params1, V1Config4} = maps:take(<<"parameters">>, V1Config3), + TopLevelCfgKeys = [to_bin(K) || {K, _} <- emqx_bridge_kafka:fields(consumer_opts), K =/= kafka], + TopLevelCfg = maps:with(TopLevelCfgKeys, Params1), + %% `topic' is v2-only + Params = maps:without([<<"topic">> | TopLevelCfgKeys], Params1), + V1Config5 = emqx_utils_maps:deep_merge(V1Config4, TopLevelCfg), + V1Config = emqx_utils_maps:update_if_present( + <<"resource_opts">>, + %% Slightly different from default source resource opts... + fun(RO) -> maps:with(v1_fields(connector_resource_opts), RO) end, + V1Config5 + ), + maps:put(<<"kafka">>, Params, V1Config). + +bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> + Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, schema_module(), source_parameters + ), + TopicMapping = maps:get(<<"topic_mapping">>, BridgeV1Conf, []), + Params0 = maps:get(<<"kafka">>, BridgeV1Conf, #{}), + Params1 = maps:with(source_parameters_field_keys(), Params0), + Params2 = emqx_utils_maps:put_if( + Params1, <<"topic_mapping">>, TopicMapping, TopicMapping =/= [] + ), + Params = maybe_set_kafka_topic(Params2), + {source, action_type_name(), maps:put(<<"parameters">>, Params, Config0)}. + +%%------------------------------------------------------------------------------------------ +%% Internal helper functions +%%------------------------------------------------------------------------------------------ + +%% The new schema has a single kafka topic, so we take it from topic mapping when +%% converting from v1. +maybe_set_kafka_topic(#{<<"topic_mapping">> := [#{<<"kafka_topic">> := Topic} | _]} = Params) -> + Params#{<<"topic">> => Topic}; +maybe_set_kafka_topic(Params) -> + Params. + +%% The old schema requires `topic_mapping', which is now hidden. +maybe_fabricate_topic_mapping(#{<<"parameters">> := Params0} = BridgeV1Config0) -> + #{<<"topic">> := Topic} = Params0, + case maps:get(<<"topic_mapping">>, Params0, undefined) of + [_ | _] -> + BridgeV1Config0; + _ -> + %% Have to fabricate an MQTT topic, unfortunately... QoS and payload already + %% have defaults. + FakeTopicMapping = #{ + <<"kafka_topic">> => Topic, + <<"mqtt_topic">> => <<>> + }, + Params = Params0#{<<"topic_mapping">> => [FakeTopicMapping]}, + BridgeV1Config0#{<<"parameters">> := Params} + end. + +v1_fields(StructName) -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_kafka:fields(StructName) + ]. + +source_parameters_field_keys() -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_kafka_consumer_schema:fields(source_parameters) + ]. + +to_bin(B) when is_binary(B) -> B; +to_bin(L) when is_list(L) -> list_to_binary(L); +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl new file mode 100644 index 0000000000..266d3f7d9a --- /dev/null +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl @@ -0,0 +1,233 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_kafka_consumer_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + source_examples/1, + connector_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(CONNECTOR_TYPE, kafka_consumer). +-define(SOURCE_TYPE, kafka_consumer). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> "kafka_consumer". + +roots() -> []. + +%%========================================= +%% Source fields +%%========================================= +fields(source) -> + {kafka_consumer, + mk( + hoconsc:map(name, ref(?MODULE, consumer_source)), + #{ + desc => <<"Kafka Consumer Source Config">>, + required => false + } + )}; +fields(consumer_source) -> + emqx_bridge_v2_schema:make_consumer_action_schema( + mk( + ref(?MODULE, source_parameters), + #{ + required => true, + desc => ?DESC(consumer_source) + } + ) + ); +fields(source_parameters) -> + Fields0 = emqx_bridge_kafka:fields(consumer_kafka_opts), + Fields1 = emqx_bridge_kafka:fields(consumer_opts), + Fields2 = proplists:delete(kafka, Fields1), + Fields = lists:map( + fun + ({topic_mapping = Name, Sc}) -> + %% to please dialyzer... + Override = #{ + type => hocon_schema:field_schema(Sc, type), + required => false, + default => [], + validator => fun(_) -> ok end, + importance => ?IMPORTANCE_HIDDEN + }, + {Name, hocon_schema:override(Sc, Override)}; + (FieldSchema) -> + FieldSchema + end, + Fields0 ++ Fields2 + ), + [ + {topic, + mk( + binary(), + #{ + required => true, + desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic) + } + )} + | Fields + ]; +%%========================================= +%% HTTP API fields: source +%%========================================= +fields(Field) when + Field == "get_source"; + Field == "post_source"; + Field == "put_source" +-> + emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(consumer_source)); +%%========================================= +%% Connector fields +%%========================================= +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + emqx_bridge_kafka:kafka_connector_config_fields(); +%%========================================= +%% HTTP API fields: connector +%%========================================= +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields( + Field, + ?CONNECTOR_TYPE, + emqx_bridge_kafka:kafka_connector_config_fields() + ). + +desc("config_connector") -> + ?DESC("config_connector"); +desc(source_parameters) -> + ?DESC(source_parameters); +desc(consumer_source) -> + ?DESC(consumer_source); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); +desc(source_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); +desc(Field) when + Field =:= "get_connector"; + Field =:= "put_connector"; + Field =:= "post_connector" +-> + "Configuration for Kafka Consumer Connector."; +desc(Field) when + Field =:= "get_source"; + Field =:= "put_source"; + Field =:= "post_source" +-> + "Configuration for Kafka Consumer Source."; +desc(Name) -> + throw({missing_desc, ?MODULE, Name}). + +%%------------------------------------------------------------------------------------------------- +%% `emqx_bridge_v2_schema' "unofficial" API +%%------------------------------------------------------------------------------------------------- + +source_examples(Method) -> + [ + #{ + <<"kafka_consumer">> => #{ + summary => <<"Kafka Consumer Source">>, + value => source_example(Method) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"kafka_consumer">> => #{ + summary => <<"Kafka Consumer Connector">>, + value => connector_example(Method) + } + } + ]. + +%%------------------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------------------- + +source_example(post) -> + maps:merge( + source_example(put), + #{ + type => <<"kafka_consumer">>, + name => <<"my_source">> + } + ); +source_example(get) -> + maps:merge( + source_example(put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +source_example(put) -> + #{ + parameters => + #{ + topic => <<"mytopic">> + }, + resource_opts => + #{ + health_check_interval => <<"30s">> + } + }. + +connector_example(get) -> + maps:merge( + connector_example(post), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +connector_example(post) -> + maps:merge( + connector_example(put), + #{ + type => <<"kafka_consumer">>, + name => <<"my_connector">> + } + ); +connector_example(put) -> + #{ + bootstrap_hosts => <<"kafka.emqx.net:9092">>, + resource_opts => + #{ + start_after_created => true, + health_check_interval => <<"30s">>, + start_timeout => <<"5s">> + } + }. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 61ccf61f4b..e4f2376ecf 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -11,7 +11,12 @@ query_mode/1, on_start/2, on_stop/2, - on_get_status/2 + on_get_status/2, + + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). %% `brod_group_consumer' API @@ -30,45 +35,57 @@ -include_lib("brod/include/brod.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). --type config() :: #{ +-type connector_config() :: #{ authentication := term(), bootstrap_hosts := binary(), - bridge_name := atom(), - kafka := #{ - max_batch_bytes := emqx_schema:bytesize(), - max_rejoin_attempts := non_neg_integer(), - offset_commit_interval_seconds := pos_integer(), - offset_reset_policy := offset_reset_policy(), - topic := binary() - }, - topic_mapping := nonempty_list( - #{ - kafka_topic := kafka_topic(), - mqtt_topic := emqx_types:topic(), - qos := emqx_types:qos(), - payload_template := string() - } - ), + connector_name := atom() | binary(), + connector_type := atom() | binary(), + socket_opts := _, ssl := _, any() => term() }. +-type source_config() :: #{ + bridge_name := atom(), + hookpoints := [binary()], + parameters := source_parameters() +}. +-type source_parameters() :: #{ + key_encoding_mode := encoding_mode(), + max_batch_bytes := emqx_schema:bytesize(), + max_rejoin_attempts := non_neg_integer(), + offset_commit_interval_seconds := pos_integer(), + offset_reset_policy := offset_reset_policy(), + topic := kafka_topic(), + value_encoding_mode := encoding_mode(), + topic_mapping => [one_topic_mapping()] +}. +-type one_topic_mapping() :: #{ + kafka_topic => kafka_topic(), + mqtt_topic => emqx_types:topic(), + qos => emqx_types:qos(), + payload_template => string() +}. -type subscriber_id() :: emqx_bridge_kafka_consumer_sup:child_id(). -type kafka_topic() :: brod:topic(). -type kafka_message() :: #kafka_message{}. --type state() :: #{ - kafka_topics := nonempty_list(kafka_topic()), +-type connector_state() :: #{ + kafka_client_id := brod:client_id(), + installed_sources := #{source_resource_id() => source_state()} +}. +-type source_state() :: #{ subscriber_id := subscriber_id(), - kafka_client_id := brod:client_id() + kafka_client_id := brod:client_id(), + kafka_topics := [kafka_topic()] }. -type offset_reset_policy() :: latest | earliest. -type encoding_mode() :: none | base64. -type consumer_init_data() :: #{ - hookpoint := binary(), + hookpoints := [binary()], key_encoding_mode := encoding_mode(), - resource_id := resource_id(), + resource_id := source_resource_id(), topic_mapping := #{ kafka_topic() := #{ - payload_template := emqx_placeholder:tmpl_token(), + payload_template => emqx_placeholder:tmpl_token(), mqtt_topic_template => emqx_placeholder:tmpl_token(), qos => emqx_types:qos() } @@ -76,13 +93,13 @@ value_encoding_mode := encoding_mode() }. -type consumer_state() :: #{ - hookpoint := binary(), - kafka_topic := binary(), + hookpoints := [binary()], + kafka_topic := kafka_topic(), key_encoding_mode := encoding_mode(), - resource_id := resource_id(), + resource_id := source_resource_id(), topic_mapping := #{ kafka_topic() := #{ - payload_template := emqx_placeholder:tmpl_token(), + payload_template => emqx_placeholder:tmpl_token(), mqtt_topic_template => emqx_placeholder:tmpl_token(), qos => emqx_types:qos() } @@ -90,7 +107,7 @@ value_encoding_mode := encoding_mode() }. -type subscriber_init_info() :: #{ - topic => brod:topic(), + topic := brod:topic(), parition => brod:partition(), group_id => brod:group_id(), commit_fun => brod_group_subscriber_v2:commit_fun() @@ -103,7 +120,7 @@ %% Allocatable resources -define(kafka_client_id, kafka_client_id). --define(kafka_subscriber_id, kafka_subscriber_id). +-define(kafka_subscriber_ids, kafka_subscriber_ids). %%------------------------------------------------------------------------------------- %% `emqx_resource' API @@ -116,27 +133,19 @@ callback_mode() -> query_mode(_Config) -> no_queries. --spec on_start(resource_id(), config()) -> {ok, state()}. -on_start(ResourceId, Config) -> +-spec on_start(connector_resource_id(), connector_config()) -> {ok, connector_state()}. +on_start(ConnectorResId, Config) -> #{ authentication := Auth, bootstrap_hosts := BootstrapHosts0, - bridge_type := BridgeType, - bridge_name := BridgeName, - hookpoint := _, - kafka := #{ - max_batch_bytes := _, - max_rejoin_attempts := _, - offset_commit_interval_seconds := _, - offset_reset_policy := _ - }, + connector_type := ConnectorType, + connector_name := ConnectorName, socket_opts := SocketOpts0, - ssl := SSL, - topic_mapping := _ + ssl := SSL } = Config, BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0), %% Note: this is distinct per node. - ClientID = make_client_id(ResourceId, BridgeType, BridgeName), + ClientID = make_client_id(ConnectorResId, ConnectorType, ConnectorName), ClientOpts0 = case Auth of none -> []; @@ -145,34 +154,37 @@ on_start(ResourceId, Config) -> ClientOpts = add_ssl_opts(ClientOpts0, SSL), SocketOpts = emqx_bridge_kafka_impl:socket_opts(SocketOpts0), ClientOpts1 = [{extra_sock_opts, SocketOpts} | ClientOpts], - ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID), + ok = emqx_resource:allocate_resource(ConnectorResId, ?kafka_client_id, ClientID), case brod:start_client(BootstrapHosts, ClientID, ClientOpts1) of ok -> ?tp( kafka_consumer_client_started, - #{client_id => ClientID, resource_id => ResourceId} + #{client_id => ClientID, resource_id => ConnectorResId} ), ?SLOG(info, #{ msg => "kafka_consumer_client_started", - resource_id => ResourceId, + resource_id => ConnectorResId, kafka_hosts => BootstrapHosts }); {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_consumer_client", - resource_id => ResourceId, + resource_id => ConnectorResId, kafka_hosts => BootstrapHosts, reason => emqx_utils:redact(Reason) }), throw(?CLIENT_DOWN_MESSAGE) end, - start_consumer(Config, ResourceId, ClientID). - --spec on_stop(resource_id(), state()) -> ok. -on_stop(ResourceId, _State = undefined) -> - case emqx_resource:get_allocated_resources(ResourceId) of - #{?kafka_client_id := ClientID, ?kafka_subscriber_id := SubscriberId} -> - stop_subscriber(SubscriberId), + {ok, #{ + kafka_client_id => ClientID, + installed_sources => #{} + }}. + +-spec on_stop(resource_id(), connector_state()) -> ok. +on_stop(ConnectorResId, _State = undefined) -> + case emqx_resource:get_allocated_resources(ConnectorResId) of + #{?kafka_client_id := ClientID, ?kafka_subscriber_ids := SubscriberIds} -> + lists:foreach(fun stop_subscriber/1, SubscriberIds), stop_client(ClientID), ?tp(kafka_consumer_subcriber_and_client_stopped, #{}), ok; @@ -183,29 +195,91 @@ on_stop(ResourceId, _State = undefined) -> _ -> ok end; -on_stop(_ResourceId, State) -> +on_stop(ConnectorResId, State) -> #{ - subscriber_id := SubscriberId, + installed_sources := InstalledSources, kafka_client_id := ClientID } = State, - stop_subscriber(SubscriberId), + maps:foreach( + fun(_SourceResId, #{subscriber_id := SubscriberId}) -> + stop_subscriber(SubscriberId) + end, + InstalledSources + ), stop_client(ClientID), + ?tp(kafka_consumer_subcriber_and_client_stopped, #{instance_id => ConnectorResId}), ok. --spec on_get_status(resource_id(), state()) -> connected | disconnected. -on_get_status(_ResourceID, State) -> +-spec on_get_status(resource_id(), connector_state()) -> connected | disconnected. +on_get_status(_ResourceID, _State) -> + ?status_connected. + +-spec on_add_channel( + connector_resource_id(), + connector_state(), + source_resource_id(), + source_config() +) -> + {ok, connector_state()}. +on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) -> #{ - subscriber_id := SubscriberId, kafka_client_id := ClientID, - kafka_topics := KafkaTopics - } = State, - case do_get_status(ClientID, KafkaTopics, SubscriberId) of - {disconnected, Message} -> - {disconnected, State, Message}; - Res -> - Res + installed_sources := InstalledSources0 + } = ConnectorState0, + case start_consumer(SourceConfig, ConnectorResId, SourceResId, ClientID) of + {ok, SourceState} -> + InstalledSources = InstalledSources0#{SourceResId => SourceState}, + ConnectorState = ConnectorState0#{installed_sources := InstalledSources}, + {ok, ConnectorState}; + Error = {error, _} -> + Error end. +-spec on_remove_channel( + connector_resource_id(), + connector_state(), + source_resource_id() +) -> + {ok, connector_state()}. +on_remove_channel(ConnectorResId, ConnectorState0, SourceResId) -> + #{installed_sources := InstalledSources0} = ConnectorState0, + case maps:take(SourceResId, InstalledSources0) of + {SourceState, InstalledSources} -> + #{subscriber_id := SubscriberId} = SourceState, + stop_subscriber(SubscriberId), + deallocate_subscriber_id(ConnectorResId, SubscriberId), + ok; + error -> + InstalledSources = InstalledSources0 + end, + ConnectorState = ConnectorState0#{installed_sources := InstalledSources}, + {ok, ConnectorState}. + +-spec on_get_channels(connector_resource_id()) -> + [{action_resource_id(), source_config()}]. +on_get_channels(ConnectorResId) -> + emqx_bridge_v2:get_channels_for_connector(ConnectorResId). + +-spec on_get_channel_status( + connector_resource_id(), + source_resource_id(), + connector_state() +) -> + ?status_connected | ?status_disconnected. +on_get_channel_status( + _ConnectorResId, + SourceResId, + ConnectorState = #{installed_sources := InstalledSources} +) when is_map_key(SourceResId, InstalledSources) -> + #{kafka_client_id := ClientID} = ConnectorState, + #{ + kafka_topics := KafkaTopics, + subscriber_id := SubscriberId + } = maps:get(SourceResId, InstalledSources), + do_get_status(ClientID, KafkaTopics, SubscriberId); +on_get_channel_status(_ConnectorResId, _SourceResId, _ConnectorState) -> + ?status_disconnected. + %%------------------------------------------------------------------------------------- %% `brod_group_subscriber' API %%------------------------------------------------------------------------------------- @@ -227,18 +301,13 @@ handle_message(Message, State) -> do_handle_message(Message, State) -> #{ - hookpoint := Hookpoint, + hookpoints := Hookpoints, kafka_topic := KafkaTopic, key_encoding_mode := KeyEncodingMode, - resource_id := ResourceId, + resource_id := SourceResId, topic_mapping := TopicMapping, value_encoding_mode := ValueEncodingMode } = State, - #{ - mqtt_topic_template := MQTTTopicTemplate, - qos := MQTTQoS, - payload_template := PayloadTemplate - } = maps:get(KafkaTopic, TopicMapping), FullMessage = #{ headers => maps:from_list(Message#kafka_message.headers), key => encode(Message#kafka_message.key, KeyEncodingMode), @@ -248,16 +317,31 @@ do_handle_message(Message, State) -> ts_type => Message#kafka_message.ts_type, value => encode(Message#kafka_message.value, ValueEncodingMode) }, - Payload = render(FullMessage, PayloadTemplate), - MQTTTopic = render(FullMessage, MQTTTopicTemplate), - MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload), - _ = emqx_broker:safe_publish(MQTTMessage), - emqx_hooks:run(Hookpoint, [FullMessage]), - emqx_resource_metrics:received_inc(ResourceId), + LegacyMQTTConfig = maps:get(KafkaTopic, TopicMapping, #{}), + legacy_maybe_publish_mqtt_message(LegacyMQTTConfig, SourceResId, FullMessage), + lists:foreach(fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end, Hookpoints), + emqx_resource_metrics:received_inc(SourceResId), %% note: just `ack' does not commit the offset to the %% kafka consumer group. {ok, commit, State}. +legacy_maybe_publish_mqtt_message( + _MQTTConfig = #{ + payload_template := PayloadTemplate, + qos := MQTTQoS, + mqtt_topic_template := MQTTTopicTemplate + }, + SourceResId, + FullMessage +) when MQTTTopicTemplate =/= <<>> -> + Payload = render(FullMessage, PayloadTemplate), + MQTTTopic = render(FullMessage, MQTTTopicTemplate), + MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload), + _ = emqx_broker:safe_publish(MQTTMessage), + ok; +legacy_maybe_publish_mqtt_message(_MQTTConfig, _SourceResId, _FullMessage) -> + ok. + %%------------------------------------------------------------------------------------- %% Helper fns %%------------------------------------------------------------------------------------- @@ -292,28 +376,33 @@ ensure_consumer_supervisor_started() -> ok end. --spec start_consumer(config(), resource_id(), brod:client_id()) -> {ok, state()}. -start_consumer(Config, ResourceId, ClientID) -> +-spec start_consumer( + source_config(), + connector_resource_id(), + source_resource_id(), + brod:client_id() +) -> + {ok, source_state()} | {error, term()}. +start_consumer(Config, ConnectorResId, SourceResId, ClientID) -> #{ - bootstrap_hosts := BootstrapHosts0, bridge_name := BridgeName, - hookpoint := Hookpoint, - kafka := #{ + hookpoints := Hookpoints, + parameters := #{ + key_encoding_mode := KeyEncodingMode, max_batch_bytes := MaxBatchBytes, max_rejoin_attempts := MaxRejoinAttempts, offset_commit_interval_seconds := OffsetCommitInterval, - offset_reset_policy := OffsetResetPolicy0 - }, - key_encoding_mode := KeyEncodingMode, - topic_mapping := TopicMapping0, - value_encoding_mode := ValueEncodingMode + offset_reset_policy := OffsetResetPolicy0, + topic := _Topic, + value_encoding_mode := ValueEncodingMode + } = Params0 } = Config, ok = ensure_consumer_supervisor_started(), - TopicMapping = convert_topic_mapping(TopicMapping0), + TopicMapping = ensure_topic_mapping(Params0), InitialState = #{ key_encoding_mode => KeyEncodingMode, - hookpoint => Hookpoint, - resource_id => ResourceId, + hookpoints => Hookpoints, + resource_id => SourceResId, topic_mapping => TopicMapping, value_encoding_mode => ValueEncodingMode }, @@ -355,30 +444,38 @@ start_consumer(Config, ResourceId, ClientID) -> %% automatically, so we should not spawn duplicate workers. SubscriberId = make_subscriber_id(BridgeName), ?tp(kafka_consumer_about_to_start_subscriber, #{}), - ok = emqx_resource:allocate_resource(ResourceId, ?kafka_subscriber_id, SubscriberId), + ok = allocate_subscriber_id(ConnectorResId, SubscriberId), ?tp(kafka_consumer_subscriber_allocated, #{}), case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of {ok, _ConsumerPid} -> ?tp( kafka_consumer_subscriber_started, - #{resource_id => ResourceId, subscriber_id => SubscriberId} + #{resource_id => SourceResId, subscriber_id => SubscriberId} ), {ok, #{ subscriber_id => SubscriberId, kafka_client_id => ClientID, kafka_topics => KafkaTopics }}; - {error, Reason2} -> + {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_consumer", - resource_id => ResourceId, - kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0), - reason => emqx_utils:redact(Reason2) + resource_id => SourceResId, + reason => emqx_utils:redact(Reason) }), - stop_client(ClientID), - throw(failed_to_start_kafka_consumer) + {error, Reason} end. +%% This is to ensure backwards compatibility with the deprectated topic mapping. +-spec ensure_topic_mapping(source_parameters()) -> #{kafka_topic() := map()}. +ensure_topic_mapping(#{topic_mapping := [_ | _] = TM}) -> + %% There is an existing topic mapping: legacy config. We use it and ignore the single + %% pubsub topic so that the bridge keeps working as before. + convert_topic_mapping(TM); +ensure_topic_mapping(#{topic := KafkaTopic}) -> + %% No topic mapping: generate one without MQTT templates. + #{KafkaTopic => #{}}. + -spec stop_subscriber(emqx_bridge_kafka_consumer_sup:child_id()) -> ok. stop_subscriber(SubscriberId) -> _ = log_when_error( @@ -415,36 +512,38 @@ do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) -> case brod:get_partitions_count(ClientID, KafkaTopic) of {ok, NPartitions} -> case do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of - connected -> do_get_status(ClientID, RestTopics, SubscriberId); - disconnected -> disconnected + ?status_connected -> + do_get_status(ClientID, RestTopics, SubscriberId); + ?status_disconnected -> + ?status_disconnected end; {error, {client_down, Context}} -> case infer_client_error(Context) of auth_error -> Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE, - {disconnected, Message}; + {?status_disconnected, Message}; {auth_error, Message0} -> Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE, - {disconnected, Message}; + {?status_disconnected, Message}; connection_refused -> Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE, - {disconnected, Message}; + {?status_disconnected, Message}; _ -> - {disconnected, ?CLIENT_DOWN_MESSAGE} + {?status_disconnected, ?CLIENT_DOWN_MESSAGE} end; {error, leader_not_available} -> Message = "Leader connection not available. Please check the Kafka topic used," " the connection parameters and Kafka cluster health", - {disconnected, Message}; + {?status_disconnected, Message}; _ -> - disconnected + ?status_disconnected end; do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) -> - connected. + ?status_connected. -spec do_get_topic_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> - connected | disconnected. + ?status_connected | ?status_disconnected. do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> Results = lists:map( @@ -467,9 +566,9 @@ do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> WorkersAlive = are_subscriber_workers_alive(SubscriberId), case AllLeadersOk andalso WorkersAlive of true -> - connected; + ?status_connected; false -> - disconnected + ?status_disconnected end. are_subscriber_workers_alive(SubscriberId) -> @@ -507,19 +606,19 @@ consumer_group_id(BridgeName0) -> BridgeName = to_bin(BridgeName0), <<"emqx-kafka-consumer-", BridgeName/binary>>. --spec is_dry_run(resource_id()) -> boolean(). -is_dry_run(ResourceId) -> - TestIdStart = string:find(ResourceId, ?TEST_ID_PREFIX), +-spec is_dry_run(connector_resource_id()) -> boolean(). +is_dry_run(ConnectorResId) -> + TestIdStart = string:find(ConnectorResId, ?TEST_ID_PREFIX), case TestIdStart of nomatch -> false; _ -> - string:equal(TestIdStart, ResourceId) + string:equal(TestIdStart, ConnectorResId) end. --spec make_client_id(resource_id(), binary(), atom() | binary()) -> atom(). -make_client_id(ResourceId, BridgeType, BridgeName) -> - case is_dry_run(ResourceId) of +-spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom(). +make_client_id(ConnectorResId, BridgeType, BridgeName) -> + case is_dry_run(ConnectorResId) of false -> ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), binary_to_atom(ClientID0); @@ -583,3 +682,19 @@ infer_client_error(Error) -> _ -> undefined end. + +allocate_subscriber_id(ConnectorResId, SubscriberId) -> + AllocatedResources = emqx_resource:get_allocated_resources(ConnectorResId), + AllocatedSubscriberIds0 = maps:get(?kafka_subscriber_ids, AllocatedResources, []), + AllocatedSubscriberIds = lists:usort([SubscriberId | AllocatedSubscriberIds0]), + ok = emqx_resource:allocate_resource( + ConnectorResId, ?kafka_subscriber_ids, AllocatedSubscriberIds + ). + +deallocate_subscriber_id(ConnectorResId, SubscriberId) -> + AllocatedResources = emqx_resource:get_allocated_resources(ConnectorResId), + AllocatedSubscriberIds0 = maps:get(?kafka_subscriber_ids, AllocatedResources, []), + AllocatedSubscriberIds = AllocatedSubscriberIds0 -- [SubscriberId], + ok = emqx_resource:allocate_resource( + ConnectorResId, ?kafka_subscriber_ids, AllocatedSubscriberIds + ). diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 2800c0a1be..b6827235e8 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -15,6 +15,8 @@ -import(emqx_common_test_helpers, [on_exit/1]). -define(BRIDGE_TYPE_BIN, <<"kafka_consumer">>). +-define(CONNECTOR_TYPE_BIN, <<"kafka_consumer">>). +-define(SOURCE_TYPE_BIN, <<"kafka_consumer">>). -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_kafka]). %%------------------------------------------------------------------------------ @@ -78,13 +80,29 @@ testcases(once) -> ]. init_per_suite(Config) -> - [{bridge_type, <<"kafka_consumer">>} | Config]. + emqx_common_test_helpers:clear_screen(), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_bridge_kafka, + emqx_bridge, + emqx_rule_engine, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _Api} = emqx_common_test_http:create_default_app(), + [ + {apps, Apps}, + {bridge_type, <<"kafka_consumer">>} + | Config + ]. -end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)), - _ = application:stop(emqx_connector), +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), ok. init_per_group(plain = Type, Config) -> @@ -242,11 +260,6 @@ common_init_per_group() -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - application:load(emqx_bridge), - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps(?APPS), - {ok, _} = application:ensure_all_started(emqx_connector), - emqx_mgmt_api_test_util:init_suite(), UniqueNum = integer_to_binary(erlang:unique_integer()), MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>, [ @@ -262,7 +275,7 @@ common_end_per_group(Config) -> ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - delete_all_bridges(), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), ok. end_per_group(Group, Config) when @@ -327,7 +340,7 @@ init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config0) -> ct:timetrap(timer:seconds(60)), - delete_all_bridges(), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_config:delete_override_conf_files(), KafkaTopic0 = << @@ -363,7 +376,12 @@ common_init_per_testcase(TestCase, Config0) -> {kafka_name, Name}, {kafka_config_string, ConfigString}, {kafka_config, KafkaConfig}, - {kafka_producers, ProducersConfigs} + {kafka_producers, ProducersConfigs}, + {bridge_kind, source}, + {connector_name, Name}, + {connector_type, ?CONNECTOR_TYPE_BIN}, + {source_type, ?SOURCE_TYPE_BIN}, + {source_name, Name} | Config ]. @@ -372,7 +390,7 @@ end_per_testcase(_Testcase, Config) -> ProxyPort = ?config(proxy_port, Config), ProducersConfigs = ?config(kafka_producers, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - delete_all_bridges(), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), #{clientid := KafkaProducerClientId, producers := ProducersMapping} = ProducersConfigs, lists:foreach( @@ -681,19 +699,6 @@ create_bridge_wait_for_balance(Config) -> kill_group_subscriber_spy() end. -delete_bridge(Config) -> - Type = ?BRIDGE_TYPE_BIN, - Name = ?config(kafka_name, Config), - emqx_bridge:remove(Type, Name). - -delete_all_bridges() -> - lists:foreach( - fun(#{name := Name, type := Type}) -> - emqx_bridge:remove(Type, Name) - end, - emqx_bridge:list() - ). - create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). @@ -752,9 +757,8 @@ send_message(Config, Payload) -> emqx_bridge:send_message(BridgeId, Payload). resource_id(Config) -> - Type = ?BRIDGE_TYPE_BIN, Name = ?config(kafka_name, Config), - emqx_bridge_resource:resource_id(Type, Name). + emqx_bridge_v2:source_id(?SOURCE_TYPE_BIN, Name, Name). instance_id(Config) -> ResourceId = resource_id(Config), @@ -1084,6 +1088,12 @@ cluster(Config) -> ct:pal("cluster: ~p", [Cluster]), Cluster. +start_peer(Name, Opts) -> + Node = emqx_common_test_helpers:start_peer(Name, Opts), + % Make it possible to call `ct:pal` and friends (if running under rebar3) + _ = emqx_cth_cluster:share_load_module(Node, cthr), + Node. + start_async_publisher(Config, KafkaTopic) -> TId = ets:new(kafka_payloads, [public, ordered_set]), Loop = fun Go() -> @@ -1129,6 +1139,15 @@ kill_resource_managers() -> supervisor:which_children(emqx_resource_manager_sup) ). +health_check(Config) -> + health_check(node(), Config). + +health_check(Node, Config) -> + erpc:call(Node, fun() -> + #{status := Status} = emqx_bridge_v2_testlib:health_check_channel(Config), + {ok, Status} + end). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -1344,19 +1363,13 @@ t_on_get_status(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), - KafkaName = ?config(kafka_name, Config), - ResourceId = emqx_bridge_resource:resource_id(kafka_consumer, KafkaName), ?assertMatch( {ok, _}, create_bridge(Config) ), - %% Since the connection process is async, we give it some time to - %% stabilize and avoid flakiness. - ct:sleep(1_200), - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ?retry(100, 20, ?assertEqual({ok, connected}, health_check(Config))), emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - ct:sleep(500), - ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) + ?retry(100, 20, ?assertEqual({ok, disconnected}, health_check(Config))) end), ok. @@ -1390,14 +1403,16 @@ t_failed_creation_then_fixed(Config) -> ?assertMatch( {{ok, _}, {ok, _}}, ?wait_async_action( - update_bridge_api(Config), + update_bridge_api(Config, #{ + <<"resource_opts">> => + #{<<"health_check_interval">> => <<"1s">>} + }), #{?snk_kind := kafka_consumer_subscriber_started}, 60_000 ) ), wait_until_subscribers_are_ready(NPartitions, 120_000), - ResourceId = resource_id(Config), - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ?assertEqual({ok, connected}, health_check(Config)), ping_until_healthy(Config, _Period = 1_500, _Timeout = 24_000), {ok, C} = emqtt:start_link(), @@ -1459,7 +1474,6 @@ t_receive_after_recovery(Config) -> KafkaName = ?config(kafka_name, Config), KafkaNameA = binary_to_atom(KafkaName), KafkaClientId = consumer_clientid(Config), - ResourceId = resource_id(Config), ?check_trace( begin {ok, _} = create_bridge( @@ -1467,7 +1481,7 @@ t_receive_after_recovery(Config) -> #{<<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>}} ), ping_until_healthy(Config, _Period = 1_500, _Timeout0 = 24_000), - {ok, connected} = emqx_resource_manager:health_check(ResourceId), + {ok, connected} = health_check(Config), %% 0) ensure each partition commits its offset so it can %% recover later. Messages0 = [ @@ -1718,14 +1732,13 @@ t_cluster_group(Config) -> NPartitions = ?config(num_partitions, Config), KafkaTopic = ?config(kafka_topic, Config), KafkaName = ?config(kafka_name, Config), - ResourceId = resource_id(Config), BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName), Cluster = cluster(Config), ?check_trace( begin Nodes = [_N1, N2 | _] = [ - emqx_common_test_helpers:start_peer(Name, Opts) + start_peer(Name, Opts) || {Name, Opts} <- Cluster ], on_exit(fun() -> @@ -1765,7 +1778,7 @@ t_cluster_group(Config) -> fun(N) -> ?assertEqual( {ok, connected}, - erpc:call(N, emqx_resource_manager, health_check, [ResourceId]), + health_check(N, Config), #{node => N} ) end, @@ -1801,14 +1814,13 @@ t_node_joins_existing_cluster(Config) -> NPartitions = ?config(num_partitions, Config), KafkaTopic = ?config(kafka_topic, Config), KafkaName = ?config(kafka_name, Config), - ResourceId = resource_id(Config), BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName), Cluster = cluster(Config), ?check_trace( begin [{Name1, Opts1}, {Name2, Opts2} | _] = Cluster, ct:pal("starting ~p", [Name1]), - N1 = emqx_common_test_helpers:start_peer(Name1, Opts1), + N1 = start_peer(Name1, Opts1), on_exit(fun() -> ct:pal("stopping ~p", [N1]), ok = emqx_common_test_helpers:stop_peer(N1) @@ -1834,7 +1846,7 @@ t_node_joins_existing_cluster(Config) -> {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, [N1], 30_000), ?assertEqual( {ok, connected}, - erpc:call(N1, emqx_resource_manager, health_check, [ResourceId]) + health_check(N1, Config) ), %% Now, we start the second node and have it join the cluster. @@ -1851,7 +1863,7 @@ t_node_joins_existing_cluster(Config) -> 30_000 ), ct:pal("starting ~p", [Name2]), - N2 = emqx_common_test_helpers:start_peer(Name2, Opts2), + N2 = start_peer(Name2, Opts2), on_exit(fun() -> ct:pal("stopping ~p", [N2]), ok = emqx_common_test_helpers:stop_peer(N2) @@ -1944,7 +1956,7 @@ t_cluster_node_down(Config) -> lists:map( fun({Name, Opts}) -> ct:pal("starting ~p", [Name]), - emqx_common_test_helpers:start_peer(Name, Opts) + start_peer(Name, Opts) end, Cluster ), @@ -2130,7 +2142,6 @@ t_resource_manager_crash_after_subscriber_started(Config) -> _ -> ct:fail("unexpected result: ~p", [Res]) end, - ?assertMatch(ok, delete_bridge(Config)), ?retry( _Sleep = 50, _Attempts = 50, @@ -2143,6 +2154,7 @@ t_resource_manager_crash_after_subscriber_started(Config) -> ok. t_resource_manager_crash_before_subscriber_started(Config) -> + Name = ?config(kafka_name, Config), ?check_trace( begin ?force_ordering( @@ -2183,11 +2195,15 @@ t_resource_manager_crash_before_subscriber_started(Config) -> {ok, _} -> %% the new manager may have had time to startup %% before the resource status cache is read... + {ok, {{_, 204, _}, _, _}} = + emqx_bridge_testlib:delete_bridge_http_api_v1(#{ + name => Name, + type => ?BRIDGE_TYPE_BIN + }), ok; _ -> ct:fail("unexpected result: ~p", [Res]) end, - ?assertMatch(ok, delete_bridge(Config)), ?retry( _Sleep = 50, _Attempts = 50, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl new file mode 100644 index 0000000000..02a7a62794 --- /dev/null +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl @@ -0,0 +1,341 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v2_kafka_consumer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(CONNECTOR_TYPE_BIN, <<"kafka_consumer">>). +-define(SOURCE_TYPE_BIN, <<"kafka_consumer">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + All0 = emqx_common_test_helpers:all(?MODULE), + All = All0 -- matrix_cases(), + Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()), + Groups ++ All. + +groups() -> + emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()). + +matrix_cases() -> + [ + t_start_stop + ]. + +init_per_suite(Config) -> + emqx_bridge_kafka_impl_consumer_SUITE:init_per_suite(Config). + +end_per_suite(Config) -> + emqx_bridge_kafka_impl_consumer_SUITE:end_per_suite(Config). + +init_per_testcase(TestCase, Config) -> + common_init_per_testcase(TestCase, Config). + +common_init_per_testcase(TestCase, Config0) -> + ct:timetrap({seconds, 60}), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>, + ConnectorConfig = connector_config(Name, Config0), + Topic = Name, + SourceConfig = source_config(#{ + connector => Name, + parameters => #{topic => Topic} + }), + Config1 = ensure_topic_and_producers(ConnectorConfig, SourceConfig, TestCase, Config0), + ct:comment(get_matrix_params(Config1)), + [ + {kafka_topic, Topic}, + {bridge_kind, source}, + {source_type, ?SOURCE_TYPE_BIN}, + {source_name, Name}, + {source_config, SourceConfig}, + {connector_name, Name}, + {connector_type, ?CONNECTOR_TYPE_BIN}, + {connector_config, ConnectorConfig}, + {proxy_host, "toxiproxy"}, + {proxy_port, 8474} + | Config1 + ]. + +end_per_testcase(TestCase, Config) -> + emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config), + ok. + +auth_config(Config) -> + AuthType0 = maps:get(auth, get_matrix_params(Config)), + AuthType = + case AuthType0 of + none -> none; + sasl_auth_plain -> plain; + sasl_auth_scram256 -> scram_sha_256; + sasl_auth_scram512 -> scram_sha_512; + sasl_auth_kerberos -> kerberos + end, + {ok, #{<<"authentication">> := Auth}} = + hocon:binary(emqx_bridge_kafka_impl_consumer_SUITE:authentication(AuthType)), + Auth. + +get_matrix_params(Config) -> + case group_path(Config) of + undefined -> + #{ + host => <<"toxiproxy.emqx.net">>, + port => 9292, + tls => plain, + auth => none, + proxy_name => "kafka_plain" + }; + [TLS, Auth | _] -> + #{ + host => <<"toxiproxy.emqx.net">>, + port => toxiproxy_kafka_port(#{tls => TLS, auth => Auth}), + tls => TLS, + auth => Auth, + proxy_name => toxiproxy_proxy_name(#{tls => TLS, auth => Auth}) + } + end. + +toxiproxy_kafka_port(#{tls := plain, auth := none}) -> 9292; +toxiproxy_kafka_port(#{tls := tls, auth := none}) -> 9294; +toxiproxy_kafka_port(#{tls := tls, auth := sasl_auth_kerberos}) -> 9095; +toxiproxy_kafka_port(#{tls := plain, auth := sasl_auth_kerberos}) -> 9093; +toxiproxy_kafka_port(#{tls := plain, auth := _}) -> 9293; +toxiproxy_kafka_port(#{tls := tls, auth := _}) -> 9295. + +toxiproxy_proxy_name(#{tls := plain, auth := none}) -> "kafka_plain"; +toxiproxy_proxy_name(#{tls := tls, auth := none}) -> "kafka_ssl"; +toxiproxy_proxy_name(#{tls := plain, auth := _}) -> "kafka_sasl_plain"; +toxiproxy_proxy_name(#{tls := tls, auth := _}) -> "kafka_sasl_ssl". + +toxiproxy_host(#{auth := sasl_auth_kerberos}) -> <<"kafka-1.emqx.net">>; +toxiproxy_host(_) -> <<"toxiproxy.emqx.net">>. + +group_path(Config) -> + case emqx_common_test_helpers:group_path(Config) of + [] -> + undefined; + Path -> + Path + end. + +merge(Maps) -> + lists:foldl(fun(M, Acc) -> emqx_utils_maps:deep_merge(Acc, M) end, #{}, Maps). + +ensure_topic_and_producers(ConnectorConfig, SourceConfig, TestCase, TCConfig) -> + #{tls := TLS, auth := Auth} = get_matrix_params(TCConfig), + Topic = emqx_utils_maps:deep_get([<<"parameters">>, <<"topic">>], SourceConfig), + [{Host, Port}] = emqx_bridge_kafka_impl:hosts(maps:get(<<"bootstrap_hosts">>, ConnectorConfig)), + CreateConfig = maps:to_list(#{ + topic_mapping => [#{kafka_topic => Topic}], + kafka_host => Host, + kafka_port => Port, + direct_kafka_host => Host, + direct_kafka_port => Port, + use_tls => TLS =:= tls, + use_sasl => Auth =/= none, + num_partitions => 1 + }), + ok = emqx_bridge_kafka_impl_consumer_SUITE:ensure_topics(CreateConfig), + ProducerConfigs = emqx_bridge_kafka_impl_consumer_SUITE:start_producers(TestCase, CreateConfig), + [{kafka_producers, ProducerConfigs} | TCConfig]. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +connector_config(Name, Config) -> + connector_config1( + Name, + connector_overrides(Config) + ). + +connector_config1(Name, Overrides0 = #{}) -> + Overrides = emqx_utils_maps:binary_key_map(Overrides0), + InnerConfigMap0 = + #{ + <<"enable">> => true, + <<"tags">> => [<<"bridge">>], + <<"description">> => <<"my cool bridge">>, + + <<"authentication">> => <<"please override">>, + <<"bootstrap_hosts">> => <<"please override">>, + <<"connect_timeout">> => <<"5s">>, + <<"metadata_request_timeout">> => <<"5s">>, + <<"min_metadata_refresh_interval">> => <<"3s">>, + + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"2s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + } + }, + InnerConfigMap = emqx_utils_maps:deep_merge(InnerConfigMap0, Overrides), + emqx_bridge_v2_testlib:parse_and_check_connector(?SOURCE_TYPE_BIN, Name, InnerConfigMap). + +connector_overrides(TCConfig) -> + MatrixParams = #{tls := TLS} = get_matrix_params(TCConfig), + Host = toxiproxy_host(MatrixParams), + Port = toxiproxy_kafka_port(MatrixParams), + BootstrapHosts = <>, + AuthConfig = auth_config(TCConfig), + #{ + <<"bootstrap_hosts">> => BootstrapHosts, + <<"authentication">> => AuthConfig, + <<"ssl">> => #{<<"enable">> => TLS =:= tls} + }. + +source_config(Overrides0) -> + Overrides = emqx_utils_maps:binary_key_map(Overrides0), + CommonConfig = + #{ + <<"enable">> => true, + <<"connector">> => <<"please override">>, + <<"parameters">> => + #{ + <<"key_encoding_mode">> => <<"none">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_rejoin_attempts">> => <<"5">>, + <<"offset_reset_policy">> => <<"latest">>, + <<"topic">> => <<"please override">>, + <<"value_encoding_mode">> => <<"none">> + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"2s">>, + <<"resume_interval">> => <<"2s">> + } + }, + maps:merge(CommonConfig, Overrides). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_start_stop(matrix) -> + [ + [plain, none], + [plain, sasl_auth_plain], + [plain, sasl_auth_scram256], + [plain, sasl_auth_scram512], + [plain, sasl_auth_kerberos], + [tls, none], + [tls, sasl_auth_plain] + ]; +t_start_stop(Config) -> + ok = emqx_bridge_v2_testlib:t_start_stop(Config, kafka_consumer_subcriber_and_client_stopped), + ok. + +t_create_via_http(Config) -> + ok = emqx_bridge_v2_testlib:t_create_via_http(Config), + ok. + +t_consume(Config) -> + Topic = ?config(kafka_topic, Config), + NumPartitions = 1, + Key = <<"mykey">>, + Payload = #{<<"key">> => <<"value">>}, + Encoded = emqx_utils_json:encode(Payload), + Headers = [{<<"hkey">>, <<"hvalue">>}], + HeadersMap = maps:from_list(Headers), + ProduceFn = fun() -> + emqx_bridge_kafka_impl_consumer_SUITE:publish( + Config, + Topic, + [ + #{ + key => Key, + value => Encoded, + headers => Headers + } + ] + ) + end, + CheckFn = fun(Message) -> + ?assertMatch( + #{ + headers := HeadersMap, + key := Key, + offset := _, + topic := Topic, + ts := _, + ts_type := _, + value := Encoded + }, + Message + ) + end, + ok = emqx_bridge_v2_testlib:t_consume( + Config, + #{ + consumer_ready_tracepoint => ?match_n_events( + NumPartitions, + #{?snk_kind := kafka_consumer_subscriber_init} + ), + produce_fn => ProduceFn, + check_fn => CheckFn, + produce_tracepoint => ?match_event( + #{ + ?snk_kind := kafka_consumer_handle_message, + ?snk_span := {complete, _} + } + ) + } + ), + ok. + +t_update_topic(Config) -> + %% Tests that, if a bridge originally has the legacy field `topic_mapping' filled in + %% and later is updated using v2 APIs, then the legacy field is cleared and the new + %% `topic' field is used. + ConnectorConfig = ?config(connector_config, Config), + SourceConfig = ?config(source_config, Config), + Name = ?config(source_name, Config), + V1Config0 = emqx_action_info:connector_action_config_to_bridge_v1_config( + ?SOURCE_TYPE_BIN, + ConnectorConfig, + SourceConfig + ), + V1Config = emqx_utils_maps:deep_put( + [<<"kafka">>, <<"topic_mapping">>], + V1Config0, + [ + #{ + <<"kafka_topic">> => <<"old_topic">>, + <<"mqtt_topic">> => <<"">>, + <<"qos">> => 2, + <<"payload_template">> => <<"template">> + } + ] + ), + %% Note: using v1 API + {ok, {{_, 201, _}, _, _}} = emqx_bridge_testlib:create_bridge_api( + ?SOURCE_TYPE_BIN, + Name, + V1Config + ), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"parameters">> := #{<<"topic">> := <<"old_topic">>}}}}, + emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name) + ), + %% Note: we don't add `topic_mapping' again here to the parameters. + {ok, {{_, 200, _}, _, _}} = emqx_bridge_v2_testlib:update_bridge_api( + Config, + #{<<"parameters">> => #{<<"topic">> => <<"new_topic">>}} + ), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"parameters">> := #{<<"topic">> := <<"new_topic">>}}}}, + emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name) + ), + ok. diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 75e9e9a845..213fd8229f 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -159,7 +159,7 @@ update(ConnectorId, {OldConf, Conf}) -> update(Type, Name, {OldConf, Conf}) -> update(Type, Name, {OldConf, Conf}, #{}). -update(Type, Name, {OldConf, Conf}, Opts) -> +update(Type, Name, {OldConf, Conf0}, Opts) -> %% TODO: sometimes its not necessary to restart the connector connection. %% %% - if the connection related configs like `servers` is updated, we should restart/start @@ -168,6 +168,7 @@ update(Type, Name, {OldConf, Conf}, Opts) -> %% the `method` or `headers` of a WebHook is changed, then the connector can be updated %% without restarting the connector. %% + Conf = Conf0#{connector_type => bin(Type), connector_name => bin(Name)}, case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of false -> ?SLOG(info, #{ diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 23bc5a8b4d..e85de7292d 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -34,6 +34,8 @@ resource_type(gcp_pubsub_producer) -> emqx_bridge_gcp_pubsub_impl_producer; resource_type(hstreamdb) -> emqx_bridge_hstreamdb_connector; +resource_type(kafka_consumer) -> + emqx_bridge_kafka_impl_consumer; resource_type(kafka_producer) -> emqx_bridge_kafka_impl_producer; resource_type(kinesis) -> @@ -156,11 +158,19 @@ connector_structs() -> required => false } )}, + {kafka_consumer, + mk( + hoconsc:map(name, ref(emqx_bridge_kafka_consumer_schema, "config_connector")), + #{ + desc => <<"Kafka Consumer Connector Config">>, + required => false + } + )}, {kafka_producer, mk( hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")), #{ - desc => <<"Kafka Connector Config">>, + desc => <<"Kafka Producer Connector Config">>, required => false } )}, @@ -344,6 +354,7 @@ schema_modules() -> emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_hstreamdb, emqx_bridge_kafka, + emqx_bridge_kafka_consumer_schema, emqx_bridge_kinesis, emqx_bridge_matrix, emqx_bridge_mongodb, @@ -392,6 +403,7 @@ api_schemas(Method) -> ), api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method ++ "_connector"), api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), + api_ref(emqx_bridge_kafka_consumer_schema, <<"kafka_consumer">>, Method ++ "_connector"), api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index fc68bbd9d6..68df7a6f59 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -134,6 +134,8 @@ connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; connector_type_to_bridge_types(hstreamdb) -> [hstreamdb]; +connector_type_to_bridge_types(kafka_consumer) -> + [kafka_consumer]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(kinesis) -> @@ -205,7 +207,11 @@ bridge_configs_to_transform( emqx_utils_maps:deep_get( [<<"actions">>, to_bin(BridgeType), to_bin(BridgeName)], RawConfig, - undefined + emqx_utils_maps:deep_get( + [<<"sources">>, to_bin(BridgeType), to_bin(BridgeName)], + RawConfig, + undefined + ) ), [ {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig} diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index 82b5fe11df..5ca5fb315b 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -136,20 +136,30 @@ t_connector_lifecycle(_Config) -> ?assert(meck:validate(?CONNECTOR)), ?assertMatch( [ - {_, {?CONNECTOR, callback_mode, []}, _}, {_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}}, {_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected}, {_, {?CONNECTOR, on_stop, [_, connector_state]}, ok}, - {_, {?CONNECTOR, on_stop, [_, connector_state]}, ok}, {_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}}, {_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected}, {_, {?CONNECTOR, on_stop, [_, connector_state]}, ok}, - {_, {?CONNECTOR, callback_mode, []}, _}, {_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}}, {_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected}, {_, {?CONNECTOR, on_stop, [_, connector_state]}, ok} ], - meck:history(?CONNECTOR) + lists:filter( + fun({_, {?CONNECTOR, Fun, _Args}, _}) -> + lists:member( + Fun, [ + on_start, + on_stop, + on_get_channels, + on_get_status, + on_add_channel + ] + ) + end, + meck:history(?CONNECTOR) + ) ), ok. diff --git a/changes/ee/feat-12595.en.md b/changes/ee/feat-12595.en.md new file mode 100644 index 0000000000..1930c9b159 --- /dev/null +++ b/changes/ee/feat-12595.en.md @@ -0,0 +1 @@ +The Kafka Consumer bridge has been split into connector and source components. Old Kafka Consumer bridges will be upgraded automatically. diff --git a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon new file mode 100644 index 0000000000..6f29d7cc8f --- /dev/null +++ b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon @@ -0,0 +1,18 @@ +emqx_bridge_kafka_consumer_schema { + + source_parameters.desc: + """Source specific configs.""" + source_parameters.label: + """Source Specific Configs""" + + consumer_source.desc: + """Source configs.""" + consumer_source.label: + """Source""" + + config_connector.desc: + """Configuration for a Kafka Consumer Client.""" + config_connector.label: + """Kafka Consumer Client Configuration""" + +}