Skip to content

Commit

Permalink
parser: Add timeout parameter to avoid parsing stuck. fix #2464
Browse files Browse the repository at this point in the history
Signed-off-by: Masahiro Nakagawa <repeatedly@gmail.com>
  • Loading branch information
repeatedly committed Jul 22, 2019
1 parent 58fd527 commit 491dd08
Showing 1 changed file with 77 additions and 0 deletions.
77 changes: 77 additions & 0 deletions lib/fluent/plugin/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,60 @@
require 'fluent/plugin/base'
require 'fluent/plugin/owned_by_mixin'

require 'fluent/error'
require 'fluent/mixin' # for TypeConverter
require 'fluent/time'
require 'fluent/plugin/string_util'

require 'serverengine/blocking_flag'

module Fluent
module Plugin
class Parser < Base
class TimeoutChecker
# This implementation now uses mutex because parser is typically used in input.
# If this has a performance issue under high concurreny, use concurrent-ruby's map instead.
def initialize(timeout)
@map = {}
@flag = ServerEngine::BlockingFlag.new
@mutex = Mutex.new
@timeout = timeout
end

def start
@thread = ::Thread.new {
until @flag.wait_for_set(0.5)
@mutex.synchronize {
now = Time.now
@map.keys.each { |th|
time = @map[th]
if now - time > @timeout
th.raise UnrecoverableError, "parsing timed out"
@map.delete(th)
end
}
}
end
}
end

def stop
@flag.set!
@thread.join
end

def execute
th = Thread.current
@mutex.synchronize {
@map[th] = Time.now
}
yield
ensure
# Need clean up here becuase if next event is delayed, incorrect exception will be raised in normal exceution flow.
@map.delete(th)
end
end

include OwnedByMixin
include TimeMixin::Parser

Expand All @@ -47,6 +94,7 @@ class ParserError < StandardError; end
config_param :null_empty_string, :bool, default: false
config_param :estimate_current_event, :bool, default: true
config_param :keep_time_key, :bool, default: false
config_param :timeout, :time, default: nil

AVAILABLE_PARSER_VALUE_TYPES = ['string', 'integer', 'float', 'bool', 'time', 'array']

Expand All @@ -65,12 +113,41 @@ def configure(conf)
@null_value_regexp = @null_value_pattern && Regexp.new(@null_value_pattern)
@type_converters = build_type_converters(@types)
@execute_convert_values = @type_converters || @null_value_regexp || @null_empty_string
if @timeout
class << self
alias_method :parse_orig, :parse
alias_method :parse, :parse_with_timeout
end
@timeout_checker = TimeoutChecker.new(@timeout)
end
end

def start
super

@timeout_checker.start if @timeout_checker
end

def stop
super

@timeout_checker.stop if @timeout_checker
end

def parse(text, &block)
raise NotImplementedError, "Implement this method in child class"
end

def parse_with_timeout(text, &block)
@timeout_checker.execute {
parse_orig(text, &block)
}
rescue UnrecoverableError
log.error "parsing timed out with #{self.class}: text = #{text}"
# Return nil instead of raising error. in_tail or other plugin can emit broken line.
yield nil, nil
end

def call(*a, &b)
# Keep backward compatibility for existing plugins
# TODO: warn when deprecated
Expand Down

0 comments on commit 491dd08

Please sign in to comment.