6 changes: 5 additions & 1 deletion lib/fluent/plugin/out_opensearch_data_stream.rb
@@ -160,7 +160,11 @@ def write(chunk)
     data_stream_template_name = @data_stream_template_name
     host = nil
     if @use_placeholder
-      host = extract_placeholders(@host, chunk)
+      host = if @hosts
+               extract_placeholders(@hosts, chunk)
+             else
+               extract_placeholders(@host, chunk)
+             end
       data_stream_name = extract_placeholders(@data_stream_name, chunk)
       data_stream_template_name = extract_placeholders(@data_stream_template_name, chunk)
       unless @data_stream_names.include?(data_stream_name)
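For context: with this change, per-chunk placeholder expansion prefers the `hosts` parameter over the single `host` parameter whenever `hosts` is configured. Below is a minimal sketch of that precedence, using a hypothetical `resolve_host` helper and a simplified stand-in for Fluentd's `extract_placeholders` (which in reality resolves `${...}` markers from chunk metadata rather than from a plain Hash):

    # Hypothetical illustration, not the plugin's actual code.
    def resolve_host(hosts, host, placeholders)
      # Mirror the @hosts-over-@host precedence introduced above.
      template = hosts || host
      # Simplified substitute for extract_placeholders: expand ${key} from a Hash.
      template.gsub(/\$\{(\w+)\}/) { placeholders[Regexp.last_match(1)] }
    end

    resolve_host("os-${key1}-a:9200,os-${key1}-b:9200", "localhost:9200", { "key1" => "bar" })
    # => "os-bar-a:9200,os-bar-b:9200"
    resolve_host(nil, "os-${key1}:9200", { "key1" => "bar" })
    # => "os-bar:9200"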
57 changes: 56 additions & 1 deletion test/plugin/test_out_opensearch_data_stream.rb
@@ -118,6 +118,18 @@ def stub_default(datastream_name="foo", template_name="foo_tpl", host="http://lo
     stub_data_stream(datastream_name)
   end

+  def stub_opensearch_with_store_index_command_counts(url="http://localhost:9200/_bulk")
+    if @index_command_counts == nil
+      @index_command_counts = {}
+      @index_command_counts.default = 0
+    end
+
+    stub_request(:post, url).with do |req|
+      index_cmds = req.body.split("\n").map {|r| JSON.parse(r) }
+      @index_command_counts[url] += index_cmds.size
+    end
+  end
+
   # ref. https://opensearch.org/docs/latest/opensearch/data-streams/
   class DataStreamNameTest < self

@@ -484,13 +496,56 @@ def test_bulk_insert_feed
         '@type' => OPENSEARCH_DATA_STREAM_TYPE,
         'data_stream_name' => 'foo',
         'data_stream_template_name' => 'foo_tpl'
-      })
+      })
     driver(conf).run(default_tag: 'test') do
       driver.feed(sample_record)
     end
     assert_equal 1, @bulk_records
   end

+  def test_placeholder_writes_to_multi_hosts
+    stub_default("foo_bar", "foo_tpl_bar")
+    hosts = [['192.168.33.50', 9201], ['192.168.33.51', 9201], ['192.168.33.52', 9201]]
+    hosts_string = hosts.map {|x| "#{x[0]}:#{x[1]}"}.compact.join(',')
+    hosts.each do |host_info|
+      host, port = host_info
+      stub_opensearch_with_store_index_command_counts("http://#{host}:#{port}/foo_bar/_bulk")
+      stub_opensearch_info("http://#{host}:#{port}/")
+      stub_request(:get, "http://#{host}:#{port}/_data_stream/foo_bar").
+        to_return(status: 200, body: "", headers: {})
+    end
+
+    conf = config_element(
+      'ROOT', '', {
+        '@type' => OPENSEARCH_DATA_STREAM_TYPE,
+        'data_stream_name' => 'foo_${key1}',
+        'data_stream_template_name' => 'foo_tpl_${key1}',
+        'hosts' => "#{hosts_string}"
+      }, [config_element('buffer', 'tag,key1', {
+        'timekey' => '1d'
+      }, [])])
+    driver(conf).run(default_tag: 'test') do
+      hashes = {
+        'age' => rand(100),
+        'key1' => 'bar'
+      }
+      1000.times do
+        driver.feed(sample_record.merge(hashes))
+      end
+    end
+
+    # @note: the Fluentd test driver cannot be made to produce multiple chunks
+    # via options such as flush_interval or buffer_chunk_limit, so
+    # @index_command_counts.size is always 1 here.
+    assert(@index_command_counts.size > 0, "not working with hosts options")
+
+    total = 0
+    @index_command_counts.each do |_, count|
+      total += count
+    end
+    assert_equal(2000, total)
+  end
+
   def test_template_retry_install_fails
     cwd = File.dirname(__FILE__)
     template_file = File.join(cwd, 'test_index_template.json')
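A note on the numbers in `test_placeholder_writes_to_multi_hosts`: the new stub helper counts every newline-delimited JSON line in each bulk request body, and a bulk body carries one action metadata line plus one document line per record, which is why feeding 1000 records yields the asserted total of 2000. Below is a small self-contained sketch of that counting logic, using made-up sample data instead of the test's stubbed hosts:

    # Standalone sketch of the per-URL counting done by
    # stub_opensearch_with_store_index_command_counts (sample data is hypothetical).
    require 'json'

    counts = Hash.new(0)

    # Two records for one endpoint: each record contributes an action line and
    # a document line to the newline-delimited bulk body, i.e. 4 JSON lines.
    bulk_body = <<~BODY
      {"create": {}}
      {"@timestamp": "2024-01-01T00:00:00Z", "message": "hello", "key1": "bar"}
      {"create": {}}
      {"@timestamp": "2024-01-01T00:00:01Z", "message": "world", "key1": "bar"}
    BODY

    url = "http://192.168.33.50:9201/foo_bar/_bulk"
    counts[url] += bulk_body.split("\n").map { |line| JSON.parse(line) }.size

    puts counts[url]        # => 4
    puts counts.values.sum  # => 4 (the test sums these totals across all stubbed hosts)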