0130 limit kafka partitions #12427

Merged (4 commits) on Feb 1, 2024
@@ -257,7 +257,6 @@ t_session_unsubscription_idempotency(Config) ->
     ?check_trace(
         #{timetrap => 30_000},
         begin
-            #{timetrap => 20_000},
             ?force_ordering(
                 #{
                     ?snk_kind := persistent_session_ds_subscription_delete
@@ -498,9 +497,7 @@ do_t_session_expiration(_Config, Opts) ->
     ok.
 
 t_session_gc(Config) ->
-    GCInterval = ?config(gc_interval, Config),
     [Node1, Node2, _Node3] = Nodes = ?config(nodes, Config),
-    CoreNodes = [Node1, Node2],
     [
         Port1,
         Port2,
apps/emqx/src/emqx_broker.erl (5 additions, 5 deletions)
@@ -85,13 +85,13 @@
 %% Guards
 -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))).
 
--define(cast_or_eval(Pid, Msg, Expr),
-    case Pid =:= self() of
-        true ->
+-define(cast_or_eval(PICK, Msg, Expr),
+    case PICK of
+        __X_Pid when __X_Pid =:= self() ->
             _ = Expr,
             ok;
-        false ->
-            cast(Pid, Msg)
+        __X_Pid ->
+            cast(__X_Pid, Msg)
     end
 ).

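The emqx_broker.erl change above is a macro-hygiene fix rather than a Kafka change: the old expansion mentioned Pid twice, once in the comparison and once in the cast, so a side-effecting expression passed as the argument could be evaluated twice, and the two evaluations could even disagree. A minimal, self-contained sketch of that hazard (the demo module and pick/0 are hypothetical, not from this PR):

-module(cast_or_eval_demo).
-export([run/0]).

%% Old shape: the macro argument appears twice in the expansion,
%% so a side-effecting argument runs twice.
-define(OLD(Pid),
    case Pid =:= self() of
        true -> local;
        false -> {cast_to, Pid}
    end
).
%% New shape (as in the PR): the argument is evaluated once and bound to __X_Pid.
-define(NEW(PICK),
    case PICK of
        __X_Pid when __X_Pid =:= self() -> local;
        __X_Pid -> {cast_to, __X_Pid}
    end
).

pick() ->
    %% Stand-in for a side-effecting selector.
    io:format("pick/0 called~n"),
    spawn(fun() -> ok end).

run() ->
    _ = ?OLD(pick()), %% prints "pick/0 called" twice
    _ = ?NEW(pick()), %% prints it once
    ok.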
apps/emqx_bridge_azure_event_hub/rebar.config (1 addition, 1 deletion)
@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
apps/emqx_bridge_confluent/rebar.config (1 addition, 1 deletion)
@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
apps/emqx_bridge_kafka/rebar.config (1 addition, 1 deletion)
@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl (9 additions)
@@ -188,6 +188,7 @@ values(producer_values) ->
         ],
         kafka_header_value_encode_mode => none,
         max_inflight => 10,
+        partitions_limit => all_partitions,
         buffer => #{
             mode => <<"hybrid">>,
             per_partition_limit => <<"2GB">>,
@@ -414,6 +415,14 @@ fields(producer_kafka_opts) ->
                 desc => ?DESC(partition_count_refresh_interval)
             }
         )},
+    {partitions_limit,
+        mk(
+            hoconsc:union([all_partitions, pos_integer()]),
+            #{
+                default => <<"all_partitions">>,
+                desc => ?DESC(partitions_limit)
+            }
+        )},
     {max_inflight,
         mk(
             pos_integer(),

Review comment (Contributor), on the `default => <<"all_partitions">>` line: I thought that the default value for atoms should be atoms. Is it indifferent in this case?
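For reference, the new field's union type admits either the atom all_partitions or a positive integer. A hedged sketch of the decoded values (the helper below is illustrative, not part of the schema; hocon performs the real type-checking, and presumably maps the binary default <<"all_partitions">> onto the atom, which is what the review comment above is probing):

%% Illustrative only: what hoconsc:union([all_partitions, pos_integer()])
%% accepts for partitions_limit after decoding.
valid_partitions_limit(all_partitions) -> true;
valid_partitions_limit(N) when is_integer(N), N > 0 -> true;
valid_partitions_limit(_Other) -> false.

%% valid_partitions_limit(all_partitions) =:= true
%% valid_partitions_limit(5)              =:= true
%% valid_partitions_limit(0)              =:= false (must be positive)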
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl (15 additions, 10 deletions)
@@ -135,6 +135,7 @@ create_producers_for_bridge_v2(
     KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
     KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
     KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
+    MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
     #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
     TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX),
     IsDryRun =
@@ -144,7 +145,7 @@
         _ ->
             string:equal(TestIdStart, InstId)
     end,
-    ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
+    ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
     WolffProducerConfig = producers_config(
         BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
     ),
@@ -166,7 +167,8 @@
                 kafka_config => KafkaConfig,
                 headers_tokens => KafkaHeadersTokens,
                 ext_headers_tokens => KafkaExtHeadersTokens,
-                headers_val_encode_mode => KafkaHeadersValEncodeMode
+                headers_val_encode_mode => KafkaHeadersValEncodeMode,
+                partitions_limit => MaxPartitions
             }};
         {error, Reason2} ->
             ?SLOG(error, #{
@@ -517,9 +519,9 @@ on_get_channel_status(
     %% `?status_disconnected' will make resource manager try to restart the producers /
     %% connector, thus potentially dropping data held in wolff producer's replayq. The
     %% only exception is if the topic does not exist ("unhealthy target").
-    #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
+    #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels),
     try
-        ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
+        ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
         ?status_connected
     catch
         throw:{unhealthy_target, Msg} ->
@@ -528,11 +530,11 @@
             {?status_connecting, {K, E}}
     end.
 
-check_topic_and_leader_connections(ClientId, KafkaTopic) ->
+check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
             ok = check_topic_status(ClientId, Pid, KafkaTopic),
-            ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic);
+            ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
         {error, no_such_client} ->
             throw(#{
                 reason => cannot_find_kafka_client,
@@ -562,9 +564,9 @@ check_client_connectivity(ClientId) ->
             {error, {find_client, Reason}}
     end.
 
-check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid) ->
+check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
     Leaders =
-        case wolff_client:get_leader_connections(ClientPid, KafkaTopic) of
+        case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
             {ok, LeadersToCheck} ->
                 %% Kafka is considered healthy as long as any of the partition leaders is reachable.
                 lists:filtermap(
@@ -584,7 +586,8 @@
             throw(#{
                 error => no_connected_partition_leader,
                 kafka_client => ClientId,
-                kafka_topic => KafkaTopic
+                kafka_topic => KafkaTopic,
+                partitions_limit => MaxPartitions
             });
         _ ->
             ok
@@ -619,6 +622,7 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         required_acks := RequiredAcks,
         partition_count_refresh_interval := PCntRefreshInterval,
         max_inflight := MaxInflight,
+        partitions_limit := MaxPartitions,
         buffer := #{
             mode := BufferMode0,
             per_partition_limit := PerPartitionLimit,
@@ -652,7 +656,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
         compression => Compression,
-        telemetry_meta_data => #{bridge_id => BridgeV2Id}
+        telemetry_meta_data => #{bridge_id => BridgeV2Id},
+        max_partitions => MaxPartitions
     }.
 
 %% Wolff API is a batch API.
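Taken together, the changes in this file thread the new option from the bridge configuration into both the health checks (check_topic_and_leader_connections/3, check_if_healthy_leaders/4) and the wolff producer settings. A condensed sketch of the config-to-wolff step (the module below is illustrative, not from the PR; the wolff bump to 1.10.1 earlier in this diff is presumably what adds the max_partitions option and wolff_client:get_leader_connections/3):

-module(partitions_limit_flow).
-export([wolff_opts/1]).

%% KafkaConfig is the validated bridge configuration map.
wolff_opts(KafkaConfig) ->
    %% Same default as the schema: use every partition unless limited.
    MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
    #{
        %% Only the key relevant to this PR is shown; the real producer
        %% config (producers_config/5 above) carries many more options.
        max_partitions => MaxPartitions
    }.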
changes/ee/feat-12427.en.md (1 addition)
@@ -0,0 +1 @@
+Made it possible to limit the number of Kafka partitions to utilize for Kafka data integration.
mix.exs (1 addition, 1 deletion)
@@ -201,7 +201,7 @@ defmodule EMQXUmbrella.MixProject do
       [
         {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"},
         {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-        {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"},
+        {:wolff, github: "kafka4beam/wolff", tag: "1.10.1"},
         {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
         {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
         {:brod, github: "kafka4beam/brod", tag: "3.16.8"},
rel/i18n/emqx_bridge_azure_event_hub.hocon (8 additions)
@@ -339,4 +339,12 @@ server_name_indication.desc:
 server_name_indication.label:
 """SNI"""
 
+partitions_limit.desc:
+"""Limit the number of partitions to produce data for the given topic.
+The special value `all_partitions` is to utilize all partitions for the topic.
+Setting this to a value which is greater than the total number of partitions in Kafka has no effect."""
+
+partitions_limit.label:
+"""Max Partitions"""
+
 }
rel/i18n/emqx_bridge_confluent_producer.hocon (8 additions)
@@ -277,6 +277,14 @@
 This value is to specify the size of each on-disk buffer file."""
 
 buffer_segment_bytes.label:
 """Segment File Bytes"""
 
+partitions_limit.desc:
+"""Limit the number of partitions to produce data for the given topic.
+The special value `all_partitions` is to utilize all partitions for the topic.
+Setting this to a value which is greater than the total number of partitions in Kafka has no effect."""
+
+partitions_limit.label:
+"""Max Partitions"""
+
 max_inflight.desc:
 """Maximum number of batches allowed for Confluent producer (per-partition) to send before receiving acknowledgement from Confluent. Greater value typically means better throughput. However, there can be a risk of message reordering when this value is greater than 1."""
rel/i18n/emqx_bridge_kafka.hocon (8 additions)
@@ -386,6 +386,14 @@ consumer_kafka_opts.desc:
 consumer_kafka_opts.label:
 """Kafka Consumer"""
 
+partitions_limit.desc:
+"""Limit the number of partitions to produce data for the given topic.
+The special value `all_partitions` is to utilize all partitions for the topic.
+Setting this to a value which is greater than the total number of partitions in Kafka has no effect."""
+
+partitions_limit.label:
+"""Max Partitions"""
+
 max_inflight.desc:
 """Maximum number of batches allowed for Kafka producer (per-partition) to send before receiving acknowledgement from Kafka. Greater value typically means better throughput. However, there can be a risk of message reordering when this value is greater than 1."""