Skip to content

Commit

Permalink
Make sure log events generated at shutdown phase of plugins (except
Browse files Browse the repository at this point in the history
corresponding filter and output) are emitted
  • Loading branch information
Yuki Ito committed Jul 12, 2017
1 parent 004bae1 commit 8071cfd
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 7 deletions.
52 changes: 50 additions & 2 deletions lib/fluent/agent.rb
Expand Up @@ -38,6 +38,11 @@ def initialize(opts = {})
@started_outputs = []
@started_filters = []

@outputs_for_log_event = []
@filters_for_log_event = []
@started_outputs_for_log_event = []
@started_filters_for_log_event = []

@log = Engine.log
@event_router = EventRouter.new(NoMatchMatch.new(log), self)
@error_collector = nil
Expand Down Expand Up @@ -70,16 +75,47 @@ def start
@outputs.each { |o|
o.start
@started_outputs << o
@started_outputs_for_log_event << o if @outputs_for_log_event.include?(o)
}

@filters.each { |f|
f.start
@started_filters << f
@started_filters_for_log_event << f if @filters_for_log_event.include?(f)
}
end

def shutdown
@started_filters.map { |f|
(@started_filters - @started_filters_for_log_event).map { |f|
Thread.new do
begin
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id
f.shutdown
rescue => e
log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error_class: e.class, error: e
log.warn_backtrace
end
end
}.each { |t| t.join }

# Output plugin as filter emits records at shutdown so emit problem still exist.
# This problem will be resolved after actual filter mechanizm.
(@started_outputs - @started_outputs_for_log_event).map { |o|
Thread.new do
begin
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id
o.shutdown
rescue => e
log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error_class: e.class, error: e
log.warn_backtrace
end
end
}.each { |t| t.join }

## execute callback from Engine to flush log event queue before shutting down corresponding filters and outputs
yield if block_given?

@started_filters_for_log_event.map { |f|
Thread.new do
begin
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id
Expand All @@ -93,7 +129,7 @@ def shutdown

# Output plugin as filter emits records at shutdown so emit problem still exist.
# This problem will be resolved after actual filter mechanizm.
@started_outputs.map { |o|
@started_outputs_for_log_event.map { |o|
Thread.new do
begin
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id
Expand Down Expand Up @@ -134,6 +170,10 @@ def add_match(type, pattern, conf)
@outputs << output
@event_router.add_rule(pattern, output)

if match_event_log_tag?(pattern)
@outputs_for_log_event << output
end

output
end

Expand All @@ -146,9 +186,17 @@ def add_filter(type, pattern, conf)
@filters << filter
@event_router.add_rule(pattern, filter)

if match_event_log_tag?(pattern)
@filters_for_log_event << filter
end

filter
end

def match_event_log_tag?(pattern)
EventRouter::Rule.new(pattern, nil).match?($log.tag)
end

# For handling invalid record
def emit_error_event(tag, time, record, error)
end
Expand Down
13 changes: 8 additions & 5 deletions lib/fluent/engine.rb
Expand Up @@ -210,10 +210,13 @@ def run
ensure
$log.info "shutting down fluentd"
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
shutdown do
@log_event_loop_stop = true
@log_emit_thread.join
end
else
shutdown
end
shutdown
end
end

Expand All @@ -237,8 +240,8 @@ def start
@root_agent.start
end

def shutdown
@root_agent.shutdown
def shutdown(&block)
@root_agent.shutdown(&block)
end
end

Expand Down

0 comments on commit 8071cfd

Please sign in to comment.