From 2bfa8d8c41eca8df2608021c7b98e3a5b5e50e03 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 1 Mar 2022 10:07:08 +0900 Subject: [PATCH 1/8] retry_state: Fix wrong calcuration of interval The correct total retry time should be: c + c * b^1 + (...) + c*b^(k - 1) But the previous implementation was: c + c * b^0 + c * b^1 + (...) + c*b^(k - 1) where: * c: constant factor, @retry_wait * b: base factor, @retry_exponential_backoff_base * k: number of retry times, @max_retry_times Although the first retry interval is already added at constructor of ExponentialBackoffRetry, same value is added again unexpectedly on the first calc_interval call (as `c * b^0`). Signed-off-by: Takuro Ashie --- lib/fluent/plugin_helper/retry_state.rb | 8 +-- .../plugin/test_output_as_buffered_retries.rb | 4 +- test/plugin_helper/test_retry_state.rb | 61 +++++++++---------- 3 files changed, 35 insertions(+), 38 deletions(-) diff --git a/lib/fluent/plugin_helper/retry_state.rb b/lib/fluent/plugin_helper/retry_state.rb index 7851fd61ea..7a7f2b22ad 100644 --- a/lib/fluent/plugin_helper/retry_state.rb +++ b/lib/fluent/plugin_helper/retry_state.rb @@ -98,7 +98,7 @@ def calc_next_time naive end elsif @current == :secondary - naive = naive_next_time(@steps - @secondary_transition_steps + 1) + naive = naive_next_time(@steps - @secondary_transition_steps) if naive >= @timeout_at @timeout_at else @@ -159,13 +159,13 @@ def naive_next_time(retry_next_times) def calc_max_retry_timeout(max_steps) result = 0 max_steps.times { |i| - result += calc_interval(i + 1) + result += calc_interval(i) } result end def calc_interval(num) - interval = raw_interval(num - 1) + interval = raw_interval(num) if @max_interval && interval > @max_interval @max_interval else @@ -175,7 +175,7 @@ def calc_interval(num) # Calculate previous finite value to avoid inf related errors. If this re-computing is heavy, use cache. until interval.finite? num -= 1 - interval = raw_interval(num - 1) + interval = raw_interval(num) end interval end diff --git a/test/plugin/test_output_as_buffered_retries.rb b/test/plugin/test_output_as_buffered_retries.rb index f3a08e877f..3954513795 100644 --- a/test/plugin/test_output_as_buffered_retries.rb +++ b/test/plugin/test_output_as_buffered_retries.rb @@ -140,13 +140,13 @@ def get_log_time(msg, logs) retry_state = @i.retry_state( @i.buffer_config.retry_randomize ) retry_state.step - assert_equal 1, (retry_state.next_time - now) - retry_state.step assert_equal (1 * (2 ** 1)), (retry_state.next_time - now) retry_state.step assert_equal (1 * (2 ** 2)), (retry_state.next_time - now) retry_state.step assert_equal (1 * (2 ** 3)), (retry_state.next_time - now) + retry_state.step + assert_equal (1 * (2 ** 4)), (retry_state.next_time - now) end test 'does retries correctly when #write fails' do diff --git a/test/plugin_helper/test_retry_state.rb b/test/plugin_helper/test_retry_state.rb index 4edd41bdf9..a6cd7300dc 100644 --- a/test/plugin_helper/test_retry_state.rb +++ b/test/plugin_helper/test_retry_state.rb @@ -202,7 +202,7 @@ class Dummy < Fluent::Plugin::TestBase while i < 300 s.step assert_equal i, s.steps - assert_equal (dummy_current_time + 0.1 * (2 ** (i - 1))), s.next_time + assert_equal (dummy_current_time + 0.1 * (2 ** i)), s.next_time assert !s.limit? i += 1 end @@ -218,22 +218,22 @@ class Dummy < Fluent::Plugin::TestBase assert_equal 0, s.steps assert_equal (dummy_current_time + 0.1), s.next_time - # 0.1 * (2 ** (10 - 1)) == 0.1 * 2 ** 9 == 51.2 - # 0.1 * (2 ** (11 - 1)) == 0.1 * 2 ** 10 == 102.4 + # 0.1 * 2 ** 9 == 51.2 + # 0.1 * 2 ** 10 == 102.4 i = 1 - while i < 11 + while i < 10 s.step assert_equal i, s.steps - assert_equal (dummy_current_time + 0.1 * (2 ** (i - 1))), s.next_time, "start:#{dummy_current_time}, i:#{i}" + assert_equal (dummy_current_time + 0.1 * (2 ** i)), s.next_time, "start:#{dummy_current_time}, i:#{i}" i += 1 end s.step - assert_equal 11, s.steps + assert_equal 10, s.steps assert_equal (dummy_current_time + 100), s.next_time s.step - assert_equal 12, s.steps + assert_equal 11, s.steps assert_equal (dummy_current_time + 100), s.next_time end @@ -249,30 +249,25 @@ class Dummy < Fluent::Plugin::TestBase assert_equal 0, s.steps assert_equal (dummy_current_time + 1), s.next_time - # 1 + 1 + 2 + 4 (=8) + # 1 + 2 + 4 (=7) override_current_time(s, s.next_time) s.step assert_equal 1, s.steps - assert_equal (s.current_time + 1), s.next_time - - override_current_time(s, s.next_time) - s.step - assert_equal 2, s.steps assert_equal (s.current_time + 2), s.next_time override_current_time(s, s.next_time) s.step - assert_equal 3, s.steps + assert_equal 2, s.steps assert_equal (s.current_time + 4), s.next_time assert !s.limit? - # + 8 (=16) > 12 + # + 8 (=15) > 12 override_current_time(s, s.next_time) s.step - assert_equal 4, s.steps + assert_equal 3, s.steps assert_equal s.timeout_at, s.next_time assert s.limit? @@ -293,24 +288,24 @@ class Dummy < Fluent::Plugin::TestBase override_current_time(s, s.next_time) s.step assert_equal 1, s.steps - assert_equal (s.current_time + 1), s.next_time + assert_equal (s.current_time + 2), s.next_time override_current_time(s, s.next_time) s.step assert_equal 2, s.steps - assert_equal (s.current_time + 2), s.next_time + assert_equal (s.current_time + 4), s.next_time override_current_time(s, s.next_time) s.step assert_equal 3, s.steps - assert_equal (s.current_time + 4), s.next_time + assert_equal (s.current_time + 8), s.next_time assert !s.limit? override_current_time(s, s.next_time) s.step assert_equal 4, s.steps - assert_equal (s.current_time + 8), s.next_time + assert_equal (s.current_time + 10), s.next_time assert !s.limit? @@ -341,40 +336,42 @@ class Dummy < Fluent::Plugin::TestBase assert_equal (dummy_current_time + 1), s.next_time assert !s.secondary? - # 1, 1(2), 2(4), 4(8), 8(16), 16(32), 32(64), (80), (81), (83), (87), (95), (100) + # primary: 3, 7, 15, 31, 63, 80 (timeout * threashold) + # secondary: 81, 83, 87, 95, 100 i = 1 - while i < 7 + while i < 6 override_current_time(s, s.next_time) assert !s.secondary? s.step assert_equal i, s.steps - assert_equal (s.current_time + 1 * (2 ** (i - 1))), s.next_time + assert_equal (s.current_time + 1 * (2 ** i)), s.next_time assert !s.limit? i += 1 end - assert_equal 7, i - override_current_time(s, s.next_time) # 64 + assert_equal 6, i + override_current_time(s, s.next_time) # 63 assert !s.secondary? s.step - assert_equal 7, s.steps + assert_equal 6, s.steps assert_equal s.secondary_transition_at, s.next_time assert !s.limit? i += 1 - assert_equal 8, i + assert_equal 7, i override_current_time(s, s.next_time) # 80 assert s.secondary? s.step - assert_equal 8, s.steps + assert_equal 7, s.steps assert_equal s.steps, s.secondary_transition_steps - assert_equal (s.secondary_transition_at + 1.0), s.next_time + assert_equal (s.secondary_transition_at + 1.0), s.next_time # 81 assert !s.limit? + assert_equal :secondary, s.current - # 81, 82, 84, 88, 96, 100 + # 83, 87, 95, 100 j = 1 while j < 4 override_current_time(s, s.next_time) @@ -382,14 +379,14 @@ class Dummy < Fluent::Plugin::TestBase assert_equal :secondary, s.current s.step - assert_equal (8 + j), s.steps + assert_equal (7 + j), s.steps assert_equal (s.current_time + (1 * (2 ** j))), s.next_time assert !s.limit?, "j:#{j}" j += 1 end assert_equal 4, j - override_current_time(s, s.next_time) # 96 + override_current_time(s, s.next_time) # 95 assert s.secondary? s.step From adae64a7432411d594018c5ac9f0d8048c6f9b44 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 23 Mar 2022 17:38:21 +0900 Subject: [PATCH 2/8] Fix wrong retry limit detection The previous implementation detects @max_retry_times + 1 as limit, but the limit should be @max_retry_times. Signed-off-by: Takuro Ashie --- lib/fluent/plugin/output.rb | 30 ++++++++++++------- lib/fluent/plugin_helper/retry_state.rb | 17 +++++++++-- .../plugin/test_output_as_buffered_retries.rb | 10 +++---- 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 54dbbeb5e0..6feb030c91 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1275,29 +1275,23 @@ def update_retry_state(chunk_id, using_secondary, error = nil) unless @retry @retry = retry_state(@buffer_config.retry_randomize) + if @retry.limit? - # @retry_max_times == 0, fail imediately by the following block + handle_limit_reached(error) else if error log.warn "failed to flush the buffer.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error log.warn_backtrace error.backtrace end - return end + + return end # @retry exists if @retry.limit? - if error - records = @buffer.queued_records - msg = "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue." - log.error msg, retry_times: @retry.steps, records: records, error: error - log.error_backtrace error.backtrace - end - @buffer.clear_queue! - log.debug "buffer queue cleared" - @retry = nil + handle_limit_reached(error) else # Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when # @retry.step is called almost as many times as the number of flush threads in a short time. @@ -1318,9 +1312,23 @@ def update_retry_state(chunk_id, using_secondary, error = nil) end end end + + handle_limit_reached(error) if @retry && @retry.limit_step? end end + def handle_limit_reached(error) + if error + records = @buffer.queued_records + msg = "Hit limit for retries. dropping all chunks in the buffer queue." + log.error msg, retry_times: @retry.steps, records: records, error: error + log.error_backtrace error.backtrace + end + @buffer.clear_queue! + log.debug "buffer queue cleared" + @retry = nil + end + def retry_state(randomize) if @secondary retry_state_create( diff --git a/lib/fluent/plugin_helper/retry_state.rb b/lib/fluent/plugin_helper/retry_state.rb index 7a7f2b22ad..29bcf8c157 100644 --- a/lib/fluent/plugin_helper/retry_state.rb +++ b/lib/fluent/plugin_helper/retry_state.rb @@ -131,11 +131,24 @@ def recalc_next_time @next_time = calc_next_time end - def limit? + # Use @next_time for time by default to keep backward compatibility + def limit?(time: @next_time, steps: @steps) + timeout?(time) || limit_step?(steps) + end + + def timeout?(time = current_time) + if @forever + false + else + time >= @timeout_at + end + end + + def limit_step?(steps = @steps) if @forever false else - @next_time >= @timeout_at || !!(@max_steps && @steps >= @max_steps) + !!(@max_steps && steps >= @max_steps) end end end diff --git a/test/plugin/test_output_as_buffered_retries.rb b/test/plugin/test_output_as_buffered_retries.rb index 3954513795..1fd0307500 100644 --- a/test/plugin/test_output_as_buffered_retries.rb +++ b/test/plugin/test_output_as_buffered_retries.rb @@ -332,7 +332,7 @@ def get_log_time(msg, logs) @i.emit_events("test.tag.3", dummy_event_stream()) logs = @i.log.out.logs - assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") } } + assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") } } end test 'output plugin give retries up by retry_max_times, and clear queue in buffer' do @@ -409,7 +409,7 @@ def get_log_time(msg, logs) @i.emit_events("test.tag.3", dummy_event_stream()) logs = @i.log.out.logs - assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } } + assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } } assert{ @i.buffer.queue.size == 0 } assert{ @i.buffer.stage.size == 1 } @@ -607,7 +607,7 @@ def get_log_time(msg, logs) logs = @i.log.out.logs target_time = Time.parse("2016-04-13 18:35:31 -0700") - target_msg = "[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue." + target_msg = "[error]: Hit limit for retries. dropping all chunks in the buffer queue." assert{ logs.any?{|l| l.include?(target_msg) } } log_time = get_log_time(target_msg, logs) @@ -695,7 +695,7 @@ def get_log_time(msg, logs) @i.emit_events("test.tag.3", dummy_event_stream()) logs = @i.log.out.logs - assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } } + assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } } assert{ @i.buffer.queue.size == 0 } assert{ @i.buffer.stage.size == 1 } @@ -743,7 +743,7 @@ def get_log_time(msg, logs) assert(@i.write_count == 1) assert(@i.num_errors == 1) - assert(@i.log.out.logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=0") }) + assert(@i.log.out.logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=0") }) assert(@i.buffer.queue.size == 0) assert(@i.buffer.stage.size == 1) assert(@i.buffer.queue.all?{|c| c.empty? }) From 7e29932b6a02b72501ae05d1ae87710afcecb59c Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 23 Mar 2022 17:53:14 +0900 Subject: [PATCH 3/8] Simplify handling limit steps of retry Signed-off-by: Takuro Ashie --- lib/fluent/plugin/output.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 6feb030c91..0db896fd4c 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1311,9 +1311,8 @@ def update_retry_state(chunk_id, using_secondary, error = nil) log.warn_backtrace error.backtrace end end + handle_limit_reached(error) if @retry.limit_step? end - - handle_limit_reached(error) if @retry && @retry.limit_step? end end From a2cbb42df09b85be4a29863fd0dc23ad51254710 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Wed, 23 Mar 2022 18:14:09 +0900 Subject: [PATCH 4/8] test_output_as_buffered_secondary: Follow a recent change Signed-off-by: Takuro Ashie --- test/plugin/test_output_as_buffered_secondary.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plugin/test_output_as_buffered_secondary.rb b/test/plugin/test_output_as_buffered_secondary.rb index ae774b7455..b12bb154a3 100644 --- a/test/plugin/test_output_as_buffered_secondary.rb +++ b/test/plugin/test_output_as_buffered_secondary.rb @@ -874,7 +874,7 @@ def dummy_event_stream end logs = @i.log.out.logs - assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") } } + assert{ logs.any?{|l| l.include?("[error]: Hit limit for retries. dropping all chunks in the buffer queue.") } } assert{ now >= first_failure + 60 } end From 09f0aa5eded3544525acd741269d7dab6fd6eaaf Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 24 Mar 2022 12:50:34 +0900 Subject: [PATCH 5/8] Add scenario test of RetryState RetryStatement behavior is complicated, so scenario tests may be useful. Signed-off-by: Daijiro Fukuda --- test/plugin_helper/test_retry_state.rb | 570 +++++++++++++++++++++++++ 1 file changed, 570 insertions(+) diff --git a/test/plugin_helper/test_retry_state.rb b/test/plugin_helper/test_retry_state.rb index a6cd7300dc..824151c3a8 100644 --- a/test/plugin_helper/test_retry_state.rb +++ b/test/plugin_helper/test_retry_state.rb @@ -18,6 +18,21 @@ class Dummy < Fluent::Plugin::TestBase helpers :retry_state end + class RetryRecord + attr_reader :retry_count, :elapsed_sec, :is_secondary + def initialize(retry_count, elapsed_sec, is_secondary) + @retry_count = retry_count # This is Nth retryment + @elapsed_sec = elapsed_sec + @is_secondary = is_secondary + end + + def ==(obj) + @retry_count == obj.retry_count && + @elapsed_sec == obj.elapsed_sec && + @is_secondary == obj.is_secondary + end + end + setup do @d = Dummy.new end @@ -436,4 +451,559 @@ class Dummy < Fluent::Plugin::TestBase end end end + + sub_test_case "ExponentialBackOff_ScenarioTests" do + data("Simple timeout", { + timeout: 100, max_steps: nil, max_interval: nil, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 100, false), + ], + }) + data("Simple timeout with secondary", { + timeout: 100, max_steps: nil, max_interval: nil, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 80, true), + RetryRecord.new(8, 81, true), + RetryRecord.new(9, 83, true), + RetryRecord.new(10, 87, true), + RetryRecord.new(11, 95, true), + RetryRecord.new(12, 100, true), + ], + }) + data("Simple timeout with custom wait and backoff_base", { + timeout: 1000, max_steps: nil, max_interval: nil, use_sec: false, sec_thres: 0.8, wait: 2, backoff_base: 3, + expected: [ + RetryRecord.new(1, 2, false), + RetryRecord.new(2, 8, false), + RetryRecord.new(3, 26, false), + RetryRecord.new(4, 80, false), + RetryRecord.new(5, 242, false), + RetryRecord.new(6, 728, false), + RetryRecord.new(7, 1000, false), + ], + }) + data("Simple timeout with custom wait and backoff_base and secondary", { + timeout: 1000, max_steps: nil, max_interval: nil, use_sec: true, sec_thres: 0.8, wait: 2, backoff_base: 3, + expected: [ + RetryRecord.new(1, 2, false), + RetryRecord.new(2, 8, false), + RetryRecord.new(3, 26, false), + RetryRecord.new(4, 80, false), + RetryRecord.new(5, 242, false), + RetryRecord.new(6, 728, false), + RetryRecord.new(7, 800, true), + RetryRecord.new(8, 802, true), + RetryRecord.new(9, 808, true), + RetryRecord.new(10, 826, true), + RetryRecord.new(11, 880, true), + RetryRecord.new(12, 1000, true), + ], + }) + data("Default timeout", { + timeout: 72*3600, max_steps: nil, max_interval: nil, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 1023, false), + RetryRecord.new(11, 2047, false), + RetryRecord.new(12, 4095, false), + RetryRecord.new(13, 8191, false), + RetryRecord.new(14, 16383, false), + RetryRecord.new(15, 32767, false), + RetryRecord.new(16, 65535, false), + RetryRecord.new(17, 131071, false), + RetryRecord.new(18, 259200, false), + ], + }) + data("Default timeout with secondary", { + timeout: 72*3600, max_steps: nil, max_interval: nil, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 1023, false), + RetryRecord.new(11, 2047, false), + RetryRecord.new(12, 4095, false), + RetryRecord.new(13, 8191, false), + RetryRecord.new(14, 16383, false), + RetryRecord.new(15, 32767, false), + RetryRecord.new(16, 65535, false), + RetryRecord.new(17, 131071, false), + RetryRecord.new(18, 207360, true), + RetryRecord.new(19, 207361, true), + RetryRecord.new(20, 207363, true), + RetryRecord.new(21, 207367, true), + RetryRecord.new(22, 207375, true), + RetryRecord.new(23, 207391, true), + RetryRecord.new(24, 207423, true), + RetryRecord.new(25, 207487, true), + RetryRecord.new(26, 207615, true), + RetryRecord.new(27, 207871, true), + RetryRecord.new(28, 208383, true), + RetryRecord.new(29, 209407, true), + RetryRecord.new(30, 211455, true), + RetryRecord.new(31, 215551, true), + RetryRecord.new(32, 223743, true), + RetryRecord.new(33, 240127, true), + RetryRecord.new(34, 259200, true), + ], + }) + data("Default timeout with secondary and custom threshold", { + timeout: 72*3600, max_steps: nil, max_interval: nil, use_sec: true, sec_thres: 0.5, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 1023, false), + RetryRecord.new(11, 2047, false), + RetryRecord.new(12, 4095, false), + RetryRecord.new(13, 8191, false), + RetryRecord.new(14, 16383, false), + RetryRecord.new(15, 32767, false), + RetryRecord.new(16, 65535, false), + RetryRecord.new(17, 129600, true), + RetryRecord.new(18, 129601, true), + RetryRecord.new(19, 129603, true), + RetryRecord.new(20, 129607, true), + RetryRecord.new(21, 129615, true), + RetryRecord.new(22, 129631, true), + RetryRecord.new(23, 129663, true), + RetryRecord.new(24, 129727, true), + RetryRecord.new(25, 129855, true), + RetryRecord.new(26, 130111, true), + RetryRecord.new(27, 130623, true), + RetryRecord.new(28, 131647, true), + RetryRecord.new(29, 133695, true), + RetryRecord.new(30, 137791, true), + RetryRecord.new(31, 145983, true), + RetryRecord.new(32, 162367, true), + RetryRecord.new(33, 195135, true), + RetryRecord.new(34, 259200, true), + ], + }) + data("Simple max_steps", { + timeout: 72*3600, max_steps: 10, max_interval: nil, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 1023, false), + ], + }) + data("Simple max_steps with secondary", { + timeout: 72*3600, max_steps: 10, max_interval: nil, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 818, true), + ], + }) + data("Simple interval", { + timeout: 72*3600, max_steps: nil, max_interval: 3600, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 1023, false), + RetryRecord.new(11, 2047, false), + RetryRecord.new(12, 4095, false), + RetryRecord.new(13, 7695, false), + RetryRecord.new(14, 11295, false), + RetryRecord.new(15, 14895, false), + RetryRecord.new(16, 18495, false), + RetryRecord.new(17, 22095, false), + RetryRecord.new(18, 25695, false), + RetryRecord.new(19, 29295, false), + RetryRecord.new(20, 32895, false), + RetryRecord.new(21, 36495, false), + RetryRecord.new(22, 40095, false), + RetryRecord.new(23, 43695, false), + RetryRecord.new(24, 47295, false), + RetryRecord.new(25, 50895, false), + RetryRecord.new(26, 54495, false), + RetryRecord.new(27, 58095, false), + RetryRecord.new(28, 61695, false), + RetryRecord.new(29, 65295, false), + RetryRecord.new(30, 68895, false), + RetryRecord.new(31, 72495, false), + RetryRecord.new(32, 76095, false), + RetryRecord.new(33, 79695, false), + RetryRecord.new(34, 83295, false), + RetryRecord.new(35, 86895, false), + RetryRecord.new(36, 90495, false), + RetryRecord.new(37, 94095, false), + RetryRecord.new(38, 97695, false), + RetryRecord.new(39, 101295, false), + RetryRecord.new(40, 104895, false), + RetryRecord.new(41, 108495, false), + RetryRecord.new(42, 112095, false), + RetryRecord.new(43, 115695, false), + RetryRecord.new(44, 119295, false), + RetryRecord.new(45, 122895, false), + RetryRecord.new(46, 126495, false), + RetryRecord.new(47, 130095, false), + RetryRecord.new(48, 133695, false), + RetryRecord.new(49, 137295, false), + RetryRecord.new(50, 140895, false), + RetryRecord.new(51, 144495, false), + RetryRecord.new(52, 148095, false), + RetryRecord.new(53, 151695, false), + RetryRecord.new(54, 155295, false), + RetryRecord.new(55, 158895, false), + RetryRecord.new(56, 162495, false), + RetryRecord.new(57, 166095, false), + RetryRecord.new(58, 169695, false), + RetryRecord.new(59, 173295, false), + RetryRecord.new(60, 176895, false), + RetryRecord.new(61, 180495, false), + RetryRecord.new(62, 184095, false), + RetryRecord.new(63, 187695, false), + RetryRecord.new(64, 191295, false), + RetryRecord.new(65, 194895, false), + RetryRecord.new(66, 198495, false), + RetryRecord.new(67, 202095, false), + RetryRecord.new(68, 205695, false), + RetryRecord.new(69, 209295, false), + RetryRecord.new(70, 212895, false), + RetryRecord.new(71, 216495, false), + RetryRecord.new(72, 220095, false), + RetryRecord.new(73, 223695, false), + RetryRecord.new(74, 227295, false), + RetryRecord.new(75, 230895, false), + RetryRecord.new(76, 234495, false), + RetryRecord.new(77, 238095, false), + RetryRecord.new(78, 241695, false), + RetryRecord.new(79, 245295, false), + RetryRecord.new(80, 248895, false), + RetryRecord.new(81, 252495, false), + RetryRecord.new(82, 256095, false), + RetryRecord.new(83, 259200, false), + ], + }) + data("Simple interval with secondary", { + timeout: 72*3600, max_steps: nil, max_interval: 3600, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 1023, false), + RetryRecord.new(11, 2047, false), + RetryRecord.new(12, 4095, false), + RetryRecord.new(13, 7695, false), + RetryRecord.new(14, 11295, false), + RetryRecord.new(15, 14895, false), + RetryRecord.new(16, 18495, false), + RetryRecord.new(17, 22095, false), + RetryRecord.new(18, 25695, false), + RetryRecord.new(19, 29295, false), + RetryRecord.new(20, 32895, false), + RetryRecord.new(21, 36495, false), + RetryRecord.new(22, 40095, false), + RetryRecord.new(23, 43695, false), + RetryRecord.new(24, 47295, false), + RetryRecord.new(25, 50895, false), + RetryRecord.new(26, 54495, false), + RetryRecord.new(27, 58095, false), + RetryRecord.new(28, 61695, false), + RetryRecord.new(29, 65295, false), + RetryRecord.new(30, 68895, false), + RetryRecord.new(31, 72495, false), + RetryRecord.new(32, 76095, false), + RetryRecord.new(33, 79695, false), + RetryRecord.new(34, 83295, false), + RetryRecord.new(35, 86895, false), + RetryRecord.new(36, 90495, false), + RetryRecord.new(37, 94095, false), + RetryRecord.new(38, 97695, false), + RetryRecord.new(39, 101295, false), + RetryRecord.new(40, 104895, false), + RetryRecord.new(41, 108495, false), + RetryRecord.new(42, 112095, false), + RetryRecord.new(43, 115695, false), + RetryRecord.new(44, 119295, false), + RetryRecord.new(45, 122895, false), + RetryRecord.new(46, 126495, false), + RetryRecord.new(47, 130095, false), + RetryRecord.new(48, 133695, false), + RetryRecord.new(49, 137295, false), + RetryRecord.new(50, 140895, false), + RetryRecord.new(51, 144495, false), + RetryRecord.new(52, 148095, false), + RetryRecord.new(53, 151695, false), + RetryRecord.new(54, 155295, false), + RetryRecord.new(55, 158895, false), + RetryRecord.new(56, 162495, false), + RetryRecord.new(57, 166095, false), + RetryRecord.new(58, 169695, false), + RetryRecord.new(59, 173295, false), + RetryRecord.new(60, 176895, false), + RetryRecord.new(61, 180495, false), + RetryRecord.new(62, 184095, false), + RetryRecord.new(63, 187695, false), + RetryRecord.new(64, 191295, false), + RetryRecord.new(65, 194895, false), + RetryRecord.new(66, 198495, false), + RetryRecord.new(67, 202095, false), + RetryRecord.new(68, 205695, false), + RetryRecord.new(69, 207360, true), + RetryRecord.new(70, 207361, true), + RetryRecord.new(71, 207363, true), + RetryRecord.new(72, 207367, true), + RetryRecord.new(73, 207375, true), + RetryRecord.new(74, 207391, true), + RetryRecord.new(75, 207423, true), + RetryRecord.new(76, 207487, true), + RetryRecord.new(77, 207615, true), + RetryRecord.new(78, 207871, true), + RetryRecord.new(79, 208383, true), + RetryRecord.new(80, 209407, true), + RetryRecord.new(81, 211455, true), + RetryRecord.new(82, 215055, true), + RetryRecord.new(83, 218655, true), + RetryRecord.new(84, 222255, true), + RetryRecord.new(85, 225855, true), + RetryRecord.new(86, 229455, true), + RetryRecord.new(87, 233055, true), + RetryRecord.new(88, 236655, true), + RetryRecord.new(89, 240255, true), + RetryRecord.new(90, 243855, true), + RetryRecord.new(91, 247455, true), + RetryRecord.new(92, 251055, true), + RetryRecord.new(93, 254655, true), + RetryRecord.new(94, 258255, true), + RetryRecord.new(95, 259200, true), + ], + }) + data("Max_steps and max_interval", { + timeout: 72*3600, max_steps: 30, max_interval: 3600, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 1023, false), + RetryRecord.new(11, 2047, false), + RetryRecord.new(12, 4095, false), + RetryRecord.new(13, 7695, false), + RetryRecord.new(14, 11295, false), + RetryRecord.new(15, 14895, false), + RetryRecord.new(16, 18495, false), + RetryRecord.new(17, 22095, false), + RetryRecord.new(18, 25695, false), + RetryRecord.new(19, 29295, false), + RetryRecord.new(20, 32895, false), + RetryRecord.new(21, 36495, false), + RetryRecord.new(22, 40095, false), + RetryRecord.new(23, 43695, false), + RetryRecord.new(24, 47295, false), + RetryRecord.new(25, 50895, false), + RetryRecord.new(26, 54495, false), + RetryRecord.new(27, 58095, false), + RetryRecord.new(28, 61695, false), + RetryRecord.new(29, 65295, false), + RetryRecord.new(30, 68895, false), + ], + }) + data("Max_steps and max_interval with secondary", { + timeout: 72*3600, max_steps: 30, max_interval: 3600, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 1023, false), + RetryRecord.new(11, 2047, false), + RetryRecord.new(12, 4095, false), + RetryRecord.new(13, 7695, false), + RetryRecord.new(14, 11295, false), + RetryRecord.new(15, 14895, false), + RetryRecord.new(16, 18495, false), + RetryRecord.new(17, 22095, false), + RetryRecord.new(18, 25695, false), + RetryRecord.new(19, 29295, false), + RetryRecord.new(20, 32895, false), + RetryRecord.new(21, 36495, false), + RetryRecord.new(22, 40095, false), + RetryRecord.new(23, 43695, false), + RetryRecord.new(24, 47295, false), + RetryRecord.new(25, 50895, false), + RetryRecord.new(26, 54495, false), + RetryRecord.new(27, 55116, true), + RetryRecord.new(28, 55117, true), + RetryRecord.new(29, 55119, true), + RetryRecord.new(30, 55123, true), + ], + }) + data("Max_steps and max_interval with timeout", { + timeout: 10000, max_steps: 30, max_interval: 1000, use_sec: false, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 1023, false), + RetryRecord.new(11, 2023, false), + RetryRecord.new(12, 3023, false), + RetryRecord.new(13, 4023, false), + RetryRecord.new(14, 5023, false), + RetryRecord.new(15, 6023, false), + RetryRecord.new(16, 7023, false), + RetryRecord.new(17, 8023, false), + RetryRecord.new(18, 9023, false), + RetryRecord.new(19, 10000, false), + ], + }) + data("Max_steps and max_interval with timeout and secondary", { + timeout: 10000, max_steps: 30, max_interval: 1000, use_sec: true, sec_thres: 0.8, wait: 1, backoff_base: 2, + expected: [ + RetryRecord.new(1, 1, false), + RetryRecord.new(2, 3, false), + RetryRecord.new(3, 7, false), + RetryRecord.new(4, 15, false), + RetryRecord.new(5, 31, false), + RetryRecord.new(6, 63, false), + RetryRecord.new(7, 127, false), + RetryRecord.new(8, 255, false), + RetryRecord.new(9, 511, false), + RetryRecord.new(10, 1023, false), + RetryRecord.new(11, 2023, false), + RetryRecord.new(12, 3023, false), + RetryRecord.new(13, 4023, false), + RetryRecord.new(14, 5023, false), + RetryRecord.new(15, 6023, false), + RetryRecord.new(16, 7023, false), + RetryRecord.new(17, 8000, true), + RetryRecord.new(18, 8001, true), + RetryRecord.new(19, 8003, true), + RetryRecord.new(20, 8007, true), + RetryRecord.new(21, 8015, true), + RetryRecord.new(22, 8031, true), + RetryRecord.new(23, 8063, true), + RetryRecord.new(24, 8127, true), + RetryRecord.new(25, 8255, true), + RetryRecord.new(26, 8511, true), + RetryRecord.new(27, 9023, true), + RetryRecord.new(28, 10000, true), + ], + }) + test "exponential backoff with senario" do |data| + print_for_debug = false # change this value true if need to see msg always. + trying_count = 1000 # just for avoiding infinite loop + + retry_records = [] + msg = "" + + s = @d.retry_state_create( + :t15, :exponential_backoff, data[:wait], data[:timeout], + max_steps: data[:max_steps], max_interval: data[:max_interval], + secondary: data[:use_sec], secondary_threshold: data[:sec_thres], + backoff_base: data[:backoff_base], randomize: false + ) + override_current_time(s, s.start) + + retry_count = 0 + trying_count.times do + next_elapsed = (s.next_time - s.start).to_i + + msg << "step: #{s.steps}, next: #{next_elapsed}s (#{next_elapsed / 3600}h)\n" + + # Wait until next time to trigger the next retry + override_current_time(s, s.next_time) + + # Retry will be triggered at this point. + retry_count += 1 + rec = RetryRecord.new(retry_count, next_elapsed, s.secondary?) + retry_records.append(rec) + msg << "[#{next_elapsed}s elapsed point] #{retry_count}th-Retry(#{s.secondary? ? "SEC" : "PRI"}) is triggered.\n" + + # Update retry statement + if s.limit? + msg << "--- Reach limit of timeout. ---\n" + break + end + + s.step + + if s.limit_step? + msg << "--- Reach limit of max step. ---\n" + break + end + end + + assert_equal(data[:expected], retry_records, msg) + + print(msg) if print_for_debug + end + end end From 4819ccfe24ae74022a488f045842a8f965b6ca94 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 24 Mar 2022 14:10:08 +0900 Subject: [PATCH 6/8] Fix timeout handling Add 2 states to control timeout. Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/output.rb | 16 +++++++-------- lib/fluent/plugin_helper/retry_state.rb | 27 +++++++++++-------------- test/plugin_helper/test_retry_state.rb | 23 +++++++++------------ 3 files changed, 30 insertions(+), 36 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 0db896fd4c..45290fda64 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1290,16 +1290,17 @@ def update_retry_state(chunk_id, using_secondary, error = nil) # @retry exists + # Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when + # @retry.step is called almost as many times as the number of flush threads in a short time. + if Time.now >= @retry.next_time + @retry.step + else + @retry.recalc_next_time # to prevent all flush threads from retrying at the same time + end + if @retry.limit? handle_limit_reached(error) else - # Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when - # @retry.step is called almost as many times as the number of flush threads in a short time. - if Time.now >= @retry.next_time - @retry.step - else - @retry.recalc_next_time # to prevent all flush threads from retrying at the same time - end if error if using_secondary msg = "failed to flush the buffer with secondary output." @@ -1311,7 +1312,6 @@ def update_retry_state(chunk_id, using_secondary, error = nil) log.warn_backtrace error.backtrace end end - handle_limit_reached(error) if @retry.limit_step? end end end diff --git a/lib/fluent/plugin_helper/retry_state.rb b/lib/fluent/plugin_helper/retry_state.rb index 29bcf8c157..8816773d95 100644 --- a/lib/fluent/plugin_helper/retry_state.rb +++ b/lib/fluent/plugin_helper/retry_state.rb @@ -44,6 +44,8 @@ def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_wi @timeout = timeout @timeout_at = @start + timeout + @has_reached_timeout = false + @has_timeouted = false @current = :primary if randomize_width < 0 || randomize_width > 0.5 @@ -123,7 +125,15 @@ def step @current = :secondary @secondary_transition_steps = @steps end + @next_time = calc_next_time + + unless @has_reached_timeout + @has_reached_timeout = @next_time >= @timeout_at + else + @has_timeouted = @next_time >= @timeout_at + end + nil end @@ -131,24 +141,11 @@ def recalc_next_time @next_time = calc_next_time end - # Use @next_time for time by default to keep backward compatibility - def limit?(time: @next_time, steps: @steps) - timeout?(time) || limit_step?(steps) - end - - def timeout?(time = current_time) - if @forever - false - else - time >= @timeout_at - end - end - - def limit_step?(steps = @steps) + def limit? if @forever false else - !!(@max_steps && steps >= @max_steps) + @has_timeouted || !!(@max_steps && @steps >= @max_steps) end end end diff --git a/test/plugin_helper/test_retry_state.rb b/test/plugin_helper/test_retry_state.rb index 824151c3a8..b61e5e35db 100644 --- a/test/plugin_helper/test_retry_state.rb +++ b/test/plugin_helper/test_retry_state.rb @@ -90,6 +90,7 @@ def ==(obj) override_current_time(s, s.next_time) s.step assert_equal s.timeout_at, s.next_time + s.step assert s.limit? end @@ -115,7 +116,6 @@ def ==(obj) assert_equal 5, i override_current_time(s, s.next_time) s.step - assert_equal (s.current_time + 3), s.next_time assert s.limit? end @@ -179,7 +179,9 @@ def ==(obj) assert s.secondary? s.step - assert_equal s.timeout_at, s.next_time + assert_equal s.timeout_at, s.next_time # 100 + + s.step assert s.limit? end @@ -285,6 +287,7 @@ def ==(obj) assert_equal 3, s.steps assert_equal s.timeout_at, s.next_time + s.step assert s.limit? end @@ -334,8 +337,6 @@ def ==(obj) override_current_time(s, s.next_time) s.step assert_equal 6, s.steps - assert_equal (s.current_time + 10), s.next_time - assert s.limit? end @@ -405,7 +406,9 @@ def ==(obj) assert s.secondary? s.step - assert_equal s.timeout_at, s.next_time + assert_equal s.timeout_at, s.next_time # 100 + + s.step assert s.limit? end @@ -988,15 +991,9 @@ def ==(obj) msg << "[#{next_elapsed}s elapsed point] #{retry_count}th-Retry(#{s.secondary? ? "SEC" : "PRI"}) is triggered.\n" # Update retry statement - if s.limit? - msg << "--- Reach limit of timeout. ---\n" - break - end - s.step - - if s.limit_step? - msg << "--- Reach limit of max step. ---\n" + if s.limit? + msg << "--- Reach limit. ---\n" break end end From e3f50343bf4d1d5b0a65b0349bcf61a56dea1ac5 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 24 Mar 2022 14:33:16 +0900 Subject: [PATCH 7/8] Just reshape codes Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin_helper/retry_state.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin_helper/retry_state.rb b/lib/fluent/plugin_helper/retry_state.rb index 8816773d95..82e5689af3 100644 --- a/lib/fluent/plugin_helper/retry_state.rb +++ b/lib/fluent/plugin_helper/retry_state.rb @@ -45,7 +45,7 @@ def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_wi @timeout = timeout @timeout_at = @start + timeout @has_reached_timeout = false - @has_timeouted = false + @has_timed_out = false @current = :primary if randomize_width < 0 || randomize_width > 0.5 @@ -128,10 +128,10 @@ def step @next_time = calc_next_time - unless @has_reached_timeout - @has_reached_timeout = @next_time >= @timeout_at + if @has_reached_timeout + @has_timed_out = @next_time >= @timeout_at else - @has_timeouted = @next_time >= @timeout_at + @has_reached_timeout = @next_time >= @timeout_at end nil @@ -145,7 +145,7 @@ def limit? if @forever false else - @has_timeouted || !!(@max_steps && @steps >= @max_steps) + @has_timed_out || !!(@max_steps && @steps >= @max_steps) end end end From a4f15f9fc44ce2fe2de8c8d4d670bac5510342d5 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 25 Mar 2022 16:03:26 +0900 Subject: [PATCH 8/8] output: Unify logging retry error Signed-off-by: Takuro Ashie --- lib/fluent/plugin/output.rb | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 45290fda64..976ff1002c 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1278,11 +1278,8 @@ def update_retry_state(chunk_id, using_secondary, error = nil) if @retry.limit? handle_limit_reached(error) - else - if error - log.warn "failed to flush the buffer.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error - log.warn_backtrace error.backtrace - end + elsif error + log_retry_error(error, chunk_id_hex, using_secondary) end return @@ -1300,22 +1297,23 @@ def update_retry_state(chunk_id, using_secondary, error = nil) if @retry.limit? handle_limit_reached(error) - else - if error - if using_secondary - msg = "failed to flush the buffer with secondary output." - log.warn msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error - log.warn_backtrace error.backtrace - else - msg = "failed to flush the buffer." - log.warn msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error - log.warn_backtrace error.backtrace - end - end + elsif error + log_retry_error(error, chunk_id_hex, using_secondary) end end end + def log_retry_error(error, chunk_id_hex, using_secondary) + return unless error + if using_secondary + msg = "failed to flush the buffer with secondary output." + else + msg = "failed to flush the buffer." + end + log.warn(msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error) + log.warn_backtrace(error.backtrace) + end + def handle_limit_reached(error) if error records = @buffer.queued_records