Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/fluent/plugin/out_opensearch_data_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ def configure(conf)
unless @use_placeholder
begin
@data_stream_names = [@data_stream_name]
create_index_template(@data_stream_name, @data_stream_template_name, @host)
create_data_stream(@data_stream_name)
retry_operate(@max_retry_putting_template,
@fail_on_putting_template_retry_exceed,
@catch_transport_exception_on_retry) do
create_index_template(@data_stream_name, @data_stream_template_name, @host)
create_data_stream(@data_stream_name)
end
rescue => e
raise Fluent::ConfigError, "Failed to create data stream: <#{@data_stream_name}> #{e.message}"
end
Expand Down
20 changes: 20 additions & 0 deletions test/plugin/test_out_opensearch_data_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ def stub_nonexistent_template?(name="foo_tpl")
stub_request(:get, "http://localhost:9200/_index_template/#{name}").to_return(:status => [404, OpenSearch::Transport::Transport::Errors::NotFound])
end

def stub_nonexistent_template_retry?(name="foo_tpl")
stub_request(:get, "http://localhost:9200/_index_template/#{name}").
to_return({ status: 500, body: 'Internal Server Error' }, { status: 404, body: '{}' })
end

def stub_bulk_feed(datastream_name="foo", template_name="foo_tpl")
stub_request(:post, "http://localhost:9200/#{datastream_name}/_bulk").with do |req|
# bulk data must be pair of OP and records
Expand Down Expand Up @@ -315,6 +320,21 @@ def test_datastream_configure
assert_equal "foo", driver(conf).instance.data_stream_name
end

def test_datastream_configure_retry
stub_elastic_info
stub_nonexistent_template_retry?
stub_index_template
stub_nonexistent_data_stream?
stub_data_stream
conf = config_element(
'ROOT', '', {
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
'data_stream_name' => 'foo',
'data_stream_template_name' => "foo_tpl"
})
assert_equal "foo", driver(conf).instance.data_stream_name
end

def test_existent_data_stream
stub_index_template
stub_existent_data_stream?
Expand Down