Skip to content

Commit

Permalink
Merge pull request #496 from kubotat/enhancement/discard_kafka_delive…
Browse files Browse the repository at this point in the history
…ry_failed_regex

Enhancement : Add "discard_kafka_delivery_failed_regex" option
  • Loading branch information
ashie committed Oct 13, 2023
2 parents 6fa9ce2 + 3a9191c commit 20ba524
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ You need to install rdkafka gem.
exclude_topic_key (bool) :default => false
exclude_partition_key (bool) :default => false
discard_kafka_delivery_failed (bool) :default => false (No discard)
discard_kafka_delivery_failed_regex (regexp) :default => nil (No discard)
use_event_time (bool) :default => false

# same with kafka2
Expand Down Expand Up @@ -559,6 +560,9 @@ You need to install rdkafka gem.
max_enqueue_bytes_per_second (integer) :default => nil
</match>

`rdkafka2` supports `discard_kafka_delivery_failed_regex` parameter:
- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`.

If you use v0.12, use `rdkafka` instead.

<match kafka.**>
Expand Down
11 changes: 8 additions & 3 deletions lib/fluent/plugin/out_rdkafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class Fluent::Rdkafka2Output < Output
config_param :use_event_time, :bool, :default => false, :desc => 'Use fluentd event time for rdkafka timestamp'
config_param :max_send_limit_bytes, :size, :default => nil
config_param :discard_kafka_delivery_failed, :bool, :default => false
config_param :discard_kafka_delivery_failed_regex, :regexp, :default => nil
config_param :rdkafka_buffering_max_ms, :integer, :default => nil, :desc => 'Used for queue.buffering.max.ms'
config_param :rdkafka_buffering_max_messages, :integer, :default => nil, :desc => 'Used for queue.buffering.max.messages'
config_param :rdkafka_message_max_bytes, :integer, :default => nil, :desc => 'Used for message.max.bytes'
Expand Down Expand Up @@ -461,9 +462,13 @@ def write(chunk)
if @discard_kafka_delivery_failed
log.warn "Delivery failed. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
else
log.warn "Send exception occurred: #{e} at #{e.backtrace.first}"
# Raise exception to retry sendind messages
raise e
if @discard_kafka_delivery_failed_regex != nil && @discard_kafka_delivery_failed_regex.match?(e.to_s)
log.warn "Delivery failed and matched regexp pattern #{@discard_kafka_delivery_failed_regex}. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
else
log.warn "Send exception occurred: #{e} at #{e.backtrace.first}"
# Raise exception to retry sendind messages
raise e
end
end
ensure
@writing_threads_mutex.synchronize { @writing_threads.delete(Thread.current) }
Expand Down

0 comments on commit 20ba524

Please sign in to comment.