diff --git a/.travis.yml b/.travis.yml
index 6f628ff8a4..a8c0ce3be4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -35,11 +35,6 @@ matrix:
allow_failures:
- rvm: ruby-head
-# no valid version/env for ruby 2.3 right now
-# - rvm: 2.3.0
-# os: osx
-# osx_image: ....
-
branches:
only:
- master
diff --git a/example/out_forward_heartbeat_none.conf b/example/out_forward_heartbeat_none.conf
new file mode 100644
index 0000000000..8fa0dbec1b
--- /dev/null
+++ b/example/out_forward_heartbeat_none.conf
@@ -0,0 +1,16 @@
+
+
+<match test>
+  @type forward
+  heartbeat_type none
+  <server>
+    host 127.0.0.1
+    port 24224
+  </server>
+  <buffer>
+    flush_mode immediate
+  </buffer>
+</match>
diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb
index eb343b3258..ea05954ab4 100644
--- a/lib/fluent/engine.rb
+++ b/lib/fluent/engine.rb
@@ -211,6 +211,7 @@ def run
if @log_event_router
$log.enable_event(true)
@log_emit_thread = Thread.new(&method(:log_event_loop))
+ @log_emit_thread.abort_on_exception = true
end
$log.info "fluentd worker is now running", worker: worker_id
diff --git a/lib/fluent/plugin/compressable.rb b/lib/fluent/plugin/compressable.rb
index e508be1e3c..2a4824a28a 100644
--- a/lib/fluent/plugin/compressable.rb
+++ b/lib/fluent/plugin/compressable.rb
@@ -14,6 +14,7 @@
# limitations under the License.
#
+require 'stringio'
require 'zlib'
module Fluent
diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb
index 54e7da80fb..b6786608c7 100644
--- a/lib/fluent/plugin/in_forward.rb
+++ b/lib/fluent/plugin/in_forward.rb
@@ -19,6 +19,7 @@
require 'fluent/msgpack_factory'
require 'yajl'
require 'digest'
+require 'securerandom'
module Fluent::Plugin
class ForwardInput < Input
@@ -394,7 +395,7 @@ def select_authenticate_users(node, username)
end
def generate_salt
- OpenSSL::Random.random_bytes(16)
+ ::SecureRandom.random_bytes(16)
end
def generate_helo(nonce, user_auth_salt)
diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index 8886c7f131..848c8f2cc2 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -218,7 +218,10 @@ def start
end
def close
- @usock.close if @usock
+ if @usock
+ # close socket and ignore errors: this socket will not be used anyway.
+ @usock.close rescue nil
+ end
super
end
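
The change above wraps the heartbeat socket close in "rescue nil" because the socket is never used again after shutdown, so errors raised while closing it carry no information. A minimal sketch of the pattern; the class and names here are illustrative, not fluentd's plugin API:

    require 'socket'

    class HeartbeatCleanup
      def initialize
        @usock = UDPSocket.new
      end

      def close
        if @usock
          # the socket will not be used after this point, so any error raised here
          # (already closed, bad file descriptor, ...) is deliberately ignored
          @usock.close rescue nil
          @usock = nil
        end
      end
    end

    c = HeartbeatCleanup.new
    c.close
    c.close   # idempotent: the guard plus the rescue make repeated calls harmless
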
diff --git a/lib/fluent/plugin_helper/event_loop.rb b/lib/fluent/plugin_helper/event_loop.rb
index 362f1be180..52e70e648d 100644
--- a/lib/fluent/plugin_helper/event_loop.rb
+++ b/lib/fluent/plugin_helper/event_loop.rb
@@ -48,7 +48,14 @@ def event_loop_wait_until_start
end
def event_loop_wait_until_stop
- sleep(0.1) while event_loop_running?
+ timeout_at = Fluent::Clock.now + EVENT_LOOP_SHUTDOWN_TIMEOUT
+ sleep(0.1) while event_loop_running? && Fluent::Clock.now < timeout_at
+ if @_event_loop_running
+ puts "terminating event_loop forcedly"
+ caller.each{|bt| puts "\t#{bt}" }
+ @_event_loop.stop rescue nil
+ @_event_loop_running = false
+ end
end
def event_loop_running?
@@ -85,11 +92,29 @@ def shutdown
@_event_loop_mutex.synchronize do
@_event_loop_attached_watchers.reverse.each do |w|
if w.attached?
- w.detach
+ begin
+ w.detach
+ rescue => e
+ log.warn "unexpected error while detaching event loop watcher", error: e
+ end
end
end
end
+
+ super
+ end
+
+ def after_shutdown
timeout_at = Fluent::Clock.now + EVENT_LOOP_SHUTDOWN_TIMEOUT
+ @_event_loop_mutex.synchronize do
+ @_event_loop.watchers.reverse.each do |w|
+ begin
+ w.detach
+ rescue => e
+ log.warn "unexpected error while detaching event loop watcher", error: e
+ end
+ end
+ end
while @_event_loop_running
if Fluent::Clock.now >= timeout_at
log.warn "event loop does NOT exit until hard timeout."
diff --git a/lib/fluent/plugin_helper/thread.rb b/lib/fluent/plugin_helper/thread.rb
index c02f645e2b..7cc430373e 100644
--- a/lib/fluent/plugin_helper/thread.rb
+++ b/lib/fluent/plugin_helper/thread.rb
@@ -14,10 +14,13 @@
# limitations under the License.
#
+require 'fluent/clock'
+
module Fluent
module PluginHelper
module Thread
THREAD_DEFAULT_WAIT_SECONDS = 1
+ THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS = 100 # seconds
# stop : mark callback thread as stopped
# shutdown : [-]
@@ -38,9 +41,18 @@ def thread_wait_until_start
end
def thread_wait_until_stop
- until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && ![:_fluentd_plugin_helper_thread_running] } }
+ timeout_at = Fluent::Clock.now + THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS
+ until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && !t[:_fluentd_plugin_helper_thread_running] } }
+ break if Fluent::Clock.now > timeout_at
sleep 0.1
end
+ @_threads_mutex.synchronize{ @_threads.values }.each do |t|
+ if t.alive?
+ puts "going to kill the thread still running: #{t[:_fluentd_plugin_helper_thread_title]}"
+ t.kill rescue nil
+ t[:_fluentd_plugin_helper_thread_running] = false
+ end
+ end
end
# Ruby 2.2.3 or earlier (and all 2.1.x) cause bug about Threading ("Stack consistency error")
@@ -70,13 +82,13 @@ def thread_create(title)
thread_exit = true
raise
ensure
- if ::Thread.current.alive? && !thread_exit
- log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
- end
@_threads_mutex.synchronize do
@_threads.delete(::Thread.current.object_id)
end
::Thread.current[:_fluentd_plugin_helper_thread_running] = false
+ if ::Thread.current.alive? && !thread_exit
+ log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
+ end
end
end
thread.abort_on_exception = true
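
Two things change in thread_wait_until_stop above: the missing receiver t is restored (without it, ![:_fluentd_plugin_helper_thread_running] negates a one-element array literal, which is always truthy, so the reduce was always false and the wait could never finish), and any thread still alive after the hard timeout is killed. A rough, self-contained sketch of that corrected wait-then-kill flow; the thread-local key :running and the durations are illustrative assumptions, not fluentd's helper internals:

    threads = [0.2, 0.4, 60].map do |duration|   # the 60-second thread simulates a hang
      Thread.new(duration) do |d|
        Thread.current[:running] = true
        begin
          sleep d
        ensure
          Thread.current[:running] = false
        end
      end
    end
    sleep 0.1  # give the threads a moment to set their flags (good enough for a sketch)

    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1
    until threads.all? { |t| !t[:running] }      # note the receiver `t` on the flag lookup
      break if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
      sleep 0.1
    end

    threads.each do |t|
      next unless t.alive?
      puts "going to kill the thread still running"
      t.kill rescue nil
    end
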
diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb
index bc61405c8e..6f5aa18e31 100644
--- a/lib/fluent/test/driver/base.rb
+++ b/lib/fluent/test/driver/base.rb
@@ -138,38 +138,37 @@ def instance_hook_before_stopped
def instance_shutdown
instance_hook_before_stopped
- unless @instance.stopped?
- @instance.stop rescue nil
- end
- unless @instance.before_shutdown?
- @instance.before_shutdown rescue nil
- end
- unless @instance.shutdown?
- @instance.shutdown rescue nil
+ show_errors_if_exists = ->(label, block){
+ begin
+ block.call
+ rescue => e
+ puts "unexpected error while #{label}, #{e.class}:#{e.message}"
+ e.backtrace.each do |bt|
+ puts "\t#{bt}"
+ end
+ end
+ }
+
+ show_errors_if_exists.call(:stop, ->(){ @instance.stop unless @instance.stopped? })
+ show_errors_if_exists.call(:before_shutdown, ->(){ @instance.before_shutdown unless @instance.before_shutdown? })
+ show_errors_if_exists.call(:shutdown, ->(){ @instance.shutdown unless @instance.shutdown? })
+ show_errors_if_exists.call(:after_shutdown, ->(){ @instance.after_shutdown unless @instance.after_shutdown? })
+
+ if @instance.respond_to?(:server_wait_until_stop)
+ @instance.server_wait_until_stop
end
if @instance.respond_to?(:event_loop_wait_until_stop)
@instance.event_loop_wait_until_stop
end
- unless @instance.after_shutdown?
- @instance.after_shutdown rescue nil
- end
- unless @instance.closed?
- @instance.close rescue nil
- end
+ show_errors_if_exists.call(:close, ->(){ @instance.close unless @instance.closed? })
if @instance.respond_to?(:thread_wait_until_stop)
@instance.thread_wait_until_stop
end
- if @instance.respond_to?(:server_wait_until_stop)
- @instance.server_wait_until_stop
- end
-
- unless @instance.terminated?
- @instance.terminate rescue nil
- end
+ show_errors_if_exists.call(:terminate, ->(){ @instance.terminate unless @instance.terminated? })
if @socket_manager_server
@socket_manager_server.close
@@ -194,7 +193,7 @@ def run_actual(timeout: DEFAULT_TIMEOUT, &block)
return_value = nil
begin
- Timeout.timeout(timeout * 1.1) do |sec|
+ Timeout.timeout(timeout * 2) do |sec|
return_value = block.call if block_given?
end
rescue Timeout::Error
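
Rather than masking failures with "rescue nil", the test driver now runs each shutdown phase through a wrapper that prints the error and its backtrace and then carries on. A short sketch of that wrapper with a hypothetical failing phase:

    show_errors_if_exists = ->(label, block) {
      begin
        block.call
      rescue => e
        puts "unexpected error while #{label}, #{e.class}:#{e.message}"
        e.backtrace.each { |bt| puts "\t#{bt}" }
      end
    }

    show_errors_if_exists.call(:shutdown, ->() { raise "shutdown failed" })
    # prints "unexpected error while shutdown, RuntimeError:shutdown failed" plus the
    # backtrace, and control returns so the remaining shutdown phases still run
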
diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb
index 96974c2b4d..9d29ed0d66 100644
--- a/test/plugin/test_buf_file.rb
+++ b/test/plugin/test_buf_file.rb
@@ -629,17 +629,9 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
queue = @p.queue
- assert_equal @bufdir_chunk_1, queue[0].unique_id
- assert_equal 4, queue[0].size
- assert_equal :queued, queue[0].state
-
- assert_equal @bufdir_chunk_2, queue[1].unique_id
- assert_equal 4, queue[1].size
- assert_equal :queued, queue[1].state
-
- assert_equal @worker_dir_chunk_1, queue[2].unique_id
- assert_equal 3, queue[2].size
- assert_equal :queued, queue[2].state
+ assert_equal [@bufdir_chunk_1, @bufdir_chunk_2, @worker_dir_chunk_1].sort, queue.map(&:unique_id).sort
+ assert_equal [3, 4, 4], queue.map(&:size).sort
+ assert_equal [:queued, :queued, :queued], queue.map(&:state)
end
test 'worker(id=1) #resume returns staged/queued chunks with metadata, only in worker dir' do
diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb
index 15dfa5c470..431cb370e6 100644
--- a/test/plugin/test_buffer.rb
+++ b/test/plugin/test_buffer.rb
@@ -631,7 +631,7 @@ def create_chunk_es(metadata, es)
define_method(:commit){ raise "yay" }
end
- assert_raise "yay" do
+ assert_raise RuntimeError.new("yay") do
@p.write({m => [row]})
end
@@ -810,7 +810,7 @@ def create_chunk_es(metadata, es)
[event_time('2016-04-11 16:40:04 +0000'), {"message" => "z" * 1024 * 128}],
]
)
- assert_raise "yay" do
+ assert_raise RuntimeError.new("yay") do
@p.write({m => es2}, format: ->(e){e.to_msgpack_stream})
end
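
Passing an Exception instance instead of a bare string makes these assertions meaningful: test-unit's assert_raise then checks both the class and the message of what was raised. A minimal example of the pattern used in the updated tests:

    require 'test/unit'

    class AssertRaiseExampleTest < Test::Unit::TestCase
      test 'raise with a bare string produces a RuntimeError carrying that message' do
        assert_raise RuntimeError.new("yay") do
          raise "yay"
        end
      end
    end
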
diff --git a/test/plugin/test_buffer_file_chunk.rb b/test/plugin/test_buffer_file_chunk.rb
index c1d059c72a..686166f451 100644
--- a/test/plugin/test_buffer_file_chunk.rb
+++ b/test/plugin/test_buffer_file_chunk.rb
@@ -63,13 +63,13 @@ def hex_id(id)
test '.generate_stage_chunk_path generates path with staged mark & chunk unique_id' do
assert_equal gen_path("mychunk.b52fde6425d7406bdb19b936e1a1ba98c.log"), @klass.generate_stage_chunk_path(gen_path("mychunk.*.log"), gen_test_chunk_id)
- assert_raise "BUG: buffer chunk path on stage MUST have '.*.'" do
+ assert_raise RuntimeError.new("BUG: buffer chunk path on stage MUST have '.*.'") do
@klass.generate_stage_chunk_path(gen_path("mychunk.log"), gen_test_chunk_id)
end
- assert_raise "BUG: buffer chunk path on stage MUST have '.*.'" do
+ assert_raise RuntimeError.new("BUG: buffer chunk path on stage MUST have '.*.'") do
@klass.generate_stage_chunk_path(gen_path("mychunk.*"), gen_test_chunk_id)
end
- assert_raise "BUG: buffer chunk path on stage MUST have '.*.'" do
+ assert_raise RuntimeError.new("BUG: buffer chunk path on stage MUST have '.*.'") do
@klass.generate_stage_chunk_path(gen_path("*.log"), gen_test_chunk_id)
end
end
@@ -679,7 +679,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal @d.bytesize, @c.bytesize
assert_equal @d, @c.read
- assert_raise "BUG: appending to non-staged chunk, now 'queued'" do
+ assert_raise RuntimeError.new("BUG: concatenating to unwritable chunk, now 'queued'") do
@c.append(["queued chunk is read only"])
end
assert_raise IOError do
@@ -721,7 +721,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal 0, @c.size
assert_equal @d, @c.read
- assert_raise "BUG: appending to non-staged chunk, now 'queued'" do
+ assert_raise RuntimeError.new("BUG: concatenating to unwritable chunk, now 'queued'") do
@c.append(["queued chunk is read only"])
end
assert_raise IOError do
@@ -763,7 +763,7 @@ def gen_chunk_path(prefix, unique_id)
assert_equal 0, @c.size
assert_equal @d, @c.read
- assert_raise "BUG: appending to non-staged chunk, now 'queued'" do
+ assert_raise RuntimeError.new("BUG: concatenating to unwritable chunk, now 'queued'") do
@c.append(["queued chunk is read only"])
end
assert_raise IOError do
diff --git a/test/plugin/test_compressable.rb b/test/plugin/test_compressable.rb
index 763d200ebd..c7d685cafb 100644
--- a/test/plugin/test_compressable.rb
+++ b/test/plugin/test_compressable.rb
@@ -18,6 +18,7 @@ class CompressableTest < Test::Unit::TestCase
test 'write compressed data to IO with output_io option' do
io = StringIO.new
compress(@src, output_io: io)
+ waiting(10){ sleep 0.1 until @gzipped_src == io.string }
assert_equal @gzipped_src, io.string
end
end
@@ -35,6 +36,7 @@ class CompressableTest < Test::Unit::TestCase
test 'write decompressed data to IO with output_io option' do
io = StringIO.new
decompress(@gzipped_src, output_io: io)
+ waiting(10){ sleep 0.1 until @src == io.string }
assert_equal @src, io.string
end
@@ -56,6 +58,7 @@ class CompressableTest < Test::Unit::TestCase
output_io = StringIO.new
decompress(input_io: input_io, output_io: output_io)
+ waiting(10){ sleep 0.1 until @src == output_io.string }
assert_equal @src, output_io.string
end
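
Many waits above are rewritten as waiting(n){ sleep 0.1 until <condition> }, so a condition that never becomes true fails with a timeout instead of spinning on Thread.pass. A minimal sketch of such a helper under the simplest assumptions (the helpers defined in the individual test files may differ):

    require 'timeout'

    def waiting(seconds)
      Timeout.timeout(seconds) { yield }
    rescue Timeout::Error
      raise "condition was not satisfied within #{seconds} seconds"
    end

    done = false
    Thread.new { sleep 0.3; done = true }
    waiting(10) { sleep 0.1 until done }  # returns once the flag flips, raises after 10 seconds otherwise
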
diff --git a/test/plugin/test_in_dummy.rb b/test/plugin/test_in_dummy.rb
index fc070970c7..6d999cf5b1 100644
--- a/test/plugin/test_in_dummy.rb
+++ b/test/plugin/test_in_dummy.rb
@@ -112,10 +112,12 @@ def create_driver(conf)
d1.instance.emit(4)
end
- first_id1 = d1.events.first[2]['id']
+ events = d1.events.sort{|a,b| a[2]['id'] <=> b[2]['id'] }
+
+ first_id1 = events.first[2]['id']
assert_equal 0, first_id1
- last_id1 = d1.events.last[2]['id']
+ last_id1 = events.last[2]['id']
assert { last_id1 > 0 }
assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))
@@ -125,7 +127,9 @@ def create_driver(conf)
d2.instance.emit(4)
end
- first_id2 = d2.events.first[2]['id']
+ events = d2.events.sort{|a,b| a[2]['id'] <=> b[2]['id'] }
+
+ first_id2 = events.first[2]['id']
assert_equal 0, first_id2
assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))
diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb
index 24ffeb7e98..0b5a98c6d0 100644
--- a/test/plugin/test_in_monitor_agent.rb
+++ b/test/plugin/test_in_monitor_agent.rb
@@ -460,11 +460,7 @@ def write(chunk)
"output_plugin" => true,
"plugin_category" => "output",
"plugin_id" => "test_out_fail_write",
- "retry_count" => 2,
"type" => "test_out_fail_write",
- "retry" => {
- "steps" => 1
- }
}
output = @ra.outputs[0]
output.start
@@ -473,12 +469,21 @@ def write(chunk)
# flush few times to check steps
2.times do
output.force_flush
+ # output.force_flush calls #submit_flush_all, but #submit_flush_all skips calling #submit_flush_once when @retry exists.
+ # So a forced flush in the retry state has to be done by calling #submit_flush_once directly.
+ output.submit_flush_once
+ sleep 0.1 until output.buffer.queued?
end
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json"))
test_out_fail_write_response = response["plugins"][1]
# remove dynamic keys
- ["start", "next_time"].each { |key| test_out_fail_write_response["retry"].delete(key) }
+ response_retry_count = test_out_fail_write_response.delete("retry_count")
+ response_retry = test_out_fail_write_response.delete("retry")
assert_equal(expected_test_out_fail_write_response, test_out_fail_write_response)
+ assert{ response_retry.has_key?("steps") }
+ # it's very hard to check the exact retry count (retries are driven by the output flush thread scheduling)
+ assert{ response_retry_count >= 1 && response_retry["steps"] >= 0 }
+ assert{ response_retry_count == response_retry["steps"] + 1 }
end
end
end
diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb
index 26868aeecf..83851ad360 100644
--- a/test/plugin/test_in_tail.rb
+++ b/test/plugin/test_in_tail.rb
@@ -136,8 +136,7 @@ def test_emit(data)
d.run(expect_emits: 1) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
- f.puts "test3"
- f.puts "test4"
+ f.puts "test3\ntest4"
}
end
@@ -368,6 +367,8 @@ def test_rotate_file_with_write_old(data)
f.puts "test6"
}
}
+ # This test sometimes fails; it points to a potential bug in in_tail
+ # https://github.com/fluent/fluentd/issues/1434
assert_equal(6, events.length)
assert_equal({"message" => "test3"}, events[0][2])
assert_equal({"message" => "test4"}, events[1][2])
@@ -988,6 +989,8 @@ def test_missing_file
f.puts "test4"
}
end
+ # This test sometimes fails; it points to a potential bug in in_tail
+ # https://github.com/fluent/fluentd/issues/1434
events = d.events
assert_equal(2, events.length)
assert_equal({"message" => "test3"}, events[0][2])
diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb
index d7a406f3e9..92d972a8d9 100644
--- a/test/plugin/test_out_file.rb
+++ b/test/plugin/test_out_file.rb
@@ -740,7 +740,7 @@ def parse_system(text)
end
test 'raise error to show it is a bug when path including * specified without timekey' do
- assert_raise "BUG: configuration error must be raised for path including '*' without timekey" do
+ assert_raise RuntimeError.new("BUG: configuration error must be raised for path including '*' without timekey") do
@i.generate_path_template('/path/to/file.*.log', nil, false, nil)
end
end
@@ -830,7 +830,7 @@ def parse_system(text)
end
test 'raise error if argument path does not include index placeholder' do
- assert_raise "BUG: index placeholder not found in path: #{@tmp}/myfile" do
+ assert_raise RuntimeError.new("BUG: index placeholder not found in path: #{@tmp}/myfile") do
@i.find_filepath_available("#{@tmp}/myfile") do |path|
# ...
end
diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb
index 6b75903f7d..3d0082b1e6 100644
--- a/test/plugin/test_out_forward.rb
+++ b/test/plugin/test_out_forward.rb
@@ -234,8 +234,8 @@ def read_ack_from_sock(sock, unpacker)
target_input_driver.run(expect_records: 2) do
d.run do
- emit_events.each do |tag, time, record|
- d.feed(tag, time, record)
+ emit_events.each do |tag, t, record|
+ d.feed(tag, t, record)
end
end
end
@@ -639,7 +639,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG
node = d.instance.nodes.first
assert_equal Fluent::Plugin::ForwardOutput::NoneHeartbeatNode, node.class
- d.instance.start
+ d.instance_start
assert_nil d.instance.instance_variable_get(:@loop) # no HeartbeatHandler, or HeartbeatRequestTimer
assert_nil d.instance.instance_variable_get(:@thread) # no HeartbeatHandler, or HeartbeatRequestTimer
@@ -651,7 +651,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG
test 'heartbeat_type_udp' do
@d = d = create_driver(CONFIG + "\nheartbeat_type udp")
- d.instance.start
+ d.instance_start
usock = d.instance.instance_variable_get(:@usock)
servers = d.instance.instance_variable_get(:@_servers)
timers = d.instance.instance_variable_get(:@_timers)
@@ -660,7 +660,6 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG
assert timers.include?(:out_forward_heartbeat_request)
mock(usock).send("\0", 0, Socket.pack_sockaddr_in(TARGET_PORT, '127.0.0.1')).once
- # timer.disable # call send_heartbeat at just once
d.instance.send(:on_timer)
end
diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb
index 54c1fd36f9..e573dcb966 100644
--- a/test/plugin/test_output.rb
+++ b/test/plugin/test_output.rb
@@ -824,26 +824,28 @@ def invoke_slow_flush_log_threshold_test(i)
test '#write flush took longer time than slow_flush_log_threshold' do
i = create_output(:buffered)
write_called = false
- i.register(:write) { |chunk| sleep 1 }
+ i.register(:write) { |chunk| sleep 3 }
i.define_singleton_method(:test_finished?) { write_called }
i.define_singleton_method(:try_flush) { super(); write_called = true }
invoke_slow_flush_log_threshold_test(i) {
assert write_called
- assert_equal 1, i.log.out.logs.select { |line| line =~ /buffer flush took longer time than slow_flush_log_threshold: elapsed_time/ }.size
+ logs = i.log.out.logs
+ assert{ logs.any?{|log| log.include?("buffer flush took longer time than slow_flush_log_threshold: elapsed_time") } }
}
end
test '#try_write flush took longer time than slow_flush_log_threshold' do
i = create_output(:delayed)
try_write_called = false
- i.register(:try_write){ |chunk| sleep 1 }
+ i.register(:try_write){ |chunk| sleep 3 }
i.define_singleton_method(:test_finished?) { try_write_called }
i.define_singleton_method(:try_flush) { super(); try_write_called = true }
invoke_slow_flush_log_threshold_test(i) {
assert try_write_called
- assert_equal 1, i.log.out.logs.select { |line| line =~ /buffer flush took longer time than slow_flush_log_threshold: elapsed_time/ }.size
+ logs = i.log.out.logs
+ assert{ logs.any?{|log| log.include?("buffer flush took longer time than slow_flush_log_threshold: elapsed_time") } }
}
end
end
diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb
index 334947d91f..aa11e23071 100644
--- a/test/plugin/test_output_as_buffered.rb
+++ b/test/plugin/test_output_as_buffered.rb
@@ -642,6 +642,22 @@ def waiting(seconds)
ary.reject!{|e| true }
end
end
+ end
+
+ sub_test_case 'with much longer flush_interval' do
+ setup do
+ hash = {
+ 'flush_mode' => 'interval',
+ 'flush_interval' => 3000,
+ 'flush_thread_count' => 1,
+ 'flush_thread_burst_interval' => 0.01,
+ 'chunk_limit_size' => 1024,
+ }
+ @i = create_output(:buffered)
+ @i.configure(config_element('ROOT','',{},[config_element('buffer','',hash)]))
+ @i.start
+ @i.after_start
+ end
test 'flush_at_shutdown work well when plugin is shutdown' do
ary = []
@@ -658,16 +674,15 @@ def waiting(seconds)
(1024 * 0.9 / event_size).to_i.times do |i|
@i.emit_events("test.tag", Fluent::ArrayEventStream.new([ [t, r] ]))
end
- assert{ @i.buffer.queue.size == 0 && ary.size == 0 }
+ queue_size = @i.buffer.queue.size
+ assert{ queue_size == 0 && ary.size == 0 }
@i.stop
@i.before_shutdown
@i.shutdown
@i.after_shutdown
- waiting(10) do
- Thread.pass until ary.size == 1
- end
+ waiting(10){ sleep 0.1 until ary.size == 1 }
assert_equal [tag,t,r].to_json * (1024 * 0.9 / event_size), ary.first
end
end
@@ -730,11 +745,9 @@ def waiting(seconds)
assert_equal rand_records, es.size
@i.emit_events("test.tag", es)
- assert{ @i.buffer.stage.size == 0 && (@i.buffer.queue.size == 1 || @i.buffer.dequeued.size == 1 || ary.size > 0) }
-
- waiting(10) do
- Thread.pass until @i.buffer.queue.size == 0 && @i.buffer.dequeued.size == 0
- end
+ waiting(10){ sleep 0.1 until @i.buffer.stage.size == 0 } # make sure that the emitted es is enqueued by "flush_mode immediate"
+ waiting(10){ sleep 0.1 until @i.buffer.queue.size == 0 && @i.buffer.dequeued.size == 0 }
+ waiting(10){ sleep 0.1 until ary.size == rand_records }
assert_equal rand_records, ary.size
ary.reject!{|e| true }
@@ -863,12 +876,12 @@ def waiting(seconds)
@i.enqueue_thread_wait
- waiting(4) do
- Thread.pass until @i.write_count > 0
- end
+ waiting(4){ sleep 0.1 until @i.write_count > 0 }
assert{ @i.buffer.stage.size == 2 && @i.write_count == 1 }
+ waiting(4){ sleep 0.1 until ary.size == 3 }
+
assert_equal 3, ary.size
assert_equal 2, ary.select{|e| e[0] == "test.tag.1" }.size
assert_equal 1, ary.select{|e| e[0] == "test.tag.2" }.size
@@ -882,9 +895,7 @@ def waiting(seconds)
Timecop.freeze( Time.parse('2016-04-13 14:04:06 +0900') )
@i.enqueue_thread_wait
- waiting(4) do
- Thread.pass until @i.write_count > 1
- end
+ waiting(4){ sleep 0.1 until @i.write_count > 1 }
assert{ @i.buffer.stage.size == 1 && @i.write_count == 2 }
@@ -904,7 +915,13 @@ def waiting(seconds)
metachecks = []
@i.register(:format){|tag,time,record| [tag,time,record].to_json + "\n" }
- @i.register(:write){|chunk| chunk.read.split("\n").reject{|l| l.empty? }.each{|data| e = JSON.parse(data); ary << e; metachecks << (chunk.metadata.timekey.to_i <= e[1].to_i && e[1].to_i < chunk.metadata.timekey.to_i + 30) } }
+ @i.register(:write){|chunk|
+ chunk.read.split("\n").reject{|l| l.empty? }.each{|data|
+ e = JSON.parse(data)
+ ary << e
+ metachecks << (chunk.metadata.timekey.to_i <= e[1].to_i && e[1].to_i < chunk.metadata.timekey.to_i + 30)
+ }
+ }
r = {}
(0...10).each do |i|
@@ -942,9 +959,7 @@ def waiting(seconds)
@i.enqueue_thread_wait
- waiting(4) do
- Thread.pass until @i.write_count > 0
- end
+ waiting(4){ sleep 0.1 until @i.write_count > 0 }
assert{ @i.buffer.stage.size == 2 && @i.write_count == 1 }
@@ -957,14 +972,13 @@ def waiting(seconds)
Timecop.freeze( Time.parse('2016-04-13 14:04:06 +0900') )
@i.enqueue_thread_wait
- waiting(4) do
- Thread.pass until @i.write_count > 1
- end
+ waiting(4){ sleep 0.1 until @i.write_count > 1 }
assert{ @i.buffer.stage.size == 1 && @i.write_count == 2 }
Timecop.freeze( Time.parse('2016-04-13 14:04:13 +0900') )
+ waiting(4){ sleep 0.1 until ary.size == 9 }
assert_equal 9, ary.size
@i.stop
@@ -972,9 +986,7 @@ def waiting(seconds)
@i.shutdown
@i.after_shutdown
- waiting(4) do
- Thread.pass until @i.write_count > 2
- end
+ waiting(4){ sleep 0.1 until @i.write_count > 2 && ary.size == 11 }
assert_equal 11, ary.size
assert metachecks.all?{|e| e }
diff --git a/test/plugin/test_output_as_buffered_compress.rb b/test/plugin/test_output_as_buffered_compress.rb
index b7b2aee3e6..b698aeafee 100644
--- a/test/plugin/test_output_as_buffered_compress.rb
+++ b/test/plugin/test_output_as_buffered_compress.rb
@@ -157,7 +157,7 @@ def dummy_event_stream
@i.emit_events('tag', es)
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4) { Thread.pass until io.size > 0 }
+ waiting(4) { sleep 0.1 until io.size > 0 }
assert_equal expected, decompress(compressed_data)
assert_equal expected, io.string
diff --git a/test/plugin/test_output_as_buffered_retries.rb b/test/plugin/test_output_as_buffered_retries.rb
index 1fa11095ab..df9aca34fc 100644
--- a/test/plugin/test_output_as_buffered_retries.rb
+++ b/test/plugin/test_output_as_buffered_retries.rb
@@ -687,7 +687,7 @@ def get_log_time(msg, logs)
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
+ waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }
assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
@@ -703,10 +703,10 @@ def get_log_time(msg, logs)
15.times do |i|
now = @i.next_flush_time
- Timecop.freeze( now )
+ Timecop.freeze( now + 1 )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }
assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors > prev_num_errors }
@@ -758,7 +758,7 @@ def get_log_time(msg, logs)
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
+ waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }
assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
@@ -774,10 +774,10 @@ def get_log_time(msg, logs)
15.times do |i|
now = @i.next_flush_time
- Timecop.freeze( now )
+ Timecop.freeze( now + 1 )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }
assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors > prev_num_errors }
diff --git a/test/plugin/test_output_as_buffered_secondary.rb b/test/plugin/test_output_as_buffered_secondary.rb
index 622386247e..a98f0c68bf 100644
--- a/test/plugin/test_output_as_buffered_secondary.rb
+++ b/test/plugin/test_output_as_buffered_secondary.rb
@@ -230,7 +230,7 @@ def dummy_event_stream
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
+ waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }
assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
@@ -244,16 +244,30 @@ def dummy_event_stream
first_failure = @i.retry.start
# retry_timeout == 60(sec), retry_secondary_threshold == 0.8
+ now = first_failure + 60 * 0.8 + 1 # to step from primary to secondary
+ Timecop.freeze( now )
- now = first_failure + 60 * 0.8 + 1
+ unless @i.retry.secondary?
+ @i.enqueue_thread_wait
+ @i.flush_thread_wakeup
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
+
+ prev_write_count = @i.write_count
+ prev_num_errors = @i.num_errors
+
+ # next step is on secondary
+ now = first_failure + 60 * 0.8 + 10
+ Timecop.freeze( now )
+ end
- Timecop.freeze( now )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > prev_write_count }
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
- assert{ @i.write_count > prev_write_count }
- assert{ @i.num_errors == prev_num_errors }
+ current_write_count = @i.write_count
+ current_num_errors = @i.num_errors
+ assert{ current_write_count > prev_write_count }
+ assert{ current_num_errors == prev_num_errors }
assert_nil @i.retry
@@ -261,7 +275,9 @@ def dummy_event_stream
assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:13').to_i, {"name" => "moris", "age" => 36, "message" => "data2"} ], written[1]
assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:32').to_i, {"name" => "moris", "age" => 36, "message" => "data3"} ], written[2]
- assert{ @i.log.out.logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
+ logs = @i.log.out.logs
+ waiting(4){ sleep 0.1 until logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
+ assert{ logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
end
test 'secondary can do non-delayed commit even if primary do delayed commit' do
@@ -295,7 +311,7 @@ def dummy_event_stream
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
+ waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }
assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
@@ -309,13 +325,25 @@ def dummy_event_stream
first_failure = @i.retry.start
# retry_timeout == 60(sec), retry_secondary_threshold == 0.8
+ now = first_failure + 60 * 0.8 + 1 # to step from primary to secondary
+ Timecop.freeze( now )
- now = first_failure + 60 * 0.8 + 1
+ unless @i.retry.secondary?
+ @i.enqueue_thread_wait
+ @i.flush_thread_wakeup
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
+
+ prev_write_count = @i.write_count
+ prev_num_errors = @i.num_errors
+
+ # next step is on secondary
+ now = first_failure + 60 * 0.8 + 10
+ Timecop.freeze( now )
+ end
- Timecop.freeze( now )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > prev_write_count }
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors == prev_num_errors }
@@ -326,7 +354,9 @@ def dummy_event_stream
assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:13').to_i, {"name" => "moris", "age" => 36, "message" => "data2"} ], written[1]
assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:32').to_i, {"name" => "moris", "age" => 36, "message" => "data3"} ], written[2]
- assert{ @i.log.out.logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
+ logs = @i.log.out.logs
+ waiting(4){ sleep 0.1 until logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
+ assert{ logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
end
test 'secondary plugin can do delayed commit if primary do it' do
@@ -361,7 +391,7 @@ def dummy_event_stream
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
+ waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }
assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
@@ -375,13 +405,25 @@ def dummy_event_stream
first_failure = @i.retry.start
# retry_timeout == 60(sec), retry_secondary_threshold == 0.8
+ now = first_failure + 60 * 0.8 + 1 # to step from primary to secondary
+ Timecop.freeze( now )
- now = first_failure + 60 * 0.8 + 1
+ unless @i.retry.secondary?
+ @i.enqueue_thread_wait
+ @i.flush_thread_wakeup
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
+
+ prev_write_count = @i.write_count
+ prev_num_errors = @i.num_errors
+
+ # next step is on secondary
+ now = first_failure + 60 * 0.8 + 10
+ Timecop.freeze( now )
+ end
- Timecop.freeze( now )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > prev_write_count }
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors == prev_num_errors }
@@ -403,7 +445,9 @@ def dummy_event_stream
assert_nil @i.retry
- assert{ @i.log.out.logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
+ logs = @i.log.out.logs
+ waiting(4){ sleep 0.1 until logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
+ assert{ logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
end
test 'secondary plugin can do delayed commit even if primary does not do it' do
@@ -438,7 +482,7 @@ def dummy_event_stream
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
+ waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }
assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
@@ -452,13 +496,25 @@ def dummy_event_stream
first_failure = @i.retry.start
# retry_timeout == 60(sec), retry_secondary_threshold == 0.8
+ now = first_failure + 60 * 0.8 + 1 # to step from primary to secondary
+ Timecop.freeze( now )
- now = first_failure + 60 * 0.8 + 1
+ unless @i.retry.secondary?
+ @i.enqueue_thread_wait
+ @i.flush_thread_wakeup
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
+
+ prev_write_count = @i.write_count
+ prev_num_errors = @i.num_errors
+
+ # next step is on secondary
+ now = first_failure + 60 * 0.8 + 10
+ Timecop.freeze( now )
+ end
- Timecop.freeze( now )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > prev_write_count }
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors == prev_num_errors }
@@ -480,7 +536,9 @@ def dummy_event_stream
assert_nil @i.retry
- assert{ @i.log.out.logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
+ logs = @i.log.out.logs
+ waiting(4){ sleep 0.1 until logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
+ assert{ logs.any?{|l| l.include?("[warn]: retry succeeded by secondary.") } }
end
test 'secondary plugin can do delayed commit even if primary does not do it, and non-committed chunks will be rollbacked by primary' do
@@ -515,7 +573,7 @@ def dummy_event_stream
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
+ waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }
assert{ @i.buffer.queue.size == 2 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
@@ -539,7 +597,7 @@ def dummy_event_stream
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until chunks.size == 2 }
+ waiting(4){ sleep 0.1 until chunks.size == 2 }
assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors == prev_num_errors }
@@ -568,6 +626,7 @@ def dummy_event_stream
assert @i.retry
logs = @i.log.out.logs
+ waiting(4){ sleep 0.1 until logs.select{|l| l.include?("[warn]: failed to flush the buffer chunk, timeout to commit.") }.size == 2 }
assert{ logs.select{|l| l.include?("[warn]: failed to flush the buffer chunk, timeout to commit.") }.size == 2 }
end
@@ -601,7 +660,7 @@ def dummy_event_stream
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
+ waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }
assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
@@ -621,7 +680,7 @@ def dummy_event_stream
Timecop.freeze( now )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > prev_write_count }
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors > prev_num_errors }
@@ -631,6 +690,7 @@ def dummy_event_stream
assert_equal 3, (@i.next_flush_time - Time.now)
logs = @i.log.out.logs
+ waiting(4){ sleep 0.1 until logs.any?{|l| l.include?("[warn]: failed to flush the buffer with secondary output.") } }
assert{ logs.any?{|l| l.include?("[warn]: failed to flush the buffer with secondary output.") } }
end
end
@@ -672,7 +732,7 @@ def dummy_event_stream
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
+ waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }
assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
@@ -690,7 +750,7 @@ def dummy_event_stream
Timecop.freeze( now )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > prev_write_count }
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
assert{ @i.write_count > prev_write_count }
@@ -743,7 +803,7 @@ def dummy_event_stream
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }
+ waiting(4){ sleep 0.1 until @i.write_count > 0 && @i.num_errors > 0 }
assert{ @i.buffer.queue.size > 0 }
assert{ @i.buffer.queue.first.metadata.tag == 'test.tag.1' }
@@ -768,7 +828,7 @@ def dummy_event_stream
Timecop.freeze( now )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > prev_write_count }
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors > prev_num_errors }
@@ -800,7 +860,7 @@ def dummy_event_stream
Timecop.freeze( now )
@i.enqueue_thread_wait
@i.flush_thread_wakeup
- waiting(4){ Thread.pass until @i.write_count > prev_write_count }
+ waiting(4){ sleep 0.1 until @i.write_count > prev_write_count }
assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors > prev_num_errors }
diff --git a/test/plugin_helper/test_child_process.rb b/test/plugin_helper/test_child_process.rb
index 889fd10df5..ec714dd7c3 100644
--- a/test/plugin_helper/test_child_process.rb
+++ b/test/plugin_helper/test_child_process.rb
@@ -630,17 +630,15 @@ def configure(conf)
str = nil
- Timeout.timeout(TEST_DEADLOCK_TIMEOUT) do
- pid = nil
- @d.child_process_execute(:st1, "ruby", arguments: args, mode: [:read], on_exit_callback: cb) do |readio|
- pid = @d.instance_eval{ child_process_id }
- str = readio.read.chomp
- block_exits = true
- end
- sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING while @d.child_process_exist?(pid) # to get exit status
- sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING until block_exits
- sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING until callback_called
+ pid = nil
+ @d.child_process_execute(:st1, "ruby", arguments: args, mode: [:read], on_exit_callback: cb) do |readio|
+ pid = @d.instance_eval{ child_process_id }
+ str = readio.read.chomp
+ block_exits = true
end
+ waiting(TEST_DEADLOCK_TIMEOUT){ sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING while @d.child_process_exist?(pid) } # to get exit status
+ waiting(TEST_DEADLOCK_TIMEOUT){ sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING until block_exits }
+ waiting(TEST_DEADLOCK_TIMEOUT){ sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING until callback_called }
assert callback_called
assert exit_status
@@ -658,32 +656,35 @@ def configure(conf)
block_exits = false
callback_called = false
exit_status = nil
- args = ['-e', 'sleep ARGV[0].to_i; puts "yay"; File.unlink ARGV[1]', '25', @temp_path]
+ args = ['-e', 'sleep ARGV[0].to_i; puts "yay"; File.unlink ARGV[1]', '100', @temp_path]
cb = ->(status){ exit_status = status; callback_called = true }
str = nil
- Timeout.timeout(TEST_DEADLOCK_TIMEOUT) do
- pid = nil
- @d.child_process_execute(:st1, "ruby", arguments: args, mode: [:read], on_exit_callback: cb) do |readio|
- pid = @d.instance_eval{ child_process_id }
- Process.kill(:QUIT, pid)
- Process.kill(:QUIT, pid) rescue nil # once more to kill certainly
- str = readio.read.chomp rescue nil # empty string before EOF
- block_exits = true
- end
- sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING while @d.child_process_exist?(pid) # to get exit status
- sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING until block_exits
- sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING until callback_called
- end
+ pid = nil
+ @d.child_process_execute(:st1, "ruby", arguments: args, mode: [:read], on_exit_callback: cb) do |readio|
+ pid = @d.instance_eval{ child_process_id }
+ sleep 10 # wait so that the child process is certainly running before signalling it
+ Process.kill(:QUIT, pid)
+ sleep 1
+ Process.kill(:QUIT, pid) rescue nil # once more to send kill
+ sleep 1
+ Process.kill(:QUIT, pid) rescue nil # just like sync
+ str = readio.read.chomp rescue nil # empty string before EOF
+ block_exits = true
+ end
+ waiting(TEST_DEADLOCK_TIMEOUT){ sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING while @d.child_process_exist?(pid) } # to get exit status
+ waiting(TEST_DEADLOCK_TIMEOUT){ sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING until block_exits }
+ waiting(TEST_DEADLOCK_TIMEOUT){ sleep TEST_WAIT_INTERVAL_FOR_BLOCK_RUNNING until callback_called }
assert callback_called
assert exit_status
- assert_equal [nil, 3], [exit_status.exitstatus, exit_status.termsig] # SIGQUIT
-
- assert File.exist?(@temp_path)
- assert_equal "", str
+ # This test sometimes fails on TravisCI
+ # with [nil, 11] # SIGSEGV
+ # or with [1, nil] # ???
+ assert_equal [nil, 3, true, ""], [exit_status.exitstatus, exit_status.termsig, File.exist?(@temp_path), str] # SIGQUIT
+ # the SIGSEGV case looks like a Ruby bug...
end
test 'calls on_exit_callback for each process exits for interval call using on_exit_callback' do
diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb
index a570456b5b..ae906de967 100644
--- a/test/plugin_helper/test_server.rb
+++ b/test/plugin_helper/test_server.rb
@@ -27,12 +27,12 @@ class Dummy < Fluent::Plugin::TestBase
end
teardown do
- @d.stopped? || @d.stop
- @d.before_shutdown? || @d.before_shutdown
- @d.shutdown? || @d.shutdown
- @d.after_shutdown? || @d.after_shutdown
- @d.closed? || @d.close
- @d.terminated? || @d.terminate
+ (@d.stopped? || @d.stop) rescue nil
+ (@d.before_shutdown? || @d.before_shutdown) rescue nil
+ (@d.shutdown? || @d.shutdown) rescue nil
+ (@d.after_shutdown? || @d.after_shutdown) rescue nil
+ (@d.closed? || @d.close) rescue nil
+ (@d.terminated? || @d.terminate) rescue nil
@socket_manager_server.close
if @socket_manager_server.is_a?(String) && File.exist?(@socket_manager_path)
@@ -645,7 +645,7 @@ class Dummy < Fluent::Plugin::TestBase
sock.write "foo\n"
sock.close
- waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 }
+ waiting(10){ sleep 0.1 until received.bytesize == 4 && errors.size == 1 }
assert_equal "foo\n", received
assert_equal 1, errors.size
assert_equal "BUG: this event is disabled for udp: data", errors.first.message
@@ -667,7 +667,7 @@ class Dummy < Fluent::Plugin::TestBase
sock.write "foo\n"
sock.close
- waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 }
+ waiting(10){ sleep 0.1 until received.bytesize == 4 && errors.size == 1 }
assert_equal "foo\n", received
assert_equal 1, errors.size
assert_equal "BUG: this event is disabled for udp: write_complete", errors.first.message
@@ -689,7 +689,7 @@ class Dummy < Fluent::Plugin::TestBase
sock.write "foo\n"
sock.close
- waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 }
+ waiting(10){ sleep 0.1 until received.bytesize == 4 && errors.size == 1 }
assert_equal "foo\n", received
assert_equal 1, errors.size
assert_equal "BUG: this event is disabled for udp: close", errors.first.message
diff --git a/test/plugin_helper/test_timer.rb b/test/plugin_helper/test_timer.rb
index 2928507951..03dfa88c01 100644
--- a/test/plugin_helper/test_timer.rb
+++ b/test/plugin_helper/test_timer.rb
@@ -122,6 +122,7 @@ class Dummy < Fluent::Plugin::TestBase
sleep(0.1) while waiting_timer
assert_equal(1, counter)
+ waiting(4){ sleep 0.1 while watchers.first.attached? }
assert_false(watchers.first.attached?)
watchers = d1._event_loop.watchers.reject {|w| w.is_a?(Fluent::PluginHelper::EventLoop::DefaultWatcher) }
assert_equal(0, watchers.size)
diff --git a/test/test_clock.rb b/test/test_clock.rb
index 5d1922387b..2bed54196c 100644
--- a/test/test_clock.rb
+++ b/test/test_clock.rb
@@ -39,7 +39,7 @@ class ClockTest < ::Test::Unit::TestCase
end
test 'Clock.return raises an error if it is called in block' do
- assert_raise "invalid return while running code in blocks" do
+ assert_raise RuntimeError.new("invalid return while running code in blocks") do
Fluent::Clock.freeze do
Fluent::Clock.return
end
@@ -148,14 +148,16 @@ class ClockTest < ::Test::Unit::TestCase
t2 = t0 + 30
assert_kind_of Time, t2
+ # 31 (instead of 30) absorbs floating point error
Fluent::Clock.freeze(t1) do
c1 = Fluent::Clock.now
- assert{ c1 >= c0 - 30 && c1 <= c0 - 30 + 10 } # +10 is for threading schedule error
+ assert{ c1 >= c0 - 31 && c1 <= c0 - 31 + 10 } # +10 is for threading schedule error
end
+ # 29 (instead of 30) absorbs floating point error
Fluent::Clock.freeze(t2) do
c2 = Fluent::Clock.now
- assert{ c2 >= c0 + 30 && c2 <= c0 + 30 + 10 } # +10 is for threading schedule error
+ assert{ c2 >= c0 + 29 && c2 <= c0 + 29 + 10 } # +10 is for threading schedule error
end
end
end
diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb
index 867f59f028..8b6db9130f 100644
--- a/test/test_supervisor.rb
+++ b/test/test_supervisor.rb
@@ -9,10 +9,13 @@
require 'fileutils'
class SupervisorTest < ::Test::Unit::TestCase
- include Fluent
- include FluentTest
- include ServerModule
- include WorkerModule
+ class DummyServer
+ include Fluent::ServerModule
+ attr_accessor :rpc_endpoint, :enable_get_dump
+ def config
+ {}
+ end
+ end
TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/supervisor#{ENV['TEST_ENV_NUMBER']}")
TMP_ROOT_DIR = File.join(TMP_DIR, 'root')
@@ -105,80 +108,83 @@ def test_system_config
end
def test_main_process_signal_handlers
- create_info_dummy_logger
-
- unless Fluent.windows?
- opts = Fluent::Supervisor.default_options
- sv = Fluent::Supervisor.new(opts)
- sv.send(:install_main_process_signal_handlers)
+ omit "Windows cannot handle signals" if Fluent.windows?
- begin
- Process.kill :USR1, $$
- rescue
- end
+ create_info_dummy_logger
- sleep 1
+ opts = Fluent::Supervisor.default_options
+ sv = Fluent::Supervisor.new(opts)
+ sv.send(:install_main_process_signal_handlers)
- info_msg = '[info]: force flushing buffered events' + "\n"
- assert{ $log.out.logs.first.end_with?(info_msg) }
+ begin
+ Process.kill :USR1, $$
+ rescue
end
- $log.out.reset
+ sleep 1
+
+ info_msg = '[info]: force flushing buffered events' + "\n"
+ assert{ $log.out.logs.first.end_with?(info_msg) }
+ ensure
+ $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset)
end
def test_supervisor_signal_handler
- create_debug_dummy_logger
+ omit "Windows cannot handle signals" if Fluent.windows?
- unless Fluent.windows?
-
- install_supervisor_signal_handlers
- begin
- Process.kill :USR1, $$
- rescue
- end
-
- sleep 1
+ create_debug_dummy_logger
- debug_msg = '[debug]: fluentd supervisor process get SIGUSR1' + "\n"
- assert{ $log.out.logs.first.end_with?(debug_msg) }
+ server = DummyServer.new
+ server.install_supervisor_signal_handlers
+ begin
+ Process.kill :USR1, $$
+ rescue
end
- $log.out.reset
+ sleep 1
+
+ debug_msg = '[debug]: fluentd supervisor process get SIGUSR1'
+ logs = $log.out.logs
+ assert{ logs.any?{|log| log.include?(debug_msg) } }
+ ensure
+ $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset)
end
def test_rpc_server
+ omit "Windows cannot handle signals" if Fluent.windows?
+
create_info_dummy_logger
- unless Fluent.windows?
- opts = Fluent::Supervisor.default_options
- sv = Fluent::Supervisor.new(opts)
- conf_data = <<-EOC
+ opts = Fluent::Supervisor.default_options
+ sv = Fluent::Supervisor.new(opts)
+ conf_data = <<-EOC
rpc_endpoint 0.0.0.0:24447
- EOC
- conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
- sv.instance_variable_set(:@conf, conf)
- sv.send(:set_system_config)
- sys_conf = sv.instance_variable_get(:@system_config)
- @rpc_endpoint = sys_conf.rpc_endpoint
- @enable_get_dump = sys_conf.enable_get_dump
+ EOC
+ conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
+ sv.instance_variable_set(:@conf, conf)
+ sv.send(:set_system_config)
+ sys_conf = sv.instance_variable_get(:@system_config)
- run_rpc_server
+ server = DummyServer.new
+ server.rpc_endpoint = sys_conf.rpc_endpoint
+ server.enable_get_dump = sys_conf.enable_get_dump
- sv.send(:install_main_process_signal_handlers)
- Net::HTTP.get URI.parse('http://0.0.0.0:24447/api/plugins.flushBuffers')
- info_msg = '[info]: force flushing buffered events' + "\n"
+ server.run_rpc_server
- stop_rpc_server
+ sv.send(:install_main_process_signal_handlers)
+ Net::HTTP.get URI.parse('http://0.0.0.0:24447/api/plugins.flushBuffers')
+ info_msg = '[info]: force flushing buffered events' + "\n"
- # In TravisCI with OSX(Xcode), it seems that can't use rpc server.
- # This test will be passed in such environment.
- pend unless $log.out.logs.first
+ server.stop_rpc_server
- assert{ $log.out.logs.first.end_with?(info_msg) }
- end
+ # On TravisCI with OSX (Xcode), it seems the RPC server can't be used.
+ # This test is marked as pending in such an environment.
+ pend unless $log.out.logs.first
+ assert{ $log.out.logs.first.end_with?(info_msg) }
+ ensure
$log.out.reset
end
diff --git a/test/test_test_drivers.rb b/test/test_test_drivers.rb
index 695f74e6c8..f00feeb5e7 100644
--- a/test/test_test_drivers.rb
+++ b/test/test_test_drivers.rb
@@ -67,7 +67,7 @@ def setup
assert_nothing_raised do
before = Process.clock_gettime(Process::CLOCK_MONOTONIC)
d.end_if{ false }
- d.run(timeout: 1) do
+ d.run(timeout: 5) do
sleep 0.1 until d.stop?
end
after = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@@ -89,7 +89,7 @@ def start
end
end
end
- assert_raise "yaaaaaaaaaay!" do
+ assert_raise RuntimeError.new("yaaaaaaaaaay!") do
d.end_if{ false }
d.run(timeout: 3) do
sleep 0.1 until d.stop?