Skip to content

Commit

Permalink
fix for timing threading test code issues, about flush timing, retry_…
Browse files Browse the repository at this point in the history
…state & secondary status, or more.
  • Loading branch information
tagomoris committed Jan 30, 2017
1 parent 2809669 commit c4624cf
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 67 deletions.
3 changes: 3 additions & 0 deletions test/plugin/test_compressable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class CompressableTest < Test::Unit::TestCase
test 'write compressed data to IO with output_io option' do
io = StringIO.new
compress(@src, output_io: io)
waiting(10){ sleep 0.1 until @gzipped_src == io.string }
assert_equal @gzipped_src, io.string
end
end
Expand All @@ -35,6 +36,7 @@ class CompressableTest < Test::Unit::TestCase
test 'write decompressed data to IO with output_io option' do
io = StringIO.new
decompress(@gzipped_src, output_io: io)
waiting(10){ sleep 0.1 until @src == io.string }
assert_equal @src, io.string
end

Expand All @@ -56,6 +58,7 @@ class CompressableTest < Test::Unit::TestCase
output_io = StringIO.new

decompress(input_io: input_io, output_io: output_io)
waiting(10){ sleep 0.1 until @src == output_io.string }
assert_equal @src, output_io.string
end

Expand Down
10 changes: 6 additions & 4 deletions test/plugin/test_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -824,26 +824,28 @@ def invoke_slow_flush_log_threshold_test(i)
test '#write flush took longer time than slow_flush_log_threshold' do
i = create_output(:buffered)
write_called = false
i.register(:write) { |chunk| sleep 1 }
i.register(:write) { |chunk| sleep 3 }
i.define_singleton_method(:test_finished?) { write_called }
i.define_singleton_method(:try_flush) { super(); write_called = true }

invoke_slow_flush_log_threshold_test(i) {
assert write_called
assert_equal 1, i.log.out.logs.select { |line| line =~ /buffer flush took longer time than slow_flush_log_threshold: elapsed_time/ }.size
logs = i.log.out.logs
assert{ logs.any?{|log| log.include?("buffer flush took longer time than slow_flush_log_threshold: elapsed_time") } }
}
end

test '#try_write flush took longer time than slow_flush_log_threshold' do
i = create_output(:delayed)
try_write_called = false
i.register(:try_write){ |chunk| sleep 1 }
i.register(:try_write){ |chunk| sleep 3 }
i.define_singleton_method(:test_finished?) { try_write_called }
i.define_singleton_method(:try_flush) { super(); try_write_called = true }

invoke_slow_flush_log_threshold_test(i) {
assert try_write_called
assert_equal 1, i.log.out.logs.select { |line| line =~ /buffer flush took longer time than slow_flush_log_threshold: elapsed_time/ }.size
logs = i.log.out.logs
assert{ logs.any?{|log| log.include?("buffer flush took longer time than slow_flush_log_threshold: elapsed_time") } }
}
end
end
Expand Down
62 changes: 37 additions & 25 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,22 @@ def waiting(seconds)
ary.reject!{|e| true }
end
end
end

sub_test_case 'with much longer flush_interval' do
setup do
hash = {
'flush_mode' => 'interval',
'flush_interval' => 3000,
'flush_thread_count' => 1,
'flush_thread_burst_interval' => 0.01,
'chunk_limit_size' => 1024,
}
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',hash)]))
@i.start
@i.after_start
end

test 'flush_at_shutdown work well when plugin is shutdown' do
ary = []
Expand All @@ -658,16 +674,15 @@ def waiting(seconds)
(1024 * 0.9 / event_size).to_i.times do |i|
@i.emit_events("test.tag", Fluent::ArrayEventStream.new([ [t, r] ]))
end
assert{ @i.buffer.queue.size == 0 && ary.size == 0 }
queue_size = @i.buffer.queue.size
assert{ queue_size == 0 && ary.size == 0 }

@i.stop
@i.before_shutdown
@i.shutdown
@i.after_shutdown

waiting(10) do
Thread.pass until ary.size == 1
end
waiting(10){ sleep 0.1 until ary.size == 1 }
assert_equal [tag,t,r].to_json * (1024 * 0.9 / event_size), ary.first
end
end
Expand Down Expand Up @@ -730,11 +745,9 @@ def waiting(seconds)
assert_equal rand_records, es.size
@i.emit_events("test.tag", es)

assert{ @i.buffer.stage.size == 0 && (@i.buffer.queue.size == 1 || @i.buffer.dequeued.size == 1 || ary.size > 0) }

waiting(10) do
Thread.pass until @i.buffer.queue.size == 0 && @i.buffer.dequeued.size == 0
end
waiting(10){ sleep 0.1 until @i.buffer.stage.size == 0 } # make sure that the emitted es is enqueued by "flush_mode immediate"
waiting(10){ sleep 0.1 until @i.buffer.queue.size == 0 && @i.buffer.dequeued.size == 0 }
waiting(10){ sleep 0.1 until ary.size == rand_records }

assert_equal rand_records, ary.size
ary.reject!{|e| true }
Expand Down Expand Up @@ -863,12 +876,12 @@ def waiting(seconds)

@i.enqueue_thread_wait

waiting(4) do
Thread.pass until @i.write_count > 0
end
waiting(4){ sleep 0.1 until @i.write_count > 0 }

assert{ @i.buffer.stage.size == 2 && @i.write_count == 1 }

waiting(4){ sleep 0.1 until ary.size == 3 }

assert_equal 3, ary.size
assert_equal 2, ary.select{|e| e[0] == "test.tag.1" }.size
assert_equal 1, ary.select{|e| e[0] == "test.tag.2" }.size
Expand All @@ -882,9 +895,7 @@ def waiting(seconds)
Timecop.freeze( Time.parse('2016-04-13 14:04:06 +0900') )

@i.enqueue_thread_wait
waiting(4) do
Thread.pass until @i.write_count > 1
end
waiting(4){ sleep 0.1 until @i.write_count > 1 }

assert{ @i.buffer.stage.size == 1 && @i.write_count == 2 }

Expand All @@ -904,7 +915,13 @@ def waiting(seconds)
metachecks = []

@i.register(:format){|tag,time,record| [tag,time,record].to_json + "\n" }
@i.register(:write){|chunk| chunk.read.split("\n").reject{|l| l.empty? }.each{|data| e = JSON.parse(data); ary << e; metachecks << (chunk.metadata.timekey.to_i <= e[1].to_i && e[1].to_i < chunk.metadata.timekey.to_i + 30) } }
@i.register(:write){|chunk|
chunk.read.split("\n").reject{|l| l.empty? }.each{|data|
e = JSON.parse(data)
ary << e
metachecks << (chunk.metadata.timekey.to_i <= e[1].to_i && e[1].to_i < chunk.metadata.timekey.to_i + 30)
}
}

r = {}
(0...10).each do |i|
Expand Down Expand Up @@ -942,9 +959,7 @@ def waiting(seconds)

@i.enqueue_thread_wait

waiting(4) do
Thread.pass until @i.write_count > 0
end
waiting(4){ sleep 0.1 until @i.write_count > 0 }

assert{ @i.buffer.stage.size == 2 && @i.write_count == 1 }

Expand All @@ -957,24 +972,21 @@ def waiting(seconds)
Timecop.freeze( Time.parse('2016-04-13 14:04:06 +0900') )

@i.enqueue_thread_wait
waiting(4) do
Thread.pass until @i.write_count > 1
end
waiting(4){ sleep 0.1 until @i.write_count > 1 }

assert{ @i.buffer.stage.size == 1 && @i.write_count == 2 }

Timecop.freeze( Time.parse('2016-04-13 14:04:13 +0900') )

waiting(4){ sleep 0.1 until ary.size == 9 }
assert_equal 9, ary.size

@i.stop
@i.before_shutdown
@i.shutdown
@i.after_shutdown

waiting(4) do
Thread.pass until @i.write_count > 2
end
waiting(4){ sleep 0.1 until @i.write_count > 2 && ary.size == 11 }

assert_equal 11, ary.size
assert metachecks.all?{|e| e }
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_output_as_buffered_compress.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def dummy_event_stream
@i.emit_events('tag', es)
@i.enqueue_thread_wait
@i.flush_thread_wakeup
waiting(4) { Thread.pass until io.size > 0 }
waiting(4) { sleep 0.1 until io.size > 0 }

assert_equal expected, decompress(compressed_data)
assert_equal expected, io.string
Expand Down
12 changes: 6 additions & 6 deletions test/plugin/test_output_as_buffered_retries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ def get_log_time(msg, logs)

@i.enqueue_thread_wait
@i.flush_thread_wakeup
waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
Expand All @@ -703,10 +703,10 @@ def get_log_time(msg, logs)
15.times do |i|
now = @i.next_flush_time

Timecop.freeze( now )
Timecop.freeze( now + 1 )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
waiting(4){ Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }
waiting(4){ sleep 0.1 until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors > prev_num_errors }
Expand Down Expand Up @@ -758,7 +758,7 @@ def get_log_time(msg, logs)

@i.enqueue_thread_wait
@i.flush_thread_wakeup
waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }

assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
Expand All @@ -774,10 +774,10 @@ def get_log_time(msg, logs)
15.times do |i|
now = @i.next_flush_time

Timecop.freeze( now )
Timecop.freeze( now + 1 )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
waiting(4){ Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }
waiting(4){ sleep 0.1 until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors > prev_num_errors }
Expand Down
Loading

0 comments on commit c4624cf

Please sign in to comment.