Add follow_inodes flag to tail files by inodes, not by names #1660

Closed · wants to merge 6 commits

Changes from 2 commits
189 changes: 148 additions & 41 deletions lib/fluent/plugin/in_tail.rb
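The idea behind the flag, as a minimal standalone Ruby sketch (the file names and the rotation step are made up for illustration, not taken from this diff): a rename during log rotation keeps the inode, so keying watchers and pos_file entries by inode lets in_tail keep following the same file across rotation.

```ruby
# Illustration only (POSIX semantics; not code from this diff).
require 'fileutils'

File.write('app.log', "line\n")
ino_before = File.stat('app.log').ino

FileUtils.mv('app.log', 'app.log.1')   # logrotate-style rename
ino_after = File.stat('app.log.1').ino

puts ino_before == ino_after           # => true: same inode, new name
```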
@@ -21,6 +21,7 @@
require 'fluent/event'
require 'fluent/plugin/buffer'
require 'fluent/plugin/parser_multiline'
require 'pathname'

if Fluent.windows?
require_relative 'file_wrapper'
@@ -84,6 +85,8 @@ def initialize
config_param :skip_refresh_on_startup, :bool, default: false
desc 'Ignore repeated permission error logs'
config_param :ignore_repeated_permission_error, :bool, default: false
desc 'Follow inodes instead of following file names. Guarantees more stable delivery and allows using * in path patterns with rotating files'
config_param :follow_inodes, :bool, default: false

attr_reader :paths

@@ -110,6 +113,9 @@ def configure(conf)

# TODO: Use plugin_root_dir and storage plugin to store positions if available
unless @pos_file
if @follow_inodes
raise Fluent::ConfigError, "Can't follow inodes without pos_file configuration parameter"
end
$log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
$log.warn "this parameter is highly recommended to save the position to resume tailing."
end
@@ -162,7 +168,7 @@ def start
if @pos_file
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm)
@pf_file.sync = true
@pf = PositionFile.parse(@pf_file)
@pf = PositionFile.parse(@pf_file, @follow_inodes, expand_paths)
end

refresh_watchers unless @skip_refresh_on_startup
@@ -213,8 +219,30 @@ def expand_paths
paths << path
end
}
excluded = @exclude_path.map { |path| path = date.strftime(path); path.include?('*') ? Dir.glob(path) : path }.flatten.uniq
paths - excluded
excluded = @exclude_path.map {|path| path = date.strftime(path); path.include?('*') ? Dir.glob(path) : path}.flatten.uniq

Member: Don't change syntax/indent.

to_return = paths - excluded
# filter out non-existent files, so when the pattern has no '*' we don't do unnecessary work
to_return = to_return.select {|path|

Member: (paths - excluded).to_return.select {|path| seems enough

Pathname.new(path).exist?

Member: FileTest#exist? is more lightweight because there's no object instantiation.

Author: Cool, thx, didn't know about that option.

}
Hash[to_return.map {|path|

Member: Long lines in Hash[] seem hard to read. This map can be moved to the line above: {}.select {}.map {}.

tuple = PathInodeTuple.new(path, Fluent::FileWrapper.stat(path).ino)
if @follow_inodes
[tuple.ino, tuple]
else
[tuple.path, tuple]
end
}]
end
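Putting the review suggestions above together, a hedged standalone sketch of the shape the tail end of expand_paths could take: select/map chained in one expression, and FileTest.exist? instead of allocating a Pathname per path. The glob pattern, empty exclusion list, File.stat (standing in for Fluent::FileWrapper.stat), and the Struct stand-in for PathInodeTuple are all illustrative, not code from the branch.

```ruby
PathInodeTuple = Struct.new(:path, :ino)   # stand-in for the tuple class in this diff
follow_inodes  = true

paths    = Dir.glob('/var/log/app/*.log')  # made-up pattern
excluded = []

watched = (paths - excluded)
  .select { |path| FileTest.exist?(path) }                            # no Pathname allocation
  .map    { |path| PathInodeTuple.new(path, File.stat(path).ino) }
  .map    { |tuple| [follow_inodes ? tuple.ino : tuple.path, tuple] }
  .to_h
```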

def existence_path
Hash[@tails.keys.map {|path_ino|

Member: With lots of files, e.g. 1000+ files, this map creates lots of objects, so avoiding map is better for performance. Of course, the same applies to expand_paths's map.

if @follow_inodes
[path_ino.ino, path_ino]
else
[path_ino.path, path_ino]
end
}]
end
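On the allocation concern raised above, a hedged sketch of existence_path written with each_with_object, which fills the hash directly instead of allocating an intermediate array of pairs. @tails and @follow_inodes are the plugin's own state; this is an alternative shape, not the code in the branch.

```ruby
def existence_path
  @tails.keys.each_with_object({}) do |path_ino, hash|
    hash[@follow_inodes ? path_ino.ino : path_ino.path] = path_ino
  end
end
```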

# in_tail with '*' path doesn't check rotation file equality at refresh phase.
@@ -223,31 +251,33 @@ def expand_paths
# In such case, you should separate log directory and specify two paths in path parameter.
# e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
def refresh_watchers
target_paths = expand_paths
existence_paths = @tails.keys
target_paths_hash = expand_paths
existence_paths_hash = existence_path

unwatched = existence_paths - target_paths
added = target_paths - existence_paths
unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

stop_watchers(unwatched, immediate: false, unwatched: true) unless unwatched.empty?
start_watchers(added) unless added.empty?
stop_watchers(unwatched_hash.values, immediate: false, unwatched: true) unless unwatched_hash.empty?
start_watchers(added_hash.values) unless added_hash.empty?
end

def setup_watcher(path, pe)
def setup_watcher(path, ino, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw = TailWatcher.new(path, ino, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, @follow_inodes, &method(:receive_lines))
tw.attach do |watcher|
watcher.timer_trigger = timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer
event_loop_attach(watcher.stat_trigger)
end
tw
end

def start_watchers(paths)
paths.each { |path|
def start_watchers(paths_with_inodes)
paths_with_inodes.each { |path_with_inode|
path = path_with_inode.path
ino = path_with_inode.ino
pe = nil
if @pf
pe = @pf[path]
pe = @pf[path, ino]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
@@ -257,19 +287,19 @@ def start_watchers(paths)
end
end

@tails[path] = setup_watcher(path, pe)
@tails[path_with_inode] = setup_watcher(path, ino, pe)
}
end

def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: true)
paths.each { |path|
paths.each {|path|
tw = remove_watcher ? @tails.delete(path) : @tails[path]
if tw
tw.unwatched = unwatched
if immediate
detach_watcher(tw, false)
detach_watcher(tw, path.ino, false)
else
detach_watcher_after_rotate_wait(tw)
detach_watcher_after_rotate_wait(tw, path.ino)
end
end
}
@@ -285,34 +315,46 @@ def close_watcher_handles
end

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(path, pe)
def update_watcher(path, inode, pe)
if @pf
unless pe.read_inode == @pf[path].read_inode
log.trace "Skip update_watcher because watcher has been already updated by other inotify event"
unless pe.read_inode == @pf[path, pe.read_inode].read_inode
return
end
end
rotated_tw = @tails[path]
@tails[path] = setup_watcher(path, pe)
detach_watcher_after_rotate_wait(rotated_tw) if rotated_tw

tuple = PathInodeTuple.new(path, pe.read_inode)
rotated_tw = @tails[tuple]

new_tuple = PathInodeTuple.new(path, inode)

if @follow_inodes
new_position_entry = @pf[path, inode]

if new_position_entry.read_inode == 0
@tails[new_tuple] = setup_watcher(path, inode, new_position_entry)
end
else
@tails[new_tuple] = setup_watcher(path, inode, pe)
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end

# TailWatcher#close is called by another thread at shutdown phase.
# It causes 'can't modify string; temporarily locked' error in IOHandler
# so adding close_io argument to avoid this problem.
# At shutdown, IOHandler's io will be released automatically after detached the event loop
def detach_watcher(tw, close_io = true)
def detach_watcher(tw, inode, close_io = true)
tw.detach
tw.close if close_io
flush_buffer(tw)
if tw.unwatched && @pf
@pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION)
@pf[tw.path, inode].update_pos(PositionFile::UNWATCHED_POSITION)
end
end

def detach_watcher_after_rotate_wait(tw)
def detach_watcher_after_rotate_wait(tw, inode)
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw)
detach_watcher(tw, inode)
end
end

@@ -426,15 +468,17 @@ def parse_multilines(lines, tail_watcher)
end

class TailWatcher
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
def initialize(path, ino, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, follow_inodes, &receive_lines)
@path = path
@ino = ino
@rotate_wait = rotate_wait
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@enable_watch_timer = enable_watch_timer
@read_lines_limit = read_lines_limit
@receive_lines = receive_lines
@update_watcher = update_watcher
@follow_inodes = follow_inodes

@stat_trigger = StatWatcher.new(self, &method(:on_notify))
@timer_trigger = nil
@@ -449,7 +493,7 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, r
@open_on_every_update = open_on_every_update
end

attr_reader :path
attr_reader :path, :ino
attr_reader :log, :pe, :read_lines_limit, :open_on_every_update
attr_reader :from_encoding, :encoding
attr_reader :stat_trigger, :enable_watch_timer
@@ -551,7 +595,16 @@ def on_rotate(stat)
@log.info log_msg

if watcher_needs_update
@update_watcher.call(@path, swap_state(@pe))
# No need to update a watcher if stat is nil (file not present), because following inodes will create
# a new watcher, and the old watcher will be closed by stop_watchers in refresh_watchers
if stat
if @follow_inodes
# don't want to swap state because we need latest read offset in pos file even after rotate_wait
@update_watcher.call(@path, stat.ino, @pe)
else
@update_watcher.call(@path, stat.ino, swap_state(@pe))
end
end
else
@io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
end
@@ -787,17 +840,30 @@ def reset_timer
end
end

class PathWithInode
def initialize(path, inode)
@path = path
@inode = inode
end

attr_accessor :path, :inode
end

class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff

def initialize(file, map, last_pos)
def initialize(file, map, last_pos, follow_inodes)
@file = file
@map = map
@last_pos = last_pos
@follow_inodes = follow_inodes
end

def [](path)
if m = @map[path]
def [](path, inode)
if @follow_inodes && m = @map[inode]
return m
end
if !@follow_inodes && m = @map[path]
return m
end

@@ -808,28 +874,37 @@ def [](path)
@file.write "0000000000000000\t0000000000000000\n"
@last_pos = @file.pos

@map[path] = FilePositionEntry.new(@file, seek)
if @follow_inodes
@map[inode] = FilePositionEntry.new(@file, seek)
else
@map[path] = FilePositionEntry.new(@file, seek)
end
end

def self.parse(file)
compact(file)
def self.parse(file, follow_inodes, existing_paths)
compact(file, follow_inodes, existing_paths)

map = {}
file.pos = 0
file.each_line {|line|
m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
next unless m
path = m[1]
ino = m[3].to_i(16)
seek = file.pos - line.bytesize + path.bytesize + 1
map[path] = FilePositionEntry.new(file, seek)
if follow_inodes
map[ino] = FilePositionEntry.new(file, seek)
else
map[path] = FilePositionEntry.new(file, seek)
end
}
new(file, map, file.pos)
new(file, map, file.pos, follow_inodes)
end

# Clean up unwatched file entries
def self.compact(file)
def self.compact(file, follow_inodes, existing_paths)
file.pos = 0
existent_entries = file.each_line.map { |line|
existent_entries = file.each_line.map {|line|
m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
next unless m
path = m[1]
@@ -839,10 +914,21 @@ def self.compact(file)
pos == UNWATCHED_POSITION ? nil : ("%s\t%016x\t%016x\n" % [path, pos, ino])
}.compact

existent_entries = remove_deleted_files_entries(existent_entries, existing_paths) if follow_inodes

file.pos = 0
file.truncate(0)
file.write(existent_entries.join)
end

def self.remove_deleted_files_entries(existent_entries, existing_paths)
filtered_entries = existent_entries.select {|file_entry|
m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(file_entry)
ino = m[3].to_i(16)
existing_paths.key?(ino)
}
filtered_entries
end
end
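For readability, a small standalone sketch of the pos_file line format that parse and compact above operate on: path, then position and inode as 16 zero-padded hex digits, tab-separated. The path and numbers are made up.

```ruby
line = "%s\t%016x\t%016x\n" % ['/var/log/app.log', 1024, 523987]
# => "/var/log/app.log\t0000000000000400\t000000000007fed3\n"

m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
path = m[1]           # "/var/log/app.log"
pos  = m[2].to_i(16)  # 1024
ino  = m[3].to_i(16)  # 523987
```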

# pos inode
@@ -910,5 +996,26 @@ def read_inode
@inode
end
end

class PathInodeTuple

Member: I didn't check eql? and other method behaviour of the Struct object, but can't we use a Struct instead of a class in this case?

Author: I've checked eql? of Struct - it works as expected, will change to Struct.

def initialize(path, ino)
@path = path
@ino = ino
end

attr_accessor :path, :ino

def eql?(other)
path == other.path && ino == other.ino
end

def ==(other)
self.eql?(other)
end

def hash
[path, ino].hash
end
end
end
end
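Following the PathInodeTuple review thread above, a hedged sketch of the Struct version the author agreed to switch to; Struct derives ==, eql?, and hash from its members, so the hand-written equality methods become unnecessary. This is an illustration, not the follow-up commit.

```ruby
PathInodeTuple = Struct.new(:path, :ino)

a = PathInodeTuple.new('/var/log/app.log', 523987)
b = PathInodeTuple.new('/var/log/app.log', 523987)

a == b            # => true
a.eql?(b)         # => true
a.hash == b.hash  # => true, so both behave as the same Hash key
```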