
Don't raise an exception when each message size is small enough
Follow-up to #3553

Previously, even when each record was smaller than the chunk limit
size, BufferChunkOverflowError was raised unexpectedly if the total
message size exceeded the chunk limit size.

For example, with a chunk limit size of 1_280_000, processing a stream
of 3 events (1_000_000 bytes each) threw an exception like this:

  Fluent::Plugin::Buffer::BufferChunkOverflowError(<a 1000025 bytes
  record (nth: 1) is larger than buffer chunk limit size, a 1000025
  bytes record (nth: 2) is larger than buffer chunk limit size>)

Now, no exception is raised as long as each record is small enough
relative to the chunk limit size; each message is stored into a
separate chunk instead.

The idea: if the number of bytes being added is smaller than the chunk
limit size, the chunk should be unstaged and pushed into the queue so
the split can be retried with a fresh chunk. Otherwise, the split is
skipped, as in #3553.
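
In outline (a minimal sketch using names from the diff below; the
surrounding control flow is simplified):

  chunk.rollback
  if adding_bytes < @chunk_limit_size
    # This split fits a chunk on its own; only the current chunk is too
    # full. Unstage the chunk, push it into the queue, and retry the
    # split against a freshly allocated chunk.
    chunk.unstaged!
  else
    # The split alone can never fit a chunk: record an error and skip
    # it, as in #3553.
    errors << "record ... is larger than buffer chunk limit size"
  end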

Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
kenhys committed Dec 17, 2021
1 parent 30132e1 commit cca4f44
Showing 2 changed files with 104 additions and 11 deletions.
38 changes: 27 additions & 11 deletions lib/fluent/plugin/buffer.rb
@@ -767,7 +767,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
         raise ShouldRetry unless chunk.writable?
         staged_chunk_used = true if chunk.staged?
 
-        original_bytesize = chunk.bytesize
+        original_bytesize = committed_bytesize = chunk.bytesize
         begin
           while writing_splits_index < splits.size
             split = splits[writing_splits_index]
@@ -778,12 +778,18 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
                 # so, keep already processed 'split' content here.
                 # (allow performance regression a bit)
                 chunk.commit
+                committed_bytesize = chunk.bytesize
               else
                 big_record_size = formatted_split.bytesize
-                if chunk.bytesize + big_record_size > @chunk_limit_size
+                if big_record_size > @chunk_limit_size
+                  # Just skip to next split (current split is ignored)
                   errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
                   writing_splits_index += 1
                   next
+                elsif chunk.bytesize + big_record_size > @chunk_limit_size
+                  # No doubt that the split is expected to cause size over, keep 'split' content here.
+                  chunk.commit
+                  committed_bytesize = chunk.bytesize
                 end
               end
             end
@@ -793,19 +799,29 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
             else
               chunk.append(split, compress: @compress)
             end
+            adding_bytes = chunk.bytesize - committed_bytesize
 
             if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
-              adding_bytes = chunk.instance_eval { @adding_bytes } || "N/A" # 3rd party might not have 'adding_bytes'
               chunk.rollback
-
+              committed_bytesize = chunk.bytesize
               if split.size == 1 && original_bytesize == 0
-                # It is obviously case that BufferChunkOverflowError should be raised here,
-                # but if it raises here, already processed 'split' or
-                # the proceeding 'split' will be lost completely.
-                # so it is a last resort to delay raising such a exception
-                errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                writing_splits_index += 1
-                next
+                if adding_bytes < @chunk_limit_size
+                  # As already processed content is kept after rollback, then unstaged chunk should be queued.
+                  # After that, re-process current split again.
+                  # New chunk should be allocated, to do it, modify @stage and so on.
+                  synchronize { @stage.delete(modified_metadata) }
+                  staged_chunk_used = false
+                  chunk.unstaged!
+                  break
+                else
+                  # It is obviously case that BufferChunkOverflowError should be raised here,
+                  # but if it raises here, already processed 'split' or
+                  # the proceeding 'split' will be lost completely.
+                  # so it is a last resort to delay raising such a exception
+                  errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                  writing_splits_index += 1
+                  next
+                end
              end
 
              if chunk_size_full?(chunk) || split.size == 1
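
To make the arithmetic concrete, here is a standalone simulation (a
hypothetical sketch, not fluentd code) of how three ~1 MB formatted
records are packed under a 1_280_000-byte chunk limit:

  # Simulates the new packing rule: a record that fits the limit but not
  # the current chunk forces the chunk to be queued, then retries in a new one.
  chunk_limit_size = 1_280_000
  record_sizes = [1_000_025, 1_000_025, 1_000_025] # formatted sizes incl. overhead
  chunks = [[]]
  oversized = []
  record_sizes.each do |size|
    if size > chunk_limit_size
      oversized << size # can never fit: reported via BufferChunkOverflowError
    elsif chunks.last.sum + size > chunk_limit_size
      chunks << [size]  # current chunk is queued; retry the split in a fresh chunk
    else
      chunks.last << size
    end
  end
  chunks.each_with_index { |c, i| puts "chunk #{i}: #{c.sum} bytes, #{c.length} records" }
  puts "oversized: #{oversized.inspect}" unless oversized.empty?
  # => chunk 0: 1000025 bytes, 1 records
  #    chunk 1: 1000025 bytes, 1 records
  #    chunk 2: 1000025 bytes, 1 records

Each record lands in its own chunk, so no BufferChunkOverflowError is
raised; before this change, the second and third records were reported
as overflow because they were compared against an already part-filled
chunk.
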
77 changes: 77 additions & 0 deletions test/plugin/test_buffer.rb
@@ -990,6 +990,51 @@ def create_chunk_es(metadata, es)
       assert_equal [@dm0], @p.queue.map(&:metadata)
       assert_equal [5000], @p.queue.map(&:size)
     end
+
+    test "confirm that every message which is smaller than chunk threshold does not raise BufferChunkOverflowError" do
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+      timestamp = event_time('2016-04-11 16:00:02 +0000')
+      es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "a" * 1_000_000}],
+                                         [timestamp, {"message" => "b" * 1_000_000}],
+                                         [timestamp, {"message" => "c" * 1_000_000}]])
+
+      # https://github.com/fluent/fluentd/issues/1849
+      # Even though 1_000_000 < 1_280_000 (chunk_limit_size), it raised BufferChunkOverflowError before.
+      # It should not be raised and message a,b,c should be stored into 3 chunks.
+      assert_nothing_raised do
+        @p.write({@dm0 => es}, format: @format)
+      end
+      messages = []
+      # pick up first letter to check whether chunk is queued in expected order
+      3.times do |index|
+        chunk = @p.queue[index]
+        es = Fluent::MessagePackEventStream.new(chunk.chunk)
+        es.ensure_unpacked!
+        records = es.instance_eval{ @unpacked_records }
+        records.each do |record|
+          messages << record["message"][0]
+        end
+      end
+      es = Fluent::MessagePackEventStream.new(@p.stage[@dm0].chunk)
+      es.ensure_unpacked!
+      staged_message = es.instance_eval{ @unpacked_records }.first["message"]
+      # message a and b are queued, message c is staged
+      assert_equal([
+                     [@dm0],
+                     "c" * 1_000_000,
+                     [@dm0, @dm0, @dm0],
+                     [5000, 1, 1],
+                     [["x"] * 5000, "a", "b"].flatten
+                   ],
+                   [
+                     @p.stage.keys,
+                     staged_message,
+                     @p.queue.map(&:metadata),
+                     @p.queue.map(&:size),
+                     messages
+                   ])
+    end
   end
 
   sub_test_case 'custom format with configuration for test with lower chunk limit size' do
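
The instance_eval peek above reads MessagePackEventStream's internal
@unpacked_records directly; the same check could go through the public
each interface instead (a sketch, assuming the standard EventStream
contract of yielding time/record pairs):

  messages = []
  @p.queue.first(3).each do |chunk|
    es = Fluent::MessagePackEventStream.new(chunk.chunk)
    es.each do |_time, record|
      messages << record["message"][0] # first letter, as in the test above
    end
  end
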
@@ -1078,6 +1123,38 @@ def create_chunk_es(metadata, es)
         @p.write({@dm0 => es})
       end
     end
+
+    test 'confirm that every array message which is smaller than chunk threshold does not raise BufferChunkOverflowError' do
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+
+      assert_equal 1_280_000, @p.chunk_limit_size
+
+      es = ["a" * 1_000_000, "b" * 1_000_000, "c" * 1_000_000]
+      assert_nothing_raised do
+        @p.write({@dm0 => es})
+      end
+      queue_messages = @p.queue.collect do |chunk|
+        # collect first character of each message
+        chunk.chunk[0]
+      end
+      assert_equal([
+                     [@dm0],
+                     1,
+                     "c",
+                     [@dm0, @dm0, @dm0],
+                     [5000, 1, 1],
+                     ["x", "a", "b"]
+                   ],
+                   [
+                     @p.stage.keys,
+                     @p.stage[@dm0].size,
+                     @p.stage[@dm0].chunk[0],
+                     @p.queue.map(&:metadata),
+                     @p.queue.map(&:size),
+                     queue_messages
+                   ])
+    end
   end
 
   sub_test_case 'with configuration for test with lower limits' do
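
For contrast with both tests above, a record that exceeds
chunk_limit_size on its own is still rejected; a sketch in the same
style as the existing tests (per the delayed-raise behavior from
#3553):

  es = ["a" * 2_000_000] # larger than the 1_280_000 limit by itself
  assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError do
    @p.write({@dm0 => es})
  end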
