Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve test stability #1426

Merged
merged 19 commits into from
Jan 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7384a03
remove unused comments
tagomoris Jan 30, 2017
14f9973
add example configuration for "heartbeat_type none"
tagomoris Jan 30, 2017
c7e4921
add missing require
tagomoris Jan 30, 2017
6e117cb
use safer random function for security reason
tagomoris Jan 30, 2017
5273c62
add missed Thread#abort_on_exception in event log thread
tagomoris Jan 30, 2017
0131ee5
fix not to crash on unexpected error from IO#close, so as not to break shut…
tagomoris Jan 30, 2017
ceea10f
fix to run shutdown sequence as complete as possible, especially for …
tagomoris Jan 30, 2017
1a2e920
loosen assertion for floating point value errors
tagomoris Jan 30, 2017
8e689e6
make sure to call logger reset per test cases
tagomoris Jan 30, 2017
e2065c6
loosen timing constraints and use exact exception assertion
tagomoris Jan 30, 2017
fcdaf2c
make assertions readable for all chunk statuses at once
tagomoris Jan 30, 2017
94baf7d
use correct assertion for exception types
tagomoris Jan 30, 2017
c187826
fix bug for waiting status
tagomoris Jan 30, 2017
806310b
emitted events might be disordered between plugin threads and test cod…
tagomoris Jan 30, 2017
11f0724
loosen assertion for unexpected retry in output plugins
tagomoris Jan 30, 2017
71f5235
make sure to emit 2 events in a #emit; add comments for known issues
tagomoris Jan 30, 2017
a46d315
call driver#instance_start for non-plugin-code states; remove warning…
tagomoris Jan 30, 2017
2809669
trying to reduce accidental test failure, and adds comments for known…
tagomoris Jan 30, 2017
c4624cf
fix for timing threading test code issues, about flush timing, retry_…
tagomoris Jan 30, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ matrix:
allow_failures:
- rvm: ruby-head

# no valid version/env for ruby 2.3 right now
# - rvm: 2.3.0
# os: osx
# osx_image: ....

branches:
only:
- master
Expand Down
16 changes: 16 additions & 0 deletions example/out_forward_heartbeat_none.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<source>
@type dummy
tag test
</source>

<match test>
@type forward
heartbeat_type none
<server>
host 127.0.0.1
port 24224
</server>
<buffer>
flush_mode immediate
</buffer>
</match>
1 change: 1 addition & 0 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def run
if @log_event_router
$log.enable_event(true)
@log_emit_thread = Thread.new(&method(:log_event_loop))
@log_emit_thread.abort_on_exception = true
end

$log.info "fluentd worker is now running", worker: worker_id
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin/compressable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#

require 'stringio'
require 'zlib'

module Fluent
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
require 'fluent/msgpack_factory'
require 'yajl'
require 'digest'
require 'securerandom'

module Fluent::Plugin
class ForwardInput < Input
Expand Down Expand Up @@ -394,7 +395,7 @@ def select_authenticate_users(node, username)
end

def generate_salt
OpenSSL::Random.random_bytes(16)
::SecureRandom.random_bytes(16)
end

def generate_helo(nonce, user_auth_salt)
Expand Down
5 changes: 4 additions & 1 deletion lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ def start
end

def close
@usock.close if @usock
if @usock
# close socket and ignore errors: this socket will not be used anyway.
@usock.close rescue nil
end
super
end

Expand Down
29 changes: 27 additions & 2 deletions lib/fluent/plugin_helper/event_loop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ def event_loop_wait_until_start
end

def event_loop_wait_until_stop
sleep(0.1) while event_loop_running?
timeout_at = Fluent::Clock.now + EVENT_LOOP_SHUTDOWN_TIMEOUT
sleep(0.1) while event_loop_running? && Fluent::Clock.now < timeout_at
if @_event_loop_running
puts "terminating event_loop forcedly"
caller.each{|bt| puts "\t#{bt}" }
@_event_loop.stop rescue nil
@_event_loop_running = true
end
end

def event_loop_running?
Expand Down Expand Up @@ -85,11 +92,29 @@ def shutdown
@_event_loop_mutex.synchronize do
@_event_loop_attached_watchers.reverse.each do |w|
if w.attached?
w.detach
begin
w.detach
rescue => e
log.warn "unexpected error while detaching event loop watcher", error: e
end
end
end
end

super
end

def after_shutdown
timeout_at = Fluent::Clock.now + EVENT_LOOP_SHUTDOWN_TIMEOUT
@_event_loop_mutex.synchronize do
@_event_loop.watchers.reverse.each do |w|
begin
w.detach
rescue => e
log.warn "unexpected error while detaching event loop watcher", error: e
end
end
end
while @_event_loop_running
if Fluent::Clock.now >= timeout_at
log.warn "event loop does NOT exit until hard timeout."
Expand Down
20 changes: 16 additions & 4 deletions lib/fluent/plugin_helper/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
# limitations under the License.
#

require 'fluent/clock'

module Fluent
module PluginHelper
module Thread
THREAD_DEFAULT_WAIT_SECONDS = 1
THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS = 100 # second

# stop : mark callback thread as stopped
# shutdown : [-]
Expand All @@ -38,9 +41,18 @@ def thread_wait_until_start
end

def thread_wait_until_stop
until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && ![:_fluentd_plugin_helper_thread_running] } }
timeout_at = Fluent::Clock.now + THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS
until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && !t[:_fluentd_plugin_helper_thread_running] } }
break if Fluent::Clock.now > timeout_at
sleep 0.1
end
@_threads_mutex.synchronize{ @_threads.values }.each do |t|
if t.alive?
puts "going to kill the thread still running: #{t[:_fluentd_plugin_helper_thread_title]}"
t.kill rescue nil
t[:_fluentd_plugin_helper_thread_running] = false
end
end
end

# Ruby 2.2.3 or earlier (and all 2.1.x) cause bug about Threading ("Stack consistency error")
Expand Down Expand Up @@ -70,13 +82,13 @@ def thread_create(title)
thread_exit = true
raise
ensure
if ::Thread.current.alive? && !thread_exit
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
end
@_threads_mutex.synchronize do
@_threads.delete(::Thread.current.object_id)
end
::Thread.current[:_fluentd_plugin_helper_thread_running] = false
if ::Thread.current.alive? && !thread_exit
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
end
end
end
thread.abort_on_exception = true
Expand Down
43 changes: 21 additions & 22 deletions lib/fluent/test/driver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -138,38 +138,37 @@ def instance_hook_before_stopped
def instance_shutdown
instance_hook_before_stopped

unless @instance.stopped?
@instance.stop rescue nil
end
unless @instance.before_shutdown?
@instance.before_shutdown rescue nil
end
unless @instance.shutdown?
@instance.shutdown rescue nil
show_errors_if_exists = ->(label, block){
begin
block.call
rescue => e
puts "unexpected error while #{label}, #{e.class}:#{e.message}"
e.backtrace.each do |bt|
puts "\t#{bt}"
end
end
}

show_errors_if_exists.call(:stop, ->(){ @instance.stop unless @instance.stopped? })
show_errors_if_exists.call(:before_shutdown, ->(){ @instance.before_shutdown unless @instance.before_shutdown? })
show_errors_if_exists.call(:shutdown, ->(){ @instance.shutdown unless @instance.shutdown? })
show_errors_if_exists.call(:after_shutdown, ->(){ @instance.after_shutdown unless @instance.after_shutdown? })

if @instance.respond_to?(:server_wait_until_stop)
@instance.server_wait_until_stop
end

if @instance.respond_to?(:event_loop_wait_until_stop)
@instance.event_loop_wait_until_stop
end

unless @instance.after_shutdown?
@instance.after_shutdown rescue nil
end
unless @instance.closed?
@instance.close rescue nil
end
show_errors_if_exists.call(:close, ->(){ @instance.close unless @instance.closed? })

if @instance.respond_to?(:thread_wait_until_stop)
@instance.thread_wait_until_stop
end

if @instance.respond_to?(:server_wait_until_stop)
@instance.server_wait_until_stop
end

unless @instance.terminated?
@instance.terminate rescue nil
end
show_errors_if_exists.call(:terminate, ->(){ @instance.terminate unless @instance.terminated? })

if @socket_manager_server
@socket_manager_server.close
Expand All @@ -194,7 +193,7 @@ def run_actual(timeout: DEFAULT_TIMEOUT, &block)

return_value = nil
begin
Timeout.timeout(timeout * 1.1) do |sec|
Timeout.timeout(timeout * 2) do |sec|
return_value = block.call if block_given?
end
rescue Timeout::Error
Expand Down
14 changes: 3 additions & 11 deletions test/plugin/test_buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -629,17 +629,9 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)

queue = @p.queue

assert_equal @bufdir_chunk_1, queue[0].unique_id
assert_equal 4, queue[0].size
assert_equal :queued, queue[0].state

assert_equal @bufdir_chunk_2, queue[1].unique_id
assert_equal 4, queue[1].size
assert_equal :queued, queue[1].state

assert_equal @worker_dir_chunk_1, queue[2].unique_id
assert_equal 3, queue[2].size
assert_equal :queued, queue[2].state
assert_equal [@bufdir_chunk_1, @bufdir_chunk_2, @worker_dir_chunk_1].sort, queue.map(&:unique_id).sort
assert_equal [3, 4, 4], queue.map(&:size).sort
assert_equal [:queued, :queued, :queued], queue.map(&:state)
end

test 'worker(id=1) #resume returns staged/queued chunks with metadata, only in worker dir' do
Expand Down
4 changes: 2 additions & 2 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ def create_chunk_es(metadata, es)
define_method(:commit){ raise "yay" }
end

assert_raise "yay" do
assert_raise RuntimeError.new("yay") do
@p.write({m => [row]})
end

Expand Down Expand Up @@ -810,7 +810,7 @@ def create_chunk_es(metadata, es)
[event_time('2016-04-11 16:40:04 +0000'), {"message" => "z" * 1024 * 128}],
]
)
assert_raise "yay" do
assert_raise RuntimeError.new("yay") do
@p.write({m => es2}, format: ->(e){e.to_msgpack_stream})
end

Expand Down
12 changes: 6 additions & 6 deletions test/plugin/test_buffer_file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ def hex_id(id)

test '.generate_stage_chunk_path generates path with staged mark & chunk unique_id' do
assert_equal gen_path("mychunk.b52fde6425d7406bdb19b936e1a1ba98c.log"), @klass.generate_stage_chunk_path(gen_path("mychunk.*.log"), gen_test_chunk_id)
assert_raise "BUG: buffer chunk path on stage MUST have '.*.'" do
assert_raise RuntimeError.new("BUG: buffer chunk path on stage MUST have '.*.'") do
@klass.generate_stage_chunk_path(gen_path("mychunk.log"), gen_test_chunk_id)
end
assert_raise "BUG: buffer chunk path on stage MUST have '.*.'" do
assert_raise RuntimeError.new("BUG: buffer chunk path on stage MUST have '.*.'") do
@klass.generate_stage_chunk_path(gen_path("mychunk.*"), gen_test_chunk_id)
end
assert_raise "BUG: buffer chunk path on stage MUST have '.*.'" do
assert_raise RuntimeError.new("BUG: buffer chunk path on stage MUST have '.*.'") do
@klass.generate_stage_chunk_path(gen_path("*.log"), gen_test_chunk_id)
end
end
Expand Down Expand Up @@ -679,7 +679,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal @d.bytesize, @c.bytesize
assert_equal @d, @c.read

assert_raise "BUG: appending to non-staged chunk, now 'queued'" do
assert_raise RuntimeError.new("BUG: concatenating to unwritable chunk, now 'queued'") do
@c.append(["queued chunk is read only"])
end
assert_raise IOError do
Expand Down Expand Up @@ -721,7 +721,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal 0, @c.size
assert_equal @d, @c.read

assert_raise "BUG: appending to non-staged chunk, now 'queued'" do
assert_raise RuntimeError.new("BUG: concatenating to unwritable chunk, now 'queued'") do
@c.append(["queued chunk is read only"])
end
assert_raise IOError do
Expand Down Expand Up @@ -763,7 +763,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal 0, @c.size
assert_equal @d, @c.read

assert_raise "BUG: appending to non-staged chunk, now 'queued'" do
assert_raise RuntimeError.new("BUG: concatenating to unwritable chunk, now 'queued'") do
@c.append(["queued chunk is read only"])
end
assert_raise IOError do
Expand Down
3 changes: 3 additions & 0 deletions test/plugin/test_compressable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class CompressableTest < Test::Unit::TestCase
test 'write compressed data to IO with output_io option' do
io = StringIO.new
compress(@src, output_io: io)
waiting(10){ sleep 0.1 until @gzipped_src == io.string }
assert_equal @gzipped_src, io.string
end
end
Expand All @@ -35,6 +36,7 @@ class CompressableTest < Test::Unit::TestCase
test 'write decompressed data to IO with output_io option' do
io = StringIO.new
decompress(@gzipped_src, output_io: io)
waiting(10){ sleep 0.1 until @src == io.string }
assert_equal @src, io.string
end

Expand All @@ -56,6 +58,7 @@ class CompressableTest < Test::Unit::TestCase
output_io = StringIO.new

decompress(input_io: input_io, output_io: output_io)
waiting(10){ sleep 0.1 until @src == output_io.string }
assert_equal @src, output_io.string
end

Expand Down
10 changes: 7 additions & 3 deletions test/plugin/test_in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ def create_driver(conf)
d1.instance.emit(4)
end

first_id1 = d1.events.first[2]['id']
events = d1.events.sort{|a,b| a[2]['id'] <=> b[2]['id'] }

first_id1 = events.first[2]['id']
assert_equal 0, first_id1

last_id1 = d1.events.last[2]['id']
last_id1 = events.last[2]['id']
assert { last_id1 > 0 }

assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))
Expand All @@ -125,7 +127,9 @@ def create_driver(conf)
d2.instance.emit(4)
end

first_id2 = d2.events.first[2]['id']
events = d2.events.sort{|a,b| a[2]['id'] <=> b[2]['id'] }

first_id2 = events.first[2]['id']
assert_equal 0, first_id2

assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))
Expand Down