diff --git a/lib/fluent/plugin/opensearch_index_template.rb b/lib/fluent/plugin/opensearch_index_template.rb index 0d07715..49832ad 100644 --- a/lib/fluent/plugin/opensearch_index_template.rb +++ b/lib/fluent/plugin/opensearch_index_template.rb @@ -62,10 +62,10 @@ def host_unreachable_exceptions client.transport.transport.host_unreachable_exceptions end - def retry_operate(max_retries, fail_on_retry_exceed = true, catch_trasport_exceptions = true) + def retry_operate(max_retries, fail_on_retry_exceed = true, catch_transport_exceptions = true) return unless block_given? retries = 0 - transport_errors = OpenSearch::Transport::Transport::Errors.constants.map{ |c| OpenSearch::Transport::Transport::Errors.const_get c } if catch_trasport_exceptions + transport_errors = OpenSearch::Transport::Transport::Errors.constants.map{ |c| OpenSearch::Transport::Transport::Errors.const_get c } if catch_transport_exceptions begin yield rescue *host_unreachable_exceptions, *transport_errors, Timeout::Error => e diff --git a/lib/fluent/plugin/out_opensearch_data_stream.rb b/lib/fluent/plugin/out_opensearch_data_stream.rb index 16581cc..2ea5d98 100644 --- a/lib/fluent/plugin/out_opensearch_data_stream.rb +++ b/lib/fluent/plugin/out_opensearch_data_stream.rb @@ -36,7 +36,6 @@ def configure(conf) @fail_on_putting_template_retry_exceed, @catch_transport_exception_on_retry) do create_index_template(@data_stream_name, @data_stream_template_name) - create_data_stream(@data_stream_name) end rescue => e raise Fluent::ConfigError, "Failed to create data stream: <#{@data_stream_name}> #{e.message}" @@ -105,19 +104,6 @@ def data_stream_exist?(datastream_name, host = nil) end end - def create_data_stream(datastream_name, host = nil) - return if data_stream_exist?(datastream_name, host) - params = { - name: datastream_name - } - retry_operate(@max_retry_putting_template, - @fail_on_putting_template_retry_exceed, - @catch_transport_exception_on_retry) do - # TODO: Use X-Pack equivalent performing DataStream operation method on the following line - client(host).perform_request('PUT', "/_data_stream/#{datastream_name}", {}, params) - end - end - def template_exists?(name, host = nil) if @use_legacy_template client(host).indices.get_template(:name => name) @@ -176,7 +162,6 @@ def write(chunk) unless @data_stream_names.include?(data_stream_name) begin create_index_template(data_stream_name, data_stream_template_name, host) - create_data_stream(data_stream_name, host) @data_stream_names << data_stream_name rescue => e raise Fluent::ConfigError, "Failed to create data stream: <#{data_stream_name}> #{e.message}"