Skip to content

Commit

Permalink
Merge 905d04f into 8f780ae
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed May 2, 2024
2 parents 8f780ae + 905d04f commit 9f710e2
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 11 deletions.
11 changes: 11 additions & 0 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ fields(source_parameters) ->
required => true,
desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic)
}
)},
{group_id,
mk(
binary(),
#{
required => false,
validator => [
emqx_resource_validator:not_empty("Group id must not be empty")
],
desc => ?DESC(group_id)
}
)}
| Fields
];
Expand Down
11 changes: 7 additions & 4 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
]).

-ifdef(TEST).
-export([consumer_group_id/1]).
-export([consumer_group_id/2]).
-endif.

-include_lib("emqx/include/logger.hrl").
Expand All @@ -50,6 +50,7 @@
parameters := source_parameters()
}.
-type source_parameters() :: #{
group_id => binary(),
key_encoding_mode := encoding_mode(),
max_batch_bytes := emqx_schema:bytesize(),
max_rejoin_attempts := non_neg_integer(),
Expand Down Expand Up @@ -431,7 +432,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
%% note: the group id should be the same for all nodes in the
%% cluster, so that the load gets distributed between all
%% consumers and we don't repeat messages in the same cluster.
GroupID = consumer_group_id(BridgeName),
GroupID = consumer_group_id(Params0, BridgeName),
%% earliest or latest
BeginOffset = OffsetResetPolicy0,
OffsetResetPolicy =
Expand Down Expand Up @@ -623,8 +624,10 @@ log_when_error(Fun, Log) ->
})
end.

-spec consumer_group_id(atom() | binary()) -> binary().
consumer_group_id(BridgeName0) ->
-spec consumer_group_id(#{group_id => binary(), any() => term()}, atom() | binary()) -> binary().
consumer_group_id(#{group_id := GroupId}, _BridgeName) when is_binary(GroupId) ->
GroupId;
consumer_group_id(_ConsumerParams, BridgeName0) ->
BridgeName = to_bin(BridgeName0),
<<"emqx-kafka-consumer-", BridgeName/binary>>.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1542,7 +1542,7 @@ t_receive_after_recovery(Config) ->
_Interval = 500,
_NAttempts = 20,
begin
GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(KafkaNameA),
GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, KafkaNameA),
{ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets(
KafkaClientId, GroupId
),
Expand Down
59 changes: 53 additions & 6 deletions apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,41 @@ test_keepalive_validation(Name, Conf) ->
[?_assertThrow(_, check(C)) || C <- InvalidConfs] ++
[?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs].

%% assert compatibility
bridge_schema_json_test() ->
JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()),
Map = emqx_utils_json:decode(JSON),
Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).

custom_group_id_test() ->
BaseConfig = kafka_consumer_source_config(),
BadSourceConfig = emqx_utils_maps:deep_merge(
BaseConfig,
#{<<"parameters">> => #{<<"group_id">> => <<>>}}
),
?assertThrow(
{_, [
#{
path := "sources.kafka_consumer.my_consumer.parameters.group_id",
reason := "Group id must not be empty"
}
]},
emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, BadSourceConfig)
),

CustomId = <<"custom_id">>,
OkSourceConfig = emqx_utils_maps:deep_merge(
BaseConfig,
#{<<"parameters">> => #{<<"group_id">> => CustomId}}
),
?assertMatch(
#{<<"parameters">> := #{<<"group_id">> := CustomId}},
emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, OkSourceConfig)
),

ok.

%%===========================================================================
%% Helper functions
%%===========================================================================
Expand Down Expand Up @@ -355,9 +390,21 @@ kafka_consumer_hocon() ->
"\n }"
"\n }".

%% assert compatibility
bridge_schema_json_test() ->
JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()),
Map = emqx_utils_json:decode(JSON),
Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).
kafka_consumer_source_config() ->
#{
<<"enable">> => true,
<<"connector">> => <<"my_connector">>,
<<"parameters">> =>
#{
<<"key_encoding_mode">> => <<"none">>,
<<"max_batch_bytes">> => <<"896KB">>,
<<"max_rejoin_attempts">> => <<"5">>,
<<"offset_reset_policy">> => <<"latest">>,
<<"topic">> => <<"please override">>,
<<"value_encoding_mode">> => <<"none">>
},
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"2s">>,
<<"resume_interval">> => <<"2s">>
}
}.
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,28 @@ t_bad_bootstrap_host(Config) ->
)
),
ok.

t_custom_group_id(Config) ->
?check_trace(
begin
#{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config),
CustomGroupId = <<"my_group_id">>,
{ok, {{_, 201, _}, _, _}} =
emqx_bridge_v2_testlib:create_bridge_api(
Config,
#{<<"parameters">> => #{<<"group_id">> => CustomGroupId}}
),
[Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts),
?retry(100, 10, begin
{ok, Groups} = brod:list_groups(Endpoint, _ConnOpts = #{}),
?assertMatch(
[_],
[Group || Group = {_, Id, _} <- Groups, Id == CustomGroupId],
#{groups => Groups}
)
end),
ok
end,
[]
),
ok.
1 change: 1 addition & 0 deletions changes/ee/feat-12961.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added the option to customize group ids in advance for Kafka Consumer sources.
5 changes: 5 additions & 0 deletions rel/i18n/emqx_bridge_kafka_consumer_schema.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ emqx_bridge_kafka_consumer_schema {
config_connector.label:
"""Kafka Consumer Client Configuration"""

group_id.desc:
"""Consumer group identifier to be used for this source. If omitted, one based off the source name will be automatically generated."""
group_id.label:
"""Custom Consumer Group Id"""

}

0 comments on commit 9f710e2

Please sign in to comment.