diff --git a/lib/fluent/plugin/opensearch_error_handler.rb b/lib/fluent/plugin/opensearch_error_handler.rb index 3d68178..5002866 100644 --- a/lib/fluent/plugin/opensearch_error_handler.rb +++ b/lib/fluent/plugin/opensearch_error_handler.rb @@ -65,11 +65,8 @@ def log_os_400_reason(&block) end end - def emit_error_label_event(&block) - # If `emit_error_label_event` is specified as false, error event emittions are not occurred. - if @plugin.emit_error_label_event - block.call - end + def emit_error_label_event? + !!@plugin.emit_error_label_event end def handle_error(response, tag, chunk, bulk_message_count, extracted_values) @@ -138,14 +135,14 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values) reason += " [reason]: \'#{item[write_operation]['error']['reason']}\'" end end - emit_error_label_event do + if emit_error_label_event? @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}")) end else if item[write_operation]['error'].is_a?(String) reason = item[write_operation]['error'] stats[:errors_block_resp] += 1 - emit_error_label_event do + if emit_error_label_event? @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{reason}")) end next @@ -156,7 +153,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values) raise OpenSearchRequestAbortError, "Rejected OpenSearch due to #{type}" end if unrecoverable_record_error?(type) - emit_error_label_event do + if emit_error_label_event? @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{type}: #{reason}")) end next @@ -167,7 +164,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values) # When we don't have a type field, something changed in the API # expected return values. stats[:errors_bad_resp] += 1 - emit_error_label_event do + if emit_error_label_event? @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - No error type provided in the response")) end next diff --git a/lib/fluent/plugin/out_opensearch.rb b/lib/fluent/plugin/out_opensearch.rb index a6dce93..90a756b 100644 --- a/lib/fluent/plugin/out_opensearch.rb +++ b/lib/fluent/plugin/out_opensearch.rb @@ -465,11 +465,8 @@ def placeholder?(name, param) placeholder_validities.include?(true) end - def emit_error_label_event(&block) - # If `emit_error_label_event` is specified as false, error event emittions are not occurred. - if @emit_error_label_event - block.call - end + def emit_error_label_event? + !!@emit_error_label_event end def compression @@ -588,7 +585,7 @@ def convert_numeric_time_into_string(numeric_time, time_key_format = "%Y-%m-%d % def parse_time(value, event_time, tag) @time_parser.call(value) rescue => e - emit_error_label_event do + if emit_error_label_event? router.emit_error_event(@time_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @time_key_format, 'value' => value}, e) end return Time.at(event_time).to_datetime @@ -882,7 +879,7 @@ def write(chunk) end end rescue => e - emit_error_label_event do + if emit_error_label_event? router.emit_error_event(tag, time, record, e) end end