Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.OpenSearchInput.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
+ [reload_on_failure](#reload_on_failure)
+ [resurrect_after](#resurrect_after)
+ [with_transporter_log](#with_transporter_log)
+ [emit_error_label_event](#emit-error-label-event)
+ [Client/host certificate options](#clienthost-certificate-options)
+ [sniffer_class_name](#sniffer-class-name)
+ [custom_headers](#custom_headers)
Expand Down Expand Up @@ -190,6 +191,18 @@ We recommend to set this true if you start to debug this plugin.
with_transporter_log true
```

### emit_error_label_event

Default `emit_error_label_event` value is `true`.

Emitting error label events is default behavior.

When using the followin configuration, OpenSearch plugin will cut error events on error handler:

```aconf
emit_error_label_event false
```

### Client/host certificate options

Need to verify OpenSearch's certificate? You can use the following parameter to specify a CA instead of using an environment variable.
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Send your logs to OpenSearch (and search them with OpenSearch Dashboards maybe?)
+ [reload_after](#reload-after)
+ [validate_client_version](#validate-client-version)
+ [unrecoverable_error_types](#unrecoverable-error-types)
+ [emit_error_label_event](#emit-error-label-event)
+ [verify os version at startup](#verify_os_version_at_startup)
+ [default_opensearch_version](#default_opensearch_version)
+ [custom_headers](#custom_headers)
Expand Down Expand Up @@ -1097,6 +1098,18 @@ Then, remove `rejected_execution_exception` from `unrecoverable_error_types` par
unrecoverable_error_types ["out_of_memory_error"]
```

### emit_error_label_event

Default `emit_error_label_event` value is `true`.

Emitting error label events is default behavior.

When using the followin configuration, OpenSearch plugin will cut error events on error handler:

```aconf
emit_error_label_event false
```

### verify_os_version_at_startup

Because OpenSearch plugin will ought to change behavior each of OpenSearch major versions.
Expand Down
12 changes: 11 additions & 1 deletion lib/fluent/plugin/in_opensearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end
config_param :ca_file, :string, :default => nil
config_param :ssl_version, :enum, list: [:SSLv23, :TLSv1, :TLSv1_1, :TLSv1_2], :default => :TLSv1_2
config_param :with_transporter_log, :bool, :default => false
config_param :emit_error_label_event, :bool, :default => true
config_param :sniffer_class_name, :string, :default => nil
config_param :custom_headers, :hash, :default => {}
config_param :docinfo_fields, :array, :default => ['_index', '_type', '_id']
Expand Down Expand Up @@ -180,6 +181,13 @@ def get_connection_options(con_host=nil)
}
end

def emit_error_label_event(&block)
# If `emit_error_label_event` is specified as false, error event emittions are not occurred.
if emit_error_label_event
block.call
end
end

def start
super

Expand Down Expand Up @@ -224,7 +232,9 @@ def convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m
def parse_time(value, event_time, tag)
@timestamp_parser.call(value)
rescue => e
router.emit_error_event(@timestamp_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @timestamp_key_format, 'value' => value}, e)
emit_error_label_event do
router.emit_error_event(@timestamp_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @timestamp_key_format, 'value' => value}, e)
end
return Time.at(event_time).to_time
end

Expand Down
23 changes: 19 additions & 4 deletions lib/fluent/plugin/opensearch_error_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ def log_os_400_reason(&block)
end
end

def emit_error_label_event(&block)
# If `emit_error_label_event` is specified as false, error event emittions are not occurred.
if @plugin.emit_error_label_event
block.call
end
end

def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
items = response['items']
if items.nil? || !items.is_a?(Array)
Expand Down Expand Up @@ -127,12 +134,16 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
reason += " [reason]: \'#{item[write_operation]['error']['reason']}\'"
end
end
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}"))
emit_error_label_event do
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}"))
end
else
if item[write_operation]['error'].is_a?(String)
reason = item[write_operation]['error']
stats[:errors_block_resp] += 1
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{reason}"))
emit_error_label_event do
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{reason}"))
end
next
elsif item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type')
type = item[write_operation]['error']['type']
Expand All @@ -141,7 +152,9 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
raise OpenSearchRequestAbortError, "Rejected OpenSearch due to #{type}"
end
if unrecoverable_record_error?(type)
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{type}: #{reason}"))
emit_error_label_event do
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{type}: #{reason}"))
end
next
else
retry_stream.add(time, rawrecord) unless unrecoverable_record_error?(type)
Expand All @@ -150,7 +163,9 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
# When we don't have a type field, something changed in the API
# expected return values.
stats[:errors_bad_resp] += 1
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - No error type provided in the response"))
emit_error_label_event do
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - No error type provided in the response"))
end
next
end
stats[type] += 1
Expand Down
16 changes: 14 additions & 2 deletions lib/fluent/plugin/out_opensearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def initialize(retry_stream)
config_param :validate_client_version, :bool, :default => false
config_param :prefer_oj_serializer, :bool, :default => false
config_param :unrecoverable_error_types, :array, :default => ["out_of_memory_error", "rejected_execution_exception"]
config_param :emit_error_label_event, :bool, :default => true
config_param :verify_os_version_at_startup, :bool, :default => true
config_param :default_opensearch_version, :integer, :default => DEFAULT_OPENSEARCH_VERSION
config_param :log_os_400_reason, :bool, :default => false
Expand Down Expand Up @@ -462,6 +463,13 @@ def placeholder?(name, param)
placeholder_validities.include?(true)
end

def emit_error_label_event(&block)
# If `emit_error_label_event` is specified as false, error event emittions are not occurred.
if @emit_error_label_event
block.call
end
end

def compression
!(@compression_level == :no_compression)
end
Expand Down Expand Up @@ -578,7 +586,9 @@ def convert_numeric_time_into_string(numeric_time, time_key_format = "%Y-%m-%d %
def parse_time(value, event_time, tag)
@time_parser.call(value)
rescue => e
router.emit_error_event(@time_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @time_key_format, 'value' => value}, e)
emit_error_label_event do
router.emit_error_event(@time_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @time_key_format, 'value' => value}, e)
end
return Time.at(event_time).to_datetime
end

Expand Down Expand Up @@ -870,7 +880,9 @@ def write(chunk)
end
end
rescue => e
router.emit_error_event(tag, time, record, e)
emit_error_label_event do
router.emit_error_event(tag, time, record, e)
end
end
end

Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/plugin/out_opensearch_data_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ def write(chunk)
record.merge!({"@timestamp" => Time.at(time).iso8601(@time_precision)})
bulk_message = append_record_to_messages(CREATE_OP, {}, headers, record, bulk_message)
rescue => e
router.emit_error_event(tag, time, record, e)
emit_error_label_event do
router.emit_error_event(tag, time, record, e)
end
end
end

Expand Down
79 changes: 79 additions & 0 deletions test/plugin/test_opensearch_error_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ class TestPlugin
attr_accessor :unrecoverable_error_types
attr_accessor :log_os_400_reason
attr_accessor :write_operation
attr_accessor :emit_error_label_event
def initialize(log, log_os_400_reason = false)
@log = log
@write_operation = 'index'
@error_events = []
@unrecoverable_error_types = ["out_of_memory_error", "rejected_execution_exception"]
@log_os_400_reason = log_os_400_reason
@emit_error_label_event = true
end

def router
Expand Down Expand Up @@ -135,6 +137,44 @@ def test_400_responses_reason_log
end
end

class TEST400ResponseReasonWithoutErrorLog < self
def setup
Fluent::Test.setup
@log_device = Fluent::Test::DummyLogDevice.new
dl_opts = {:log_level => ServerEngine::DaemonLogger::DEBUG}
logger = ServerEngine::DaemonLogger.new(@log_device, dl_opts)
@log = Fluent::Log.new(logger)
@plugin = TestPlugin.new(@log)
@handler = Fluent::Plugin::OpenSearchErrorHandler.new(@plugin)
@plugin.emit_error_label_event = false
end

def test_400_responses_reason_log
records = [{time: 123, record: {"foo" => "bar", '_id' => 'abc'}}]
response = parse_response(%({
"took" : 0,
"errors" : true,
"items" : [
{
"create" : {
"_index" : "foo",
"status" : 400,
"error" : {
"type" : "mapper_parsing_exception",
"reason" : "failed to parse"
}
}
}
]
}))
chunk = MockChunk.new(records)
dummy_extracted_values = []
@handler.handle_error(response, 'atag', chunk, records.length, dummy_extracted_values)
assert_equal(0, @plugin.error_events.size)
assert_true(@plugin.error_events.empty?)
end
end

class TEST400ResponseReasonNoDebug < self
def setup
Fluent::Test.setup
Expand Down Expand Up @@ -177,6 +217,45 @@ def test_400_responses_reason_log
end
end

class TEST400ResponseReasonNoDebugAndNoErrorLog < self
def setup
Fluent::Test.setup
@log_device = Fluent::Test::DummyLogDevice.new
dl_opts = {:log_level => ServerEngine::DaemonLogger::INFO}
logger = ServerEngine::DaemonLogger.new(@log_device, dl_opts)
@log = Fluent::Log.new(logger)
@plugin = TestPlugin.new(@log)
@handler = Fluent::Plugin::OpenSearchErrorHandler.new(@plugin)
@plugin.log_os_400_reason = true
@plugin.emit_error_label_event = false
end

def test_400_responses_reason_log
records = [{time: 123, record: {"foo" => "bar", '_id' => 'abc'}}]
response = parse_response(%({
"took" : 0,
"errors" : true,
"items" : [
{
"create" : {
"_index" : "foo",
"status" : 400,
"error" : {
"type" : "mapper_parsing_exception",
"reason" : "failed to parse"
}
}
}
]
}))
chunk = MockChunk.new(records)
dummy_extracted_values = []
@handler.handle_error(response, 'atag', chunk, records.length, dummy_extracted_values)
assert_equal(0, @plugin.error_events.size)
assert_true(@plugin.error_events.empty?)
end
end

def test_nil_items_responses
records = [{time: 123, record: {"foo" => "bar", '_id' => 'abc'}}]
response = parse_response(%({
Expand Down