Skip to content

Commit

Permalink
Merge 6f3da6b into 28cdce7
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed May 2, 2024
2 parents 28cdce7 + 6f3da6b commit bc1a388
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 7 deletions.
2 changes: 1 addition & 1 deletion apps/emqx_bridge_azure_event_hub/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_confluent/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_kafka/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
Expand Down
5 changes: 5 additions & 0 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,11 @@ kafka_connector_config_fields() ->
default => none, desc => ?DESC("authentication")
})},
{socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})},
{health_check_topic,
mk(binary(), #{
required => false,
desc => ?DESC(producer_health_check_topic)
})},
{ssl, mk(ref(ssl_client_opts), #{})}
] ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts).

Expand Down
27 changes: 24 additions & 3 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,13 @@ on_start(InstId, Config) ->
%% only when its producers start.
case check_client_connectivity(ClientId) of
ok ->
{ok, #{
HealthCheckTopic = maps:get(health_check_topic, Config, undefined),
ConnectorState = #{
client_id => ClientId,
health_check_topic => HealthCheckTopic,
installed_bridge_v2s => #{}
}};
},
{ok, ConnectorState};
{error, {find_client, Reason}} ->
%% Race condition? Crash? We just checked it with `ensure_client'...
{error, Reason};
Expand Down Expand Up @@ -508,7 +511,7 @@ on_get_status(
%% held in wolff producer's replayq.
case check_client_connectivity(ClientId) of
ok ->
?status_connected;
maybe_check_health_check_topic(State);
{error, {find_client, _Error}} ->
?status_connecting;
{error, {connectivity, Error}} ->
Expand Down Expand Up @@ -572,6 +575,24 @@ check_client_connectivity(ClientId) ->
{error, {find_client, Reason}}
end.

maybe_check_health_check_topic(#{health_check_topic := Topic} = ConnectorState) when
is_binary(Topic)
->
#{client_id := ClientId} = ConnectorState,
MaxPartitions = all_partitions,
try check_topic_and_leader_connections(ClientId, Topic, MaxPartitions) of
ok ->
?status_connected
catch
throw:#{reason := {connection_down, _} = Reason} ->
{?status_disconnected, ConnectorState, Reason};
throw:#{reason := Reason} ->
{?status_connecting, ConnectorState, Reason}
end;
maybe_check_health_check_topic(_) ->
%% Cannot infer further information. Maybe upgraded from older version.
?status_connected.

check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
Leaders =
case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,3 +638,43 @@ t_ancient_v1_config_migration_without_local_topic(Config) ->
erpc:call(Node, fun emqx_bridge_v2:list/0)
),
ok.

t_connector_health_check_topic(_Config) ->
?check_trace(
begin
%% We create a connector pointing to a broker that expects authentication, but
%% we don't provide it in the config.
%% Without a health check topic, we're unable to probe any topic leaders to
%% check the actual connection parameters, so the status is "connected".
Type = ?TYPE,
Name = ?FUNCTION_NAME,
PlainAuthBootstrapHost = <<"kafka-1.emqx.net:9093">>,
ConnectorConfig0 = connector_config(#{
<<"bootstrap_hosts">> => PlainAuthBootstrapHost
}),
?assertMatch(
{ok, {{_, 201, _}, _, #{<<"status">> := <<"connected">>}}},
emqx_bridge_v2_testlib:create_connector_api([
{connector_type, Type},
{connector_name, Name},
{connector_config, ConnectorConfig0}
])
),

%% By providing a health check topic, we should detect it's disconnected
%% without the need for an action.
ConnectorConfig1 = connector_config(#{
<<"bootstrap_hosts">> => PlainAuthBootstrapHost,
<<"health_check_topic">> =>
emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
}),
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"status">> := <<"disconnected">>}}},
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig1)
),

ok
end,
[]
),
ok.
3 changes: 3 additions & 0 deletions changes/ee/feat-12959.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added a new option to configure a topic solely for health check purposes in Kafka Producer connectors.

By configuring this option, it's now possible to more accurately detect connection issues towards partition leaders, such as wrong or missing credentials that prevent establishing the connection.
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ defmodule EMQXUmbrella.MixProject do
{:hstreamdb_erl,
github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.10.2"},
{:wolff, github: "kafka4beam/wolff", tag: "1.10.3"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.16.8"},
Expand Down
5 changes: 5 additions & 0 deletions rel/i18n/emqx_bridge_azure_event_hub.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,9 @@ Setting this to a value which is greater than the total number of partitions in
partitions_limit.label:
"""Max Partitions"""

producer_health_check_topic.desc:
"""Topic name used exclusively for more accurate connector health checks."""
producer_health_check_topic.label:
"""Connector health check topic"""

}
5 changes: 5 additions & 0 deletions rel/i18n/emqx_bridge_confluent_producer.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,9 @@ server_name_indication.desc:
server_name_indication.label:
"""SNI"""

producer_health_check_topic.desc:
"""Topic name used exclusively for more accurate connector health checks."""
producer_health_check_topic.label:
"""Connector health check topic"""

}
4 changes: 4 additions & 0 deletions rel/i18n/emqx_bridge_kafka.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -446,5 +446,9 @@ server_name_indication.desc:
server_name_indication.label:
"""SNI"""

producer_health_check_topic.desc:
"""Topic name used exclusively for more accurate connector health checks."""
producer_health_check_topic.label:
"""Connector health check topic"""

}

0 comments on commit bc1a388

Please sign in to comment.