From 2a3fb51a763f74bb1bbfc174a851ed187ab73d4d Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Thu, 3 Mar 2022 09:52:28 -0800 Subject: [PATCH] Honor @time_key (data streams) Signed-off-by: Anders Swanson --- .../plugin/out_opensearch_data_stream.rb | 12 +++- .../plugin/test_out_opensearch_data_stream.rb | 72 ++++++++++++++++++- 2 files changed, 81 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/out_opensearch_data_stream.rb b/lib/fluent/plugin/out_opensearch_data_stream.rb index 5100140..eea7146 100644 --- a/lib/fluent/plugin/out_opensearch_data_stream.rb +++ b/lib/fluent/plugin/out_opensearch_data_stream.rb @@ -191,9 +191,17 @@ def write(chunk) tag = chunk.metadata.tag chunk.msgpack_each do |time, record| next unless record.is_a? Hash - begin - record.merge!({"@timestamp" => Time.at(time).iso8601(@time_precision)}) + if record.has_key?(TIMESTAMP_FIELD) + rts = record[TIMESTAMP_FIELD] + dt = parse_time(rts, time, tag) + elsif record.has_key?(@time_key) + rts = record[@time_key] + dt = parse_time(rts, time, tag) + else + dt = Time.at(time).to_datetime + end + record.merge!({"@timestamp" => dt.iso8601(@time_precision)}) bulk_message = append_record_to_messages(CREATE_OP, {}, headers, record, bulk_message) rescue => e router.emit_error_event(tag, time, record, e) diff --git a/test/plugin/test_out_opensearch_data_stream.rb b/test/plugin/test_out_opensearch_data_stream.rb index 959dcd9..3b2e499 100644 --- a/test/plugin/test_out_opensearch_data_stream.rb +++ b/test/plugin/test_out_opensearch_data_stream.rb @@ -9,7 +9,7 @@ class OpenSearchOutputDataStreamTest < Test::Unit::TestCase include FlexMock::TestCase include Fluent::Test::Helpers - attr_accessor :bulk_records + attr_accessor :bulk_records, :index_cmds OPENSEARCH_DATA_STREAM_TYPE = "opensearch_data_stream" @@ -96,12 +96,14 @@ def stub_bulk_feed(datastream_name="foo", template_name="foo_tpl", url="http://l # {"create": {}}\nhttp://localhost:9200/_data_stream/foo_bar # {"@timestamp": ...} @bulk_records += req.body.split("\n").size / 2 + @index_cmds = req.body.split("\n").map {|r| JSON.parse(r) } end stub_request(:post, "http://#{url}#{template_name}/_bulk").with do |req| # bulk data must be pair of OP and records # {"create": {}}\nhttp://localhost:9200/_data_stream/foo_bar # {"@timestamp": ...} @bulk_records += req.body.split("\n").size / 2 + @index_cmds = req.body.split("\n").map {|r| JSON.parse(r) } end end @@ -591,4 +593,72 @@ def test_template_retry_install_fails assert_equal(4, connection_resets) end + + def test_uses_custom_time_key + stub_default + stub_bulk_feed + conf = config_element( + 'ROOT', '', { + '@type' => OPENSEARCH_DATA_STREAM_TYPE, + 'data_stream_name' => 'foo', + 'data_stream_template_name' => 'foo_tpl', + 'time_key' => 'vtm' + }) + + ts = DateTime.new(2021,2,3).iso8601(9) + record = { + 'vtm' => ts, + 'message' => 'Sample Record' + } + + driver(conf).run(default_tag: 'test') do + driver.feed(record) + end + assert(index_cmds[1].has_key? '@timestamp') + assert_equal(ts, index_cmds[1]['@timestamp']) + end + + def test_uses_custom_time_key_with_format + stub_default + stub_bulk_feed + conf = config_element( + 'ROOT', '', { + '@type' => OPENSEARCH_DATA_STREAM_TYPE, + 'data_stream_name' => 'foo', + 'data_stream_template_name' => 'foo_tpl', + 'time_key' => 'vtm', + 'time_key_format' => '%Y-%m-%d %H:%M:%S.%N%z' + }) + ts = "2021-02-03 13:14:01.673+02:00" + record = { + 'vtm' => ts, + 'message' => 'Sample Record' + } + driver(conf).run(default_tag: 'test') do + driver.feed(record) + end + assert(index_cmds[1].has_key? '@timestamp') + assert_equal(DateTime.parse(ts).iso8601(9), index_cmds[1]['@timestamp']) + end + + def test_record_no_timestamp + stub_default + stub_bulk_feed + stub_default + stub_bulk_feed + conf = config_element( + 'ROOT', '', { + '@type' => OPENSEARCH_DATA_STREAM_TYPE, + 'data_stream_name' => 'foo', + 'data_stream_template_name' => 'foo_tpl' + }) + record = { + 'message' => 'Sample Record' + } + driver(conf).run(default_tag: 'test') do + driver.feed(record) + end + assert(index_cmds[1].has_key? '@timestamp') + end + end