Skip to content

Commit

Permalink
Merge pull request #12427 from zmstone/0130-limit-kafka-partitions
Browse files Browse the repository at this point in the history
0130 limit kafka partitions
  • Loading branch information
zmstone committed Feb 1, 2024
2 parents 42a4bc3 + 8f9e676 commit 3e518c1
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ t_session_unsubscription_idempotency(Config) ->
?check_trace(
#{timetrap => 30_000},
begin
#{timetrap => 20_000},
?force_ordering(
#{
?snk_kind := persistent_session_ds_subscription_delete
Expand Down Expand Up @@ -498,9 +497,7 @@ do_t_session_expiration(_Config, Opts) ->
ok.

t_session_gc(Config) ->
GCInterval = ?config(gc_interval, Config),
[Node1, Node2, _Node3] = Nodes = ?config(nodes, Config),
CoreNodes = [Node1, Node2],
[
Port1,
Port2,
Expand Down
10 changes: 5 additions & 5 deletions apps/emqx/src/emqx_broker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@
%% Guards
-define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))).

-define(cast_or_eval(Pid, Msg, Expr),
case Pid =:= self() of
true ->
-define(cast_or_eval(PICK, Msg, Expr),
case PICK of
__X_Pid when __X_Pid =:= self() ->
_ = Expr,
ok;
false ->
cast(Pid, Msg)
__X_Pid ->
cast(__X_Pid, Msg)
end
).

Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_azure_event_hub/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_confluent/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_kafka/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
Expand Down
9 changes: 9 additions & 0 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ values(producer_values) ->
],
kafka_header_value_encode_mode => none,
max_inflight => 10,
partitions_limit => all_partitions,
buffer => #{
mode => <<"hybrid">>,
per_partition_limit => <<"2GB">>,
Expand Down Expand Up @@ -414,6 +415,14 @@ fields(producer_kafka_opts) ->
desc => ?DESC(partition_count_refresh_interval)
}
)},
{partitions_limit,
mk(
hoconsc:union([all_partitions, pos_integer()]),
#{
default => <<"all_partitions">>,
desc => ?DESC(partitions_limit)
}
)},
{max_inflight,
mk(
pos_integer(),
Expand Down
25 changes: 15 additions & 10 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ create_producers_for_bridge_v2(
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
#{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX),
IsDryRun =
Expand All @@ -144,7 +145,7 @@ create_producers_for_bridge_v2(
_ ->
string:equal(TestIdStart, InstId)
end,
ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
WolffProducerConfig = producers_config(
BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
),
Expand All @@ -166,7 +167,8 @@ create_producers_for_bridge_v2(
kafka_config => KafkaConfig,
headers_tokens => KafkaHeadersTokens,
ext_headers_tokens => KafkaExtHeadersTokens,
headers_val_encode_mode => KafkaHeadersValEncodeMode
headers_val_encode_mode => KafkaHeadersValEncodeMode,
partitions_limit => MaxPartitions
}};
{error, Reason2} ->
?SLOG(error, #{
Expand Down Expand Up @@ -517,9 +519,9 @@ on_get_channel_status(
%% `?status_disconnected' will make resource manager try to restart the producers /
%% connector, thus potentially dropping data held in wolff producer's replayq. The
%% only exception is if the topic does not exist ("unhealthy target").
#{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
#{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels),
try
ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
?status_connected
catch
throw:{unhealthy_target, Msg} ->
Expand All @@ -528,11 +530,11 @@ on_get_channel_status(
{?status_connecting, {K, E}}
end.

check_topic_and_leader_connections(ClientId, KafkaTopic) ->
check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
ok = check_topic_status(ClientId, Pid, KafkaTopic),
ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic);
ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
{error, no_such_client} ->
throw(#{
reason => cannot_find_kafka_client,
Expand Down Expand Up @@ -562,9 +564,9 @@ check_client_connectivity(ClientId) ->
{error, {find_client, Reason}}
end.

check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid) ->
check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
Leaders =
case wolff_client:get_leader_connections(ClientPid, KafkaTopic) of
case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
{ok, LeadersToCheck} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable.
lists:filtermap(
Expand All @@ -584,7 +586,8 @@ check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid)
throw(#{
error => no_connected_partition_leader,
kafka_client => ClientId,
kafka_topic => KafkaTopic
kafka_topic => KafkaTopic,
partitions_limit => MaxPartitions
});
_ ->
ok
Expand Down Expand Up @@ -619,6 +622,7 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
required_acks := RequiredAcks,
partition_count_refresh_interval := PCntRefreshInterval,
max_inflight := MaxInflight,
partitions_limit := MaxPartitions,
buffer := #{
mode := BufferMode0,
per_partition_limit := PerPartitionLimit,
Expand Down Expand Up @@ -652,7 +656,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
max_batch_bytes => MaxBatchBytes,
max_send_ahead => MaxInflight - 1,
compression => Compression,
telemetry_meta_data => #{bridge_id => BridgeV2Id}
telemetry_meta_data => #{bridge_id => BridgeV2Id},
max_partitions => MaxPartitions
}.

%% Wolff API is a batch API.
Expand Down
1 change: 1 addition & 0 deletions changes/ee/feat-12427.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Made it possible to limit the number of Kafka partitions utilized by the Kafka data integration.
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ defmodule EMQXUmbrella.MixProject do
[
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.9.1"},
{:wolff, github: "kafka4beam/wolff", tag: "1.10.1"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.16.8"},
Expand Down
8 changes: 8 additions & 0 deletions rel/i18n/emqx_bridge_azure_event_hub.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,12 @@ server_name_indication.desc:
server_name_indication.label:
"""SNI"""

partitions_limit.desc:
"""Limit the number of partitions to produce data for the given topic.
The special value `all_partitions` is to utilize all partitions for the topic.
Setting this to a value greater than the total number of partitions of the topic has no effect."""

partitions_limit.label:
"""Max Partitions"""

}
8 changes: 8 additions & 0 deletions rel/i18n/emqx_bridge_confluent_producer.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ This value is to specify the size of each on-disk buffer file."""
buffer_segment_bytes.label:
"""Segment File Bytes"""

partitions_limit.desc:
"""Limit the number of partitions to produce data for the given topic.
The special value `all_partitions` is to utilize all partitions for the topic.
Setting this to a value greater than the total number of partitions of the topic has no effect."""

partitions_limit.label:
"""Max Partitions"""

max_inflight.desc:
"""Maximum number of batches allowed for Confluent producer (per-partition) to send before receiving acknowledgement from Confluent. Greater value typically means better throughput. However, there can be a risk of message reordering when this value is greater than 1."""

Expand Down
8 changes: 8 additions & 0 deletions rel/i18n/emqx_bridge_kafka.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,14 @@ consumer_kafka_opts.desc:
consumer_kafka_opts.label:
"""Kafka Consumer"""

partitions_limit.desc:
"""Limit the number of partitions to produce data for the given topic.
The special value `all_partitions` is to utilize all partitions for the topic.
Setting this to a value greater than the total number of partitions of the topic has no effect."""

partitions_limit.label:
"""Max Partitions"""

max_inflight.desc:
"""Maximum number of batches allowed for Kafka producer (per-partition) to send before receiving acknowledgement from Kafka. Greater value typically means better throughput. However, there can be a risk of message reordering when this value is greater than 1."""

Expand Down

0 comments on commit 3e518c1

Please sign in to comment.