diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 18ccd63aae..fa7bc67acc 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -572,33 +572,54 @@ check_client_connectivity(ClientId) -> {error, {find_client, Reason}} end. +is_alive(Pid) -> + is_pid(Pid) andalso erlang:is_process_alive(Pid). + +error_summary(Map, [Error]) -> + Map#{error => Error}; +error_summary(Map, [Error | More]) -> + Map#{first_error => Error, total_errors => length(More) + 1}. + check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) -> - Leaders = - case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of - {ok, LeadersToCheck} -> - %% Kafka is considered healthy as long as any of the partition leader is reachable. - lists:filtermap( - fun({_Partition, Pid}) -> - case is_pid(Pid) andalso erlang:is_process_alive(Pid) of - true -> {true, Pid}; - _ -> false - end - end, - LeadersToCheck - ); - {error, _} -> - [] - end, - case Leaders of - [] -> + case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of + {ok, Leaders} -> + %% Kafka is considered healthy as long as any of the partition leader is reachable. + case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of + {[], Errors} -> + throw( + error_summary( + #{ + cause => "no_connected_partition_leader", + kafka_client => ClientId, + kafka_topic => KafkaTopic + }, + Errors + ) + ); + {_, []} -> + ok; + {_, Errors} -> + ?SLOG( + warning, + "not_all_kafka_partitions_connected", + error_summary( + #{ + kafka_client => ClientId, + kafka_topic => KafkaTopic + }, + Errors + ) + ), + ok + end; + {error, Reason} -> + %% If failed to fetch metadata, wolff_client logs a warning level message + %% which includes the reason for each seed host throw(#{ - error => no_connected_partition_leader, + cause => Reason, kafka_client => ClientId, - kafka_topic => KafkaTopic, - partitions_limit => MaxPartitions - }); - _ -> - ok + kafka_topic => KafkaTopic + }) end. check_topic_status(ClientId, WolffClientPid, KafkaTopic) -> diff --git a/apps/emqx_license/test/emqx_license_http_api_SUITE.erl b/apps/emqx_license/test/emqx_license_http_api_SUITE.erl index 3809305273..be0b31dcf8 100644 --- a/apps/emqx_license/test/emqx_license_http_api_SUITE.erl +++ b/apps/emqx_license/test/emqx_license_http_api_SUITE.erl @@ -245,7 +245,7 @@ t_license_setting_bc(_Config) -> ?assertMatch(#{<<"max_connections">> := 25}, request_dump()), %% get GetRes = request(get, uri(["license", "setting"]), []), - %% aslo check that the settings return correctly + %% also check that the settings return correctly validate_setting(GetRes, <<"75%">>, <<"80%">>, 25), %% update Low = <<"50%">>, diff --git a/changes/ee/fix-13070.en.md b/changes/ee/fix-13070.en.md new file mode 100644 index 0000000000..e73cca3cf3 --- /dev/null +++ b/changes/ee/fix-13070.en.md @@ -0,0 +1,5 @@ +Improve Kafka connector error logs. + +Previously, specific error details, such as unreachable advertised listeners, were not logged. +Now, error details are captured in the logs to provide more diagnostic information. +To manage log verbosity, only the first occurrence of an error is logged, accompanied by the total count of similar errors.