diff --git a/lib/fluent/plugin/out_opensearch_data_stream.rb b/lib/fluent/plugin/out_opensearch_data_stream.rb index 6418031..805baf4 100644 --- a/lib/fluent/plugin/out_opensearch_data_stream.rb +++ b/lib/fluent/plugin/out_opensearch_data_stream.rb @@ -160,7 +160,11 @@ def write(chunk) data_stream_template_name = @data_stream_template_name host = nil if @use_placeholder - host = extract_placeholders(@host, chunk) + host = if @hosts + extract_placeholders(@hosts, chunk) + else + extract_placeholders(@host, chunk) + end data_stream_name = extract_placeholders(@data_stream_name, chunk) data_stream_template_name = extract_placeholders(@data_stream_template_name, chunk) unless @data_stream_names.include?(data_stream_name) diff --git a/test/plugin/test_out_opensearch_data_stream.rb b/test/plugin/test_out_opensearch_data_stream.rb index 742eb47..6f9bc41 100644 --- a/test/plugin/test_out_opensearch_data_stream.rb +++ b/test/plugin/test_out_opensearch_data_stream.rb @@ -118,6 +118,18 @@ def stub_default(datastream_name="foo", template_name="foo_tpl", host="http://lo stub_data_stream(datastream_name) end + def stub_opensearch_with_store_index_command_counts(url="http://localhost:9200/_bulk") + if @index_command_counts == nil + @index_command_counts = {} + @index_command_counts.default = 0 + end + + stub_request(:post, url).with do |req| + index_cmds = req.body.split("\n").map {|r| JSON.parse(r) } + @index_command_counts[url] += index_cmds.size + end + end + # ref. https://opensearch.org/docs/latest/opensearch/data-streams/ class DataStreamNameTest < self @@ -484,13 +496,56 @@ def test_bulk_insert_feed '@type' => OPENSEARCH_DATA_STREAM_TYPE, 'data_stream_name' => 'foo', 'data_stream_template_name' => 'foo_tpl' - }) + }) driver(conf).run(default_tag: 'test') do driver.feed(sample_record) end assert_equal 1, @bulk_records end + def test_placeholder_writes_to_multi_hosts + stub_default("foo_bar", "foo_tpl_bar") + hosts = [['192.168.33.50', 9201], ['192.168.33.51', 9201], ['192.168.33.52', 9201]] + hosts_string = hosts.map {|x| "#{x[0]}:#{x[1]}"}.compact.join(',') + hosts.each do |host_info| + host, port = host_info + stub_opensearch_with_store_index_command_counts("http://#{host}:#{port}/foo_bar/_bulk") + stub_opensearch_info("http://#{host}:#{port}/") + stub_request(:get, "http://#{host}:#{port}/_data_stream/foo_bar"). + to_return(status: 200, body: "", headers: {}) + end + + conf = config_element( + 'ROOT', '', { + '@type' => OPENSEARCH_DATA_STREAM_TYPE, + 'data_stream_name' => 'foo_${key1}', + 'data_stream_template_name' => 'foo_tpl_${key1}', + 'hosts' => "#{hosts_string}" + }, [config_element('buffer', 'tag,key1', { + 'timekey' => '1d' + }, [])]) + driver(conf).run(default_tag: 'test') do + hashes = { + 'age' => rand(100), + 'key1' => 'bar' + } + 1000.times do + driver.feed(sample_record.merge(hashes)) + end + end + + # @note: we cannot make multi chunks with options (flush_interval, buffer_chunk_limit) + # it's Fluentd test driver's constraint + # so @index_command_counts.size is always 1 + assert(@index_command_counts.size > 0, "not working with hosts options") + + total = 0 + @index_command_counts.each do |_, count| + total += count + end + assert_equal(2000, total) + end + def test_template_retry_install_fails cwd = File.dirname(__FILE__) template_file = File.join(cwd, 'test_index_template.json')