Skip to content

Commit

Permalink
Handle some of the exceptions as unrecoverable errors
Browse files Browse the repository at this point in the history
This is because msg_size_too_large and topic_authorization_failed error
codes should be treated as an unrecoverable error.
In Fluentd, we need to mark as unrecoverable with raise
Fluent::Unrecoverable. But current implementation does not handle them
as unrecoverable errors.

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
  • Loading branch information
cosmo0920 committed Jun 25, 2024
1 parent ea0f10a commit d23d2e9
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion lib/fluent/plugin/out_rdkafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class Fluent::Rdkafka2Output < Output
config_param :max_enqueue_bytes_per_second, :size, :default => nil, :desc => 'The maximum number of enqueueing bytes per second'

config_param :service_name, :string, :default => nil, :desc => 'Used for sasl.kerberos.service.name'
config_param :unrecoverable_error_types, :array, :default => [:topic_authorization_failed, :msg_size_too_large],
:desc => 'Handle some of the errors should be unrecoverable if specified'

config_section :buffer do
config_set_default :chunk_keys, ["topic"]
Expand Down Expand Up @@ -522,7 +524,12 @@ def enqueue_with_retry(producer, topic, record_buf, message_key, partition, head

raise e
else
raise e
if unrecoverable_error_types.include?(e.code)
# some of the errors should be handled as an unrecoverable error
raise Fluent::UnrecoverableError, "Rejected due to #{e}"
else
raise e
end
end
end
end
Expand Down

0 comments on commit d23d2e9

Please sign in to comment.