diff --git a/lib/fluent/logger/fluent_logger.rb b/lib/fluent/logger/fluent_logger.rb index d9e436a..b998277 100644 --- a/lib/fluent/logger/fluent_logger.rb +++ b/lib/fluent/logger/fluent_logger.rb @@ -81,6 +81,7 @@ def initialize(tag_prefix, *args) @connect_error_history = [] @limit = options[:buffer_limit] || BUFFER_LIMIT + @log_reconnect_error_threshold = options[:log_reconnect_error_threshold] || RECONNECT_WAIT_MAX_COUNT if logger = options[:logger] @logger = logger @@ -101,7 +102,7 @@ def initialize(tag_prefix, *args) end end - attr_accessor :limit, :logger + attr_accessor :limit, :logger, :log_reconnect_error_threshold def post_with_time(tag, map, time) @logger.debug { "event: #{tag} #{map.to_json}" rescue nil } @@ -142,6 +143,14 @@ def to_msgpack(msg) end end + def suppress_sec + if (sz = @connect_error_history.size) < RECONNECT_WAIT_MAX_COUNT + RECONNECT_WAIT * (RECONNECT_WAIT_INCR_RATE ** (sz - 1)) + else + RECONNECT_WAIT_MAX + end + end + def write(msg) begin data = to_msgpack(msg) @@ -159,11 +168,6 @@ def write(msg) # suppress reconnection burst if !@connect_error_history.empty? && @pending.bytesize <= @limit - if (sz = @connect_error_history.size) < RECONNECT_WAIT_MAX_COUNT - suppress_sec = RECONNECT_WAIT * (RECONNECT_WAIT_INCR_RATE ** (sz - 1)) - else - suppress_sec = RECONNECT_WAIT_MAX - end if Time.now.to_i - @connect_error_history.last < suppress_sec return false end @@ -210,13 +214,24 @@ def connect! @con = TCPSocket.new(@host, @port) @con.sync = true @connect_error_history.clear + @logged_reconnect_error = false rescue @connect_error_history << Time.now.to_i if @connect_error_history.size > RECONNECT_WAIT_MAX_COUNT @connect_error_history.shift end + + if @connect_error_history.size >= @log_reconnect_error_threshold && !@logged_reconnect_error + log_reconnect_error + @logged_reconnect_error = true + end + raise end + + def log_reconnect_error + @logger.error("FluentLogger: Can't connect to #{@host}:#{@port}(#{@connect_error_history.size} retried): #{$!}") + end end diff --git a/spec/fluent_logger_spec.rb b/spec/fluent_logger_spec.rb index 8ee5162..4a8143d 100644 --- a/spec/fluent_logger_spec.rb +++ b/spec/fluent_logger_spec.rb @@ -225,6 +225,19 @@ def wait_transfer logger_io.rewind logger_io.read.should =~ /Can't send logs to/ end + + it ('log connect error once') do + logger.stub(:suppress_sec).and_return(-1) + logger.log_reconnect_error_threshold = 1 + logger.should_receive(:log_reconnect_error).once.and_call_original + + logger.post('tag', {'a' => 'b'}) + wait_transfer # even if wait + logger.post('tag', {'a' => 'b'}) + wait_transfer # even if wait + logger_io.rewind + logger_io.read.should =~ /Can't connect to/ + end end end