Commit

buffer: Avoid processing discarded chunks in write_step_by_step (for v1.16) (#4363)

It fixes the following error, which appears when many `chunk bytes limit exceeds` errors occur:
```
2020-07-28 14:59:26 +0000 [warn]: #0 emit transaction failed: error_class=IOError error="closed stream" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'" tag="cafiscode-eks-cluster.default"
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `rollback'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:339:in `rescue in block in write'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:332:in `block in write'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:331:in `each'
  ...
```
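The root cause is that `write_step_by_step` handed each chunk to the caller's block as soon as that chunk's splits were written. If a later split raised `ShouldRetry`, the rescue handler rolled back and purged the modified chunks, but the caller (`Buffer#write`) had already collected some of them and could later try to commit or roll back the now-closed chunks, producing the `closed stream` IOError above. With this change, per-chunk state (`chunk`, `adding_bytesize`, `errors`) is accumulated in `modified_chunks` and the block is only called after every split has been written, so a chunk discarded during a retry is never returned to the caller. Below is a minimal, self-contained sketch of that pattern; `Chunk`, `ShouldRetry`, and the helper names are simplified stand-ins, not fluentd's real classes or signatures.

```ruby
# Sketch of the deferred-callback pattern introduced by this commit.
class ShouldRetry < StandardError; end

class Chunk
  attr_reader :bytesize

  def initialize
    @bytesize = 0
    @closed = false
  end

  def append(data)
    @bytesize += data.bytesize
  end

  def unstaged?
    true
  end

  def rollback
    # Mirrors FileChunk#rollback failing on a purged (closed) chunk.
    raise IOError, "closed stream" if @closed
    @bytesize = 0
  end

  def purge
    @closed = true
  end
end

def write_step_by_step_sketch(splits, &block)
  # Each entry keeps the chunk together with its own adding_bytesize and errors.
  modified_chunks = []
  splits.each do |split|
    chunk  = Chunk.new
    errors = []
    entry  = { chunk: chunk, adding_bytesize: 0, errors: errors }
    modified_chunks << entry

    before = chunk.bytesize
    raise ShouldRetry if split == :too_big  # simplified trigger for the retry path
    chunk.append(split)
    entry[:adding_bytesize] = chunk.bytesize - before
  end

  # Only after every split succeeded are the chunks handed to the caller, so a
  # chunk purged in the rescue below is never committed or rolled back later.
  modified_chunks.each do |entry|
    block.call(entry[:chunk], entry[:adding_bytesize], entry[:errors])
  end
rescue ShouldRetry
  modified_chunks.each do |entry|
    chunk = entry[:chunk]
    chunk.rollback rescue nil
    chunk.purge rescue nil if chunk.unstaged?
  end
  # (the real method then retries the whole write with more splits)
end

# Usage: the block only ever sees chunks that survived the write.
committed = []
write_step_by_step_sketch(["a" * 10, "b" * 10]) do |chunk, bytesize, _errors|
  committed << [chunk, bytesize]
end
committed.size  # => 2; with a :too_big split in the input, the block is never called
```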

Fix #3089

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
ashie committed Dec 14, 2023
1 parent d3cf2e0 commit e2d7885
Showing 2 changed files with 70 additions and 17 deletions.
36 changes: 19 additions & 17 deletions lib/fluent/plugin/buffer.rb
@@ -728,7 +728,6 @@ def write_once(metadata, data, format: nil, size: nil, &block)

def write_step_by_step(metadata, data, format, splits_count, &block)
splits = []
errors = []
if splits_count > data.size
splits_count = data.size
end
@@ -749,23 +748,23 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
modified_chunks = []
modified_metadata = metadata
get_next_chunk = ->(){
c = if staged_chunk_used
# Staging new chunk here is bad idea:
# Recovering whole state including newly staged chunks is much harder than current implementation.
modified_metadata = modified_metadata.dup_next
generate_chunk(modified_metadata)
else
synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! }
end
modified_chunks << c
c
if staged_chunk_used
# Staging new chunk here is bad idea:
# Recovering whole state including newly staged chunks is much harder than current implementation.
modified_metadata = modified_metadata.dup_next
generate_chunk(modified_metadata)
else
synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! }
end
}

writing_splits_index = 0
enqueue_chunk_before_retry = false

while writing_splits_index < splits.size
chunk = get_next_chunk.call
errors = []
modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors}
chunk.synchronize do
raise ShouldRetry unless chunk.writable?
staged_chunk_used = true if chunk.staged?
@@ -851,15 +850,18 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
raise
end

block.call(chunk, chunk.bytesize - original_bytesize, errors)
errors = []
modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
end
end
modified_chunks.each do |data|
block.call(data[:chunk], data[:adding_bytesize], data[:errors])
end
rescue ShouldRetry
modified_chunks.each do |mc|
mc.rollback rescue nil
if mc.unstaged?
mc.purge rescue nil
modified_chunks.each do |data|
chunk = data[:chunk]
chunk.rollback rescue nil
if chunk.unstaged?
chunk.purge rescue nil
end
end
enqueue_chunk(metadata) if enqueue_chunk_before_retry
51 changes: 51 additions & 0 deletions test/plugin/test_buffer.rb
@@ -850,6 +850,57 @@ def create_chunk_es(metadata, es)
test '#compress returns :text' do
assert_equal :text, @p.compress
end

# https://github.com/fluent/fluentd/issues/3089
test "closed chunk should not be committed" do
assert_equal 8 * 1024 * 1024, @p.chunk_limit_size
assert_equal 0.95, @p.chunk_full_threshold

purge_count = 0

stub.proxy(@p).generate_chunk(anything) do |chunk|
stub.proxy(chunk).purge do |result|
purge_count += 1
result
end
stub.proxy(chunk).commit do |result|
assert_false(chunk.closed?)
result
end
stub.proxy(chunk).rollback do |result|
assert_false(chunk.closed?)
result
end
chunk
end

m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
small_row = "x" * 1024 * 400
big_row = "x" * 1024 * 1024 * 8 # just `chunk_size_limit`, it does't cause BufferOverFlowError.

# Write 42 events in 1 event stream; the last one triggers `ShouldRetry`
@p.write({m => [small_row] * 40 + [big_row] + ["x"]})

# The above event stream will be split twice by `Buffer#write_step_by_step`
#
# 1. `write_once`: 42 [events] * 1 [stream]
# 2. `write_step_by_step`: 4 [events] * 10 [streams] + 2 [events] * 1 [stream]
# 3. `write_step_by_step` (by `ShouldRetry`): 1 [event] * 42 [streams]
#
# The problematic data is built in the 2nd stage.
# In the 2nd stage, 5 streams are packed in a chunk.
# ((1024 * 400) [bytes] * 4 [events] * 5 [streams] = 8192000 [bytes] < `chunk_limit_size` (8MB)).
# So 3 chunks are used to store all data.
# The 1st chunk is already staged by `write_once`.
# The 2nd & 3rd chunks are newly created as unstaged.
# The 3rd chunk is purged before `ShouldRetry`, so it's not a problem:
# https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L850
# The 2nd chunk is purged in `rescue ShouldRetry`:
# https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L862
# It causes the issue described in https://github.com/fluent/fluentd/issues/3089#issuecomment-1811839198

assert_equal 2, purge_count
end
end

sub_test_case 'standard format with configuration for test with lower chunk limit size' do
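For reference, the stubs in the new test use rr's proxy mode (available in fluentd's test suite through test-unit-rr): the original method still runs, the block receives its return value, and whatever the block returns is passed back to the caller. That is how the test counts `purge` calls and asserts `commit`/`rollback` are never invoked on a closed chunk. A minimal illustration of the pattern, assuming the `test-unit` and `test-unit-rr` gems (the `Counter` class and names below are purely illustrative):

```ruby
require 'test/unit'
require 'test/unit/rr'

class ProxyStubExample < Test::Unit::TestCase
  class Counter
    def initialize
      @count = 0
    end

    def bump
      @count += 1
    end
  end

  test "proxy stubs observe calls without replacing behavior" do
    counter = Counter.new
    observed = 0
    # The real #bump still executes; the block sees its return value and
    # whatever the block returns is what the caller receives.
    stub.proxy(counter).bump do |result|
      observed += 1
      result
    end

    assert_equal 1, counter.bump
    assert_equal 2, counter.bump
    assert_equal 2, observed
  end
end
```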
