diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index 0fc18e7..ecc2791 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -126,6 +126,8 @@ class Fluent::Rdkafka2Output < Output config_param :max_enqueue_bytes_per_second, :size, :default => nil, :desc => 'The maximum number of enqueueing bytes per second' config_param :service_name, :string, :default => nil, :desc => 'Used for sasl.kerberos.service.name' + config_param :unrecoverable_error_types, :array, :default => [:topic_authorization_failed, :msg_size_too_large], + :desc => 'Handle some of the errors should be unrecoverable if specified' config_section :buffer do config_set_default :chunk_keys, ["topic"] @@ -522,7 +524,12 @@ def enqueue_with_retry(producer, topic, record_buf, message_key, partition, head raise e else - raise e + if unrecoverable_error_types.include?(e.code) + # some of the errors should be handled as an unrecoverable error + raise Fluent::UnrecoverableError, "Rejected due to #{e}" + else + raise e + end end end end