Skip to content

Commit

Permalink
rewritten retry state with the above recommendations in mind.
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh  <trahterber@gmail.com>

Signed-off-by: OlehPalanskyi <Trahterber@gmail.com>
  • Loading branch information
OlehPalanskyi committed Apr 25, 2024
1 parent b9d00bc commit 6529888
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions lib/fluent/plugin/in_opensearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def configure(conf)

@timestamp_parser = create_time_parser
@backend_options = backend_options
@retry = retry_state(@retry_randomize)
@retry = nil

raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

Expand Down Expand Up @@ -337,6 +337,23 @@ def is_existing_connection(host)
return true
end

def update_retry_state(error=nil)
if error
unless @retry
@retry = retry_state(@retry_randomize)
end
@retry.step
#Raise error if the retry limit has been reached
raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{error.message}" if @retry.limit?
#Retry if the limit hasn't been reached
log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: error.message)
sleep(@retry.next_time - Time.now)
else
log.debug("retry succeeded.") unless @retry.nil?
@retry = nil unless @retry.nil?
end
end

def run
return run_slice if @num_slices <= 1

Expand All @@ -347,13 +364,8 @@ def run
run_slice(slice_id)
end
end
rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => e
@retry.step
#Raise error if the retry limit has been reached
raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{e.message}" if @retry.limit?
#Retry if the retry limit hasn't been reached
log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: e.message)
sleep(@retry.next_time - Time.now)
rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => error
update_retry_state(error)
retry
end

Expand All @@ -375,9 +387,7 @@ def run_slice(slice_id=nil)

router.emit_stream(@tag, es)
clear_scroll(scroll_id)
#reset steps and next_time if our function successful ends
@retry.instance_variable_set(:@steps, 0)
@retry.instance_variable_set(:@next_time, nil)
update_retry_state
end

def clear_scroll(scroll_id)
Expand Down

0 comments on commit 6529888

Please sign in to comment.