From 52e46f04b3cb7bddc809841e1e54f3cf8925681c Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 27 Oct 2023 19:17:52 +0900 Subject: [PATCH] in_tail: Fix a stall bug on !follow_inode case (#4327) Fix #3614 Although known stall issues of in_tail on `follow_inode` case are fixed in v1.16.2, it has still a similar problem on `!follow_inode` case. In this case, a tail watcher is possible to mark the position entry as `unwatched` if it's tansitioned to `rotate_wait` state by `refresh_watcher` even if another newer tail watcher is managing it. It's hard to occur in usual because `stat_watcher` will be called immediately after the file is changed while `refresh_wather` is called every 60 seconds by default. However, there is a rare possibility that this order might be swapped especillay if in_tail is busy on processing large amount of logs. Because in_tail is single threadied, event queues such as timers or inotify will be stucked in this case. There is no such problem on `follow_inode` case because position entries are always marked as `unwatched` before entering `rotate_wait` state. --------- Signed-off-by: Takuro Ashie Co-authored-by: Daijiro Fukuda --- lib/fluent/plugin/in_tail.rb | 2 +- test/plugin/test_in_tail.rb | 105 +++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 54fd900039..7ef6377aab 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -564,7 +564,7 @@ def detach_watcher(tw, ino, close_io = true) tw.close if close_io - if tw.unwatched && @pf + if @pf && tw.unwatched && (@follow_inode || !@tails[tw.path]) target_info = TargetInfo.new(tw.path, ino) @pf.unwatch(target_info) end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 1546bf9d4d..8919866683 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -3017,4 +3017,109 @@ def test_path_resurrection ) end end + + sub_test_case "Update watchers for rotation without follow_inodes" do + # The scenario where in_tail wrongly unwatches the PositionEntry. + # This is reported in https://github.com/fluent/fluentd/issues/3614. + def test_refreshTW_during_rotation + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt0", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + # In order to detach the old watcher quickly. + "rotate_wait" => "3s", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 6, timeout: 15) do + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1") + + # This reproduces the following situation: + # `refresh_watchers` is called during the rotation process and it detects the current file being lost. + # Then it stops and unwatches the TailWatcher. + d.instance.refresh_watchers + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"} + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to add the new TailWatcher. + # After `rotate_wait` interval, the PositionEntry is unwatched. + # HOWEVER, the new TailWatcher is still using that PositionEntry, so this breaks the PositionFile!! + # That PositionEntry is removed from `PositionFile::map`, but it is still working and remaining in the real pos file. + sleep 5 + + # Append to the new current log file. + # The PositionEntry is updated although it does not exist in `PositionFile::map`. + # `PositionFile::map`: empty + # Real pos file: `.../tail.txt 0000000000000016 (inode)` + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"} + + # Rotate again + [1, 0].each do |i| + FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}") + end + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"} + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to update the TailWatcher. + sleep 3 + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"} + + # Wait `rotate_wait` for file2 to make sure to close all IO handlers + sleep 3 + end + + inode_0 = tail_watchers[0]&.ino + inode_1 = tail_watchers[1]&.ino + inode_2 = tail_watchers[2]&.ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"], + tail_watcher_inodes: [inode_0, inode_1, inode_2], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + # The recorded path is old, but it is no problem. The path is not used when using follow_inodes. + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end + end end