Merge pull request #28 from anders-swanson/anders/time-key
Honor @time_key (data streams)
cosmo0920 committed Mar 14, 2022
2 parents 0a36acf + 2a3fb51 commit 13c37ed
Showing 2 changed files with 81 additions and 3 deletions.
12 changes: 10 additions & 2 deletions lib/fluent/plugin/out_opensearch_data_stream.rb
@@ -191,9 +191,17 @@ def write(chunk)
tag = chunk.metadata.tag
chunk.msgpack_each do |time, record|
next unless record.is_a? Hash

begin
record.merge!({"@timestamp" => Time.at(time).iso8601(@time_precision)})
if record.has_key?(TIMESTAMP_FIELD)
rts = record[TIMESTAMP_FIELD]
dt = parse_time(rts, time, tag)
elsif record.has_key?(@time_key)
rts = record[@time_key]
dt = parse_time(rts, time, tag)
else
dt = Time.at(time).to_datetime
end
record.merge!({"@timestamp" => dt.iso8601(@time_precision)})
bulk_message = append_record_to_messages(CREATE_OP, {}, headers, record, bulk_message)
rescue => e
router.emit_error_event(tag, time, record, e)
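
For context, here is a minimal Ruby sketch of the timestamp resolution introduced above, pulled out of write() as a standalone helper. It assumes TIMESTAMP_FIELD is "@timestamp" and substitutes DateTime.parse for the plugin's parse_time (which also takes the event time and tag for error handling); the helper name and exact fallback behaviour are illustrative, not the plugin's API.

require "date"

TIMESTAMP_FIELD = "@timestamp"  # assumed value of the plugin constant

# Hypothetical helper mirroring the branch logic above: prefer an existing
# @timestamp field, then the configured time_key, then the Fluentd event time.
def resolve_timestamp(record, event_time, time_key, time_precision)
  dt =
    if record.has_key?(TIMESTAMP_FIELD)
      DateTime.parse(record[TIMESTAMP_FIELD])   # stand-in for parse_time
    elsif time_key && record.has_key?(time_key)
      DateTime.parse(record[time_key])          # stand-in for parse_time
    else
      Time.at(event_time).to_datetime
    end
  dt.iso8601(time_precision)
end

# resolve_timestamp({"vtm" => "2021-02-03T00:00:00+00:00"}, Time.now.to_i, "vtm", 3)
# => "2021-02-03T00:00:00.000+00:00"
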
72 changes: 71 additions & 1 deletion test/plugin/test_out_opensearch_data_stream.rb
@@ -9,7 +9,7 @@ class OpenSearchOutputDataStreamTest < Test::Unit::TestCase
include FlexMock::TestCase
include Fluent::Test::Helpers

attr_accessor :bulk_records
attr_accessor :bulk_records, :index_cmds

OPENSEARCH_DATA_STREAM_TYPE = "opensearch_data_stream"

@@ -96,12 +96,14 @@ def stub_bulk_feed(datastream_name="foo", template_name="foo_tpl", url="http://l
# {"create": {}}\nhttp://localhost:9200/_data_stream/foo_bar
# {"@timestamp": ...}
@bulk_records += req.body.split("\n").size / 2
@index_cmds = req.body.split("\n").map {|r| JSON.parse(r) }
end
stub_request(:post, "http://#{url}#{template_name}/_bulk").with do |req|
# bulk data must be pair of OP and records
# {"create": {}}\nhttp://localhost:9200/_data_stream/foo_bar
# {"@timestamp": ...}
@bulk_records += req.body.split("\n").size / 2
@index_cmds = req.body.split("\n").map {|r| JSON.parse(r) }
end
end
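
As a reading aid for the assertions in the new tests below, this is an illustrative (not captured) bulk body of the shape the stub above receives, and how it yields bulk_records and index_cmds; index_cmds[1] is the document payload, which is what the tests inspect.

require "json"

# Hypothetical NDJSON bulk body: one op line per document line, matching the
# comment in stub_bulk_feed above.
body = <<~NDJSON
  {"create": {}}
  {"@timestamp": "2021-02-03T00:00:00.000000000+00:00", "message": "Sample Record"}
NDJSON

lines = body.split("\n")
bulk_records = lines.size / 2                 # => 1 op/document pair
index_cmds = lines.map { |r| JSON.parse(r) }  # index_cmds[1] is the document hash
index_cmds[1]["@timestamp"]                   # => "2021-02-03T00:00:00.000000000+00:00"
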

@@ -591,4 +593,72 @@ def test_template_retry_install_fails

assert_equal(4, connection_resets)
end

def test_uses_custom_time_key
stub_default
stub_bulk_feed
conf = config_element(
'ROOT', '', {
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
'data_stream_name' => 'foo',
'data_stream_template_name' => 'foo_tpl',
'time_key' => 'vtm'
})

ts = DateTime.new(2021,2,3).iso8601(9)
record = {
'vtm' => ts,
'message' => 'Sample Record'
}

driver(conf).run(default_tag: 'test') do
driver.feed(record)
end
assert(index_cmds[1].has_key? '@timestamp')
assert_equal(ts, index_cmds[1]['@timestamp'])
end

def test_uses_custom_time_key_with_format
stub_default
stub_bulk_feed
conf = config_element(
'ROOT', '', {
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
'data_stream_name' => 'foo',
'data_stream_template_name' => 'foo_tpl',
'time_key' => 'vtm',
'time_key_format' => '%Y-%m-%d %H:%M:%S.%N%z'
})
ts = "2021-02-03 13:14:01.673+02:00"
record = {
'vtm' => ts,
'message' => 'Sample Record'
}
driver(conf).run(default_tag: 'test') do
driver.feed(record)
end
assert(index_cmds[1].has_key? '@timestamp')
assert_equal(DateTime.parse(ts).iso8601(9), index_cmds[1]['@timestamp'])
end

def test_record_no_timestamp
stub_default
stub_bulk_feed
stub_default
stub_bulk_feed
conf = config_element(
'ROOT', '', {
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
'data_stream_name' => 'foo',
'data_stream_template_name' => 'foo_tpl'
})
record = {
'message' => 'Sample Record'
}
driver(conf).run(default_tag: 'test') do
driver.feed(record)
end
assert(index_cmds[1].has_key? '@timestamp')
end

end
