Skip to content

Commit

Permalink
Modify TimeSlicedOutput#emit mechanizm to fit BufferedOutput#emit way
Browse files Browse the repository at this point in the history
Current TimeSlicedOutput#emit emits records to buffer one by one.
It causes inconsistent state when queue has a problem.
For example, some records are stored into buffers and
raises BufferQueueLimitError. In this situation,
logs are duplicated if input plugin retries.
BufferedOutput and ObjectBufferedOutput uses another way.
These outputs format all records into data chunk first.
It is more safer than current TimeSlicedOutput way.
  • Loading branch information
repeatedly committed Jul 16, 2015
1 parent 29261f2 commit 8798b63
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion lib/fluent/output.rb
Expand Up @@ -539,6 +539,7 @@ def configure(conf)

def emit(tag, es, chain)
@emit_count += 1
formatted_data = {}
es.each {|time,record|
tc = time / @time_slice_cache_interval
if @before_tc == tc
Expand All @@ -548,7 +549,10 @@ def emit(tag, es, chain)
key = @time_slicer.call(time)
@before_key = key
end
data = format(tag, time, record)
formatted_data[key] ||= ''
formatted_data[key] << format(tag, time, record)
}
formatted_data.each { |key, data|
if @buffer.emit(key, data, chain)
submit_flush
end
Expand Down

0 comments on commit 8798b63

Please sign in to comment.