Skip to content

Commit

Permalink
Merge pull request #1426 from fluent/improve-test-stability-201701
Browse files Browse the repository at this point in the history
Improve test stability
  • Loading branch information
tagomoris committed Jan 30, 2017
2 parents 1adaa0f + c4624cf commit d7182c5
Show file tree
Hide file tree
Showing 29 changed files with 375 additions and 232 deletions.
5 changes: 0 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ matrix:
allow_failures:
- rvm: ruby-head

# no valid version/env for ruby 2.3 right now
# - rvm: 2.3.0
# os: osx
# osx_image: ....

branches:
only:
- master
Expand Down
16 changes: 16 additions & 0 deletions example/out_forward_heartbeat_none.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<source>
@type dummy
tag test
</source>

<match test>
@type forward
heartbeat_type none
<server>
host 127.0.0.1
port 24224
</server>
<buffer>
flush_mode immediate
</buffer>
</match>
1 change: 1 addition & 0 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def run
if @log_event_router
$log.enable_event(true)
@log_emit_thread = Thread.new(&method(:log_event_loop))
@log_emit_thread.abort_on_exception = true
end

$log.info "fluentd worker is now running", worker: worker_id
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin/compressable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#

require 'stringio'
require 'zlib'

module Fluent
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
require 'fluent/msgpack_factory'
require 'yajl'
require 'digest'
require 'securerandom'

module Fluent::Plugin
class ForwardInput < Input
Expand Down Expand Up @@ -394,7 +395,7 @@ def select_authenticate_users(node, username)
end

def generate_salt
OpenSSL::Random.random_bytes(16)
::SecureRandom.random_bytes(16)
end

def generate_helo(nonce, user_auth_salt)
Expand Down
5 changes: 4 additions & 1 deletion lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ def start
end

def close
@usock.close if @usock
if @usock
# close socket and ignore errors: this socket will not be used anyway.
@usock.close rescue nil
end
super
end

Expand Down
29 changes: 27 additions & 2 deletions lib/fluent/plugin_helper/event_loop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ def event_loop_wait_until_start
end

def event_loop_wait_until_stop
sleep(0.1) while event_loop_running?
timeout_at = Fluent::Clock.now + EVENT_LOOP_SHUTDOWN_TIMEOUT
sleep(0.1) while event_loop_running? && Fluent::Clock.now < timeout_at
if @_event_loop_running
puts "terminating event_loop forcedly"
caller.each{|bt| puts "\t#{bt}" }
@_event_loop.stop rescue nil
@_event_loop_running = true
end
end

def event_loop_running?
Expand Down Expand Up @@ -85,11 +92,29 @@ def shutdown
@_event_loop_mutex.synchronize do
@_event_loop_attached_watchers.reverse.each do |w|
if w.attached?
w.detach
begin
w.detach
rescue => e
log.warn "unexpected error while detaching event loop watcher", error: e
end
end
end
end

super
end

def after_shutdown
timeout_at = Fluent::Clock.now + EVENT_LOOP_SHUTDOWN_TIMEOUT
@_event_loop_mutex.synchronize do
@_event_loop.watchers.reverse.each do |w|
begin
w.detach
rescue => e
log.warn "unexpected error while detaching event loop watcher", error: e
end
end
end
while @_event_loop_running
if Fluent::Clock.now >= timeout_at
log.warn "event loop does NOT exit until hard timeout."
Expand Down
20 changes: 16 additions & 4 deletions lib/fluent/plugin_helper/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
# limitations under the License.
#

require 'fluent/clock'

module Fluent
module PluginHelper
module Thread
THREAD_DEFAULT_WAIT_SECONDS = 1
THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS = 100 # second

# stop : mark callback thread as stopped
# shutdown : [-]
Expand All @@ -38,9 +41,18 @@ def thread_wait_until_start
end

def thread_wait_until_stop
until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && ![:_fluentd_plugin_helper_thread_running] } }
timeout_at = Fluent::Clock.now + THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS
until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && !t[:_fluentd_plugin_helper_thread_running] } }
break if Fluent::Clock.now > timeout_at
sleep 0.1
end
@_threads_mutex.synchronize{ @_threads.values }.each do |t|
if t.alive?
puts "going to kill the thread still running: #{t[:_fluentd_plugin_helper_thread_title]}"
t.kill rescue nil
t[:_fluentd_plugin_helper_thread_running] = false
end
end
end

# Ruby 2.2.3 or earlier (and all 2.1.x) cause bug about Threading ("Stack consistency error")
Expand Down Expand Up @@ -70,13 +82,13 @@ def thread_create(title)
thread_exit = true
raise
ensure
if ::Thread.current.alive? && !thread_exit
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
end
@_threads_mutex.synchronize do
@_threads.delete(::Thread.current.object_id)
end
::Thread.current[:_fluentd_plugin_helper_thread_running] = false
if ::Thread.current.alive? && !thread_exit
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
end
end
end
thread.abort_on_exception = true
Expand Down
43 changes: 21 additions & 22 deletions lib/fluent/test/driver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -138,38 +138,37 @@ def instance_hook_before_stopped
def instance_shutdown
instance_hook_before_stopped

unless @instance.stopped?
@instance.stop rescue nil
end
unless @instance.before_shutdown?
@instance.before_shutdown rescue nil
end
unless @instance.shutdown?
@instance.shutdown rescue nil
show_errors_if_exists = ->(label, block){
begin
block.call
rescue => e
puts "unexpected error while #{label}, #{e.class}:#{e.message}"
e.backtrace.each do |bt|
puts "\t#{bt}"
end
end
}

show_errors_if_exists.call(:stop, ->(){ @instance.stop unless @instance.stopped? })
show_errors_if_exists.call(:before_shutdown, ->(){ @instance.before_shutdown unless @instance.before_shutdown? })
show_errors_if_exists.call(:shutdown, ->(){ @instance.shutdown unless @instance.shutdown? })
show_errors_if_exists.call(:after_shutdown, ->(){ @instance.after_shutdown unless @instance.after_shutdown? })

if @instance.respond_to?(:server_wait_until_stop)
@instance.server_wait_until_stop
end

if @instance.respond_to?(:event_loop_wait_until_stop)
@instance.event_loop_wait_until_stop
end

unless @instance.after_shutdown?
@instance.after_shutdown rescue nil
end
unless @instance.closed?
@instance.close rescue nil
end
show_errors_if_exists.call(:close, ->(){ @instance.close unless @instance.closed? })

if @instance.respond_to?(:thread_wait_until_stop)
@instance.thread_wait_until_stop
end

if @instance.respond_to?(:server_wait_until_stop)
@instance.server_wait_until_stop
end

unless @instance.terminated?
@instance.terminate rescue nil
end
show_errors_if_exists.call(:terminate, ->(){ @instance.terminate unless @instance.terminated? })

if @socket_manager_server
@socket_manager_server.close
Expand All @@ -194,7 +193,7 @@ def run_actual(timeout: DEFAULT_TIMEOUT, &block)

return_value = nil
begin
Timeout.timeout(timeout * 1.1) do |sec|
Timeout.timeout(timeout * 2) do |sec|
return_value = block.call if block_given?
end
rescue Timeout::Error
Expand Down
14 changes: 3 additions & 11 deletions test/plugin/test_buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -629,17 +629,9 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)

queue = @p.queue

assert_equal @bufdir_chunk_1, queue[0].unique_id
assert_equal 4, queue[0].size
assert_equal :queued, queue[0].state

assert_equal @bufdir_chunk_2, queue[1].unique_id
assert_equal 4, queue[1].size
assert_equal :queued, queue[1].state

assert_equal @worker_dir_chunk_1, queue[2].unique_id
assert_equal 3, queue[2].size
assert_equal :queued, queue[2].state
assert_equal [@bufdir_chunk_1, @bufdir_chunk_2, @worker_dir_chunk_1].sort, queue.map(&:unique_id).sort
assert_equal [3, 4, 4], queue.map(&:size).sort
assert_equal [:queued, :queued, :queued], queue.map(&:state)
end

test 'worker(id=1) #resume returns staged/queued chunks with metadata, only in worker dir' do
Expand Down
4 changes: 2 additions & 2 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ def create_chunk_es(metadata, es)
define_method(:commit){ raise "yay" }
end

assert_raise "yay" do
assert_raise RuntimeError.new("yay") do
@p.write({m => [row]})
end

Expand Down Expand Up @@ -810,7 +810,7 @@ def create_chunk_es(metadata, es)
[event_time('2016-04-11 16:40:04 +0000'), {"message" => "z" * 1024 * 128}],
]
)
assert_raise "yay" do
assert_raise RuntimeError.new("yay") do
@p.write({m => es2}, format: ->(e){e.to_msgpack_stream})
end

Expand Down
12 changes: 6 additions & 6 deletions test/plugin/test_buffer_file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ def hex_id(id)

test '.generate_stage_chunk_path generates path with staged mark & chunk unique_id' do
assert_equal gen_path("mychunk.b52fde6425d7406bdb19b936e1a1ba98c.log"), @klass.generate_stage_chunk_path(gen_path("mychunk.*.log"), gen_test_chunk_id)
assert_raise "BUG: buffer chunk path on stage MUST have '.*.'" do
assert_raise RuntimeError.new("BUG: buffer chunk path on stage MUST have '.*.'") do
@klass.generate_stage_chunk_path(gen_path("mychunk.log"), gen_test_chunk_id)
end
assert_raise "BUG: buffer chunk path on stage MUST have '.*.'" do
assert_raise RuntimeError.new("BUG: buffer chunk path on stage MUST have '.*.'") do
@klass.generate_stage_chunk_path(gen_path("mychunk.*"), gen_test_chunk_id)
end
assert_raise "BUG: buffer chunk path on stage MUST have '.*.'" do
assert_raise RuntimeError.new("BUG: buffer chunk path on stage MUST have '.*.'") do
@klass.generate_stage_chunk_path(gen_path("*.log"), gen_test_chunk_id)
end
end
Expand Down Expand Up @@ -679,7 +679,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal @d.bytesize, @c.bytesize
assert_equal @d, @c.read

assert_raise "BUG: appending to non-staged chunk, now 'queued'" do
assert_raise RuntimeError.new("BUG: concatenating to unwritable chunk, now 'queued'") do
@c.append(["queued chunk is read only"])
end
assert_raise IOError do
Expand Down Expand Up @@ -721,7 +721,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal 0, @c.size
assert_equal @d, @c.read

assert_raise "BUG: appending to non-staged chunk, now 'queued'" do
assert_raise RuntimeError.new("BUG: concatenating to unwritable chunk, now 'queued'") do
@c.append(["queued chunk is read only"])
end
assert_raise IOError do
Expand Down Expand Up @@ -763,7 +763,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal 0, @c.size
assert_equal @d, @c.read

assert_raise "BUG: appending to non-staged chunk, now 'queued'" do
assert_raise RuntimeError.new("BUG: concatenating to unwritable chunk, now 'queued'") do
@c.append(["queued chunk is read only"])
end
assert_raise IOError do
Expand Down
3 changes: 3 additions & 0 deletions test/plugin/test_compressable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class CompressableTest < Test::Unit::TestCase
test 'write compressed data to IO with output_io option' do
io = StringIO.new
compress(@src, output_io: io)
waiting(10){ sleep 0.1 until @gzipped_src == io.string }
assert_equal @gzipped_src, io.string
end
end
Expand All @@ -35,6 +36,7 @@ class CompressableTest < Test::Unit::TestCase
test 'write decompressed data to IO with output_io option' do
io = StringIO.new
decompress(@gzipped_src, output_io: io)
waiting(10){ sleep 0.1 until @src == io.string }
assert_equal @src, io.string
end

Expand All @@ -56,6 +58,7 @@ class CompressableTest < Test::Unit::TestCase
output_io = StringIO.new

decompress(input_io: input_io, output_io: output_io)
waiting(10){ sleep 0.1 until @src == output_io.string }
assert_equal @src, output_io.string
end

Expand Down
10 changes: 7 additions & 3 deletions test/plugin/test_in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ def create_driver(conf)
d1.instance.emit(4)
end

first_id1 = d1.events.first[2]['id']
events = d1.events.sort{|a,b| a[2]['id'] <=> b[2]['id'] }

first_id1 = events.first[2]['id']
assert_equal 0, first_id1

last_id1 = d1.events.last[2]['id']
last_id1 = events.last[2]['id']
assert { last_id1 > 0 }

assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))
Expand All @@ -125,7 +127,9 @@ def create_driver(conf)
d2.instance.emit(4)
end

first_id2 = d2.events.first[2]['id']
events = d2.events.sort{|a,b| a[2]['id'] <=> b[2]['id'] }

first_id2 = events.first[2]['id']
assert_equal 0, first_id2

assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))
Expand Down

0 comments on commit d7182c5

Please sign in to comment.