Skip to content

Commit

Permalink
make sleep reliable by measuring how long we actually slept
Browse files Browse the repository at this point in the history
  • Loading branch information
grosser committed Sep 11, 2018
1 parent c947627 commit f027e5b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
21 changes: 15 additions & 6 deletions lib/fluent/plugin/kinesis_helper/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

require 'fluent_plugin_kinesis/version'
require 'fluent/configurable'
require 'benchmark'

module Fluent
module Plugin
Expand Down Expand Up @@ -79,7 +80,7 @@ def split_to_batches(records, &block)
yield(batch, size)
batch = []
size = 0
end
end
batch << record
size += record_size
end
Expand All @@ -96,18 +97,26 @@ def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block)
wait_second = backoff.next
msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' % [retry_count+1, failed_records.size, wait_second]
log.warn(truncate msg)
# TODO: sleep() sometimes doesn't wait for the given number of seconds.
# The root cause is unknown so far, so only debug logging is added here. It should be fixed in the future.
log.debug("#{Thread.current.object_id} sleep start")
sleep(wait_second)
log.debug("#{Thread.current.object_id} sleep finish")
reliable_sleep(wait_second)
batch_request_with_retry(retry_records(failed_records), retry_count+1, backoff: backoff, &block)
else
give_up_retries(failed_records)
end
end
end

# Sleeps for at least +wait_second+ seconds, going back to sleep whenever the
# thread is woken up early.
#
# Kernel#sleep can return before the requested time has passed (our guess is
# that something wakes up the thread, e.g. via Thread#run or Thread#wakeup).
# To cope with that, a deadline is fixed on the monotonic clock at entry and
# the thread keeps sleeping until that deadline is reached. Measuring against
# a single deadline means time spent outside of sleep (such as logging) does
# not extend the total wait.
# TODO: find out who is causing the sleep to be too short and try to make them stop it instead
#
# @param wait_second [Numeric] minimum number of seconds to sleep; zero or a
#   negative value returns immediately without sleeping
# @return [nil]
def reliable_sleep(wait_second)
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = now.call + wait_second

  loop do
    started = now.call
    remaining = deadline - started
    break if remaining <= 0

    sleep(remaining)

    slept = now.call - started
    break if slept >= remaining

    # Woken up early: report it and go around again for the rest of the wait.
    log.error("#{Thread.current.object_id} sleep failed expected #{remaining} but slept #{slept}")
  end
end

def any_records_shipped?(res)
results(res).size > failed_count(res)
end
Expand Down
11 changes: 11 additions & 0 deletions test/kinesis_helper/test_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

require_relative '../helper'
require 'fluent/plugin/kinesis_helper/api'
require 'benchmark'

class KinesisHelperAPITest < Test::Unit::TestCase
class Mock
Expand Down Expand Up @@ -114,6 +115,16 @@ def test_batch_request_with_retry(data)
assert_equal expected, @object.request_series
end

# Checks that reliable_sleep keeps sleeping after its thread is woken early:
# a 0.2s sleep that is interrupted after roughly 0.1s must still take longer
# than 0.15s in total.
def test_reliable_sleep
  elapsed = Benchmark.realtime do
    sleeper = Thread.new do
      @object.send(:reliable_sleep, 0.2)
    end
    # Give the sleeper time to actually be asleep before waking it.
    sleep 0.1
    sleeper.run
    sleeper.join
  end
  assert_operator elapsed, :>, 0.15
end

data(
'reset_everytime' => [true, [4,3,2,1], 3],
'disable_reset' => [false, [4,3,2,1], 0],
Expand Down

0 comments on commit f027e5b

Please sign in to comment.