make sleep reliable by measuring how long we actually slept #162

Merged
merged 1 commit into from
Mar 7, 2021
21 changes: 15 additions & 6 deletions lib/fluent/plugin/kinesis_helper/api.rb
@@ -14,6 +14,7 @@

require 'fluent_plugin_kinesis/version'
require 'fluent/configurable'
+require 'benchmark'

module Fluent
module Plugin
@@ -79,7 +80,7 @@ def split_to_batches(records, &block)
yield(batch, size)
batch = []
size = 0
-end
+end
batch << record
size += record_size
end
@@ -96,18 +97,26 @@ def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block)
wait_second = backoff.next
msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' % [retry_count+1, failed_records.size, wait_second]
log.warn(truncate msg)
-# TODO: sleep() doesn't wait the given seconds sometime.
-# The root cause is unknown so far, so I'd like to add debug print only. It should be fixed in the future.
-log.debug("#{Thread.current.object_id} sleep start")
-sleep(wait_second)
-log.debug("#{Thread.current.object_id} sleep finish")
+reliable_sleep(wait_second)
batch_request_with_retry(retry_records(failed_records), retry_count+1, backoff: backoff, &block)
else
give_up_retries(failed_records)
end
end
end

+# Sleep seems to not sleep as long as we ask it, our guess is that something wakes up the thread,
+# so we keep on going to sleep if that happens.
+# TODO: find out who is causing the sleep to be too short and try to make them stop it instead
+def reliable_sleep(wait_second)
+  loop do
+    actual = Benchmark.realtime { sleep(wait_second) }
+    break if actual >= wait_second
+    log.error("#{Thread.current.object_id} sleep failed expected #{wait_second} but slept #{actual}")
+    wait_second -= actual
+  end
+end

def any_records_shipped?(res)
results(res).size > failed_count(res)
end
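
The loop in `reliable_sleep` subtracts each measured nap from the remaining wait. A variation on the same idea, shown here only as a sketch and not as what this change implements, fixes a deadline on the monotonic clock once and sleeps until it has passed. The name `sleep_until_deadline` and its return value are hypothetical and exist only for this illustration.

```ruby
# Hypothetical helper, not part of the plugin: keep sleeping until a fixed
# monotonic deadline has passed, however many times the thread is woken early.
# Returns the number of sleep calls that were needed.
def sleep_until_deadline(seconds)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + seconds
  sleep_calls = 0
  loop do
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break if remaining <= 0

    sleep(remaining) # may return early if another thread wakes this one
    sleep_calls += 1
  end
  sleep_calls
end
```

Because the remaining time is always recomputed from the deadline, measurement error does not accumulate across wake-ups. A return value greater than 1 indicates that at least one sleep was cut short, which a caller could log in the same way `reliable_sleep` does.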
11 changes: 11 additions & 0 deletions test/kinesis_helper/test_api.rb
@@ -14,6 +14,7 @@

require_relative '../helper'
require 'fluent/plugin/kinesis_helper/api'
+require 'benchmark'

class KinesisHelperAPITest < Test::Unit::TestCase
class Mock
@@ -114,6 +115,16 @@ def test_batch_request_with_retry(data)
assert_equal expected, @object.request_series
end

+def test_reliable_sleep
+  time = Benchmark.realtime do
+    t = Thread.new { @object.send(:reliable_sleep, 0.2) }
+    sleep 0.1
+    t.run
+    t.join
+  end
+  assert_operator time, :>, 0.15
+end

data(
'reset_everytime' => [true, [4,3,2,1], 3],
'disable_reset' => [false, [4,3,2,1], 0],
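
`test_reliable_sleep` relies on `Thread#run` waking a thread that is blocked in `sleep`. The standalone sketch below shows that effect on a plain `Kernel#sleep` with no retry loop around it, which is the early wake-up that `reliable_sleep` compensates for. The helper name `measure_interrupted_sleep` and the chosen durations are assumptions made for this illustration.

```ruby
require 'benchmark'

# Hypothetical helper for illustration: measure how long a plain Kernel#sleep
# actually lasts when another thread wakes the sleeper before the time is up.
def measure_interrupted_sleep(requested, wake_after)
  slept = nil
  sleeper = Thread.new { slept = Benchmark.realtime { sleep(requested) } }
  sleep(wake_after) # give the sleeper time to block in sleep
  sleeper.run       # wake it early, as t.run does in the test
  sleeper.join
  slept
end

slept = measure_interrupted_sleep(0.5, 0.1)
puts format('requested 0.50s, actually slept %.2fs', slept)
```

On a typical run the measured time is well below the requested half second, since the plain `sleep` returns as soon as the thread is woken.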