Skip to content

Commit

Permalink
Merge pull request #13070 from zmstone/0518-improve-kafka-connection-…
Browse files Browse the repository at this point in the history
…error-logs

0518 improve kafka connection error logs
  • Loading branch information
zmstone committed May 29, 2024
2 parents 5017ef8 + 7696f78 commit c54d25d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 25 deletions.
69 changes: 45 additions & 24 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -572,33 +572,54 @@ check_client_connectivity(ClientId) ->
{error, {find_client, Reason}}
end.

%% Liveness predicate for a (possible) connection pid.
%% Returns `true' only when the argument is a pid and
%% erlang:is_process_alive/1 reports it alive; any non-pid term yields `false'.
is_alive(MaybePid) when is_pid(MaybePid) ->
    erlang:is_process_alive(MaybePid);
is_alive(_NotAPid) ->
    false.

%% Add a compact summary of `Errors' to the diagnostic map `Map'.
%%
%% - no errors:       `Map' is returned unchanged, so a caller that ends up
%%                    with an empty error list (e.g. partitioning an empty
%%                    list of leaders) still gets its context map back
%%                    instead of crashing with `function_clause';
%% - exactly one:     the error is stored under `error';
%% - more than one:   only the first error is kept (under `first_error')
%%                    together with the total count (under `total_errors'),
%%                    which keeps the log entry short.
error_summary(Map, []) ->
    Map;
error_summary(Map, [Error]) ->
    Map#{error => Error};
error_summary(Map, [Error | _] = Errors) ->
    Map#{first_error => Error, total_errors => length(Errors)}.

%% Health check over the partition leader connections of `KafkaTopic'.
%% Asks wolff_client:get_leader_connections/3 for the leader connections and
%% throws a map describing the failure when that call returns `{error, _}' or
%% when none of the returned leader pids is alive; otherwise returns `ok'.
%% NOTE(review): the lines below appear to interleave the removed and the added
%% version of this function (a diff rendered without +/- markers), so they do
%% not form a single compilable body -- confirm against the real file before
%% changing any code here.
check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
Leaders =
case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
{ok, LeadersToCheck} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable.
lists:filtermap(
fun({_Partition, Pid}) ->
case is_pid(Pid) andalso erlang:is_process_alive(Pid) of
true -> {true, Pid};
_ -> false
end
end,
LeadersToCheck
);
{error, _} ->
[]
end,
case Leaders of
[] ->
case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
{ok, Leaders} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable.
case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of
%% No leader entry passed is_alive/1: report the failure by throwing.
{[], Errors} ->
throw(
error_summary(
#{
cause => "no_connected_partition_leader",
kafka_client => ClientId,
kafka_topic => KafkaTopic
},
Errors
)
);
%% Every leader entry passed is_alive/1.
{_, []} ->
ok;
%% Some entries are alive and some are not: log a warning, still return ok.
{_, Errors} ->
?SLOG(
warning,
"not_all_kafka_partitions_connected",
error_summary(
#{
kafka_client => ClientId,
kafka_topic => KafkaTopic
},
Errors
)
),
ok
end;
{error, Reason} ->
%% If failed to fetch metadata, wolff_client logs a warning level message
%% which includes the reason for each seed host
throw(#{
error => no_connected_partition_leader,
cause => Reason,
kafka_client => ClientId,
kafka_topic => KafkaTopic,
partitions_limit => MaxPartitions
});
_ ->
ok
kafka_topic => KafkaTopic
})
end.

check_topic_status(ClientId, WolffClientPid, KafkaTopic) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_license/test/emqx_license_http_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ t_license_setting_bc(_Config) ->
?assertMatch(#{<<"max_connections">> := 25}, request_dump()),
%% get
GetRes = request(get, uri(["license", "setting"]), []),
%% aslo check that the settings return correctly
%% also check that the settings return correctly
validate_setting(GetRes, <<"75%">>, <<"80%">>, 25),
%% update
Low = <<"50%">>,
Expand Down
5 changes: 5 additions & 0 deletions changes/ee/fix-13070.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Improve Kafka connector error logs.

Previously, specific error details, such as unreachable advertised listeners, were not logged.
Now, error details are captured in the logs to provide more diagnostic information.
To manage log verbosity, only the first error is logged, accompanied by the total number of errors.

0 comments on commit c54d25d

Please sign in to comment.