Don't raise an exception when each message size is small enough
Follow-up to #3553

In previous versions, BufferChunkOverflowError was raised unexpectedly
when the whole message size exceeded the chunk limit size, even though
each individual record was smaller than the limit.

For example, with a chunk limit size of 1_280_000, processing an event
stream of 3 records (1_000_000 bytes each) threw an exception like this:

  Fluent::Plugin::Buffer::BufferChunkOverflowError(<a 1000025 bytes
  record (nth: 1) is larger than buffer chunk limit size, a 1000025
  bytes record (nth: 2) is larger than buffer chunk limit size>)

Now no exception is raised as long as each record is sufficiently
smaller than the chunk limit size; instead, each message is stored
into a separate chunk.
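
As a rough illustration of the intended behaviour (starting from an
empty chunk and using the sizes from the example above; this is not
the actual buffer code, just the expected outcome):

  chunk_limit_size = 1_280_000
  records = ["a" * 1_000_000, "b" * 1_000_000, "c" * 1_000_000]
  # "a" (1_000_000 bytes) fits into the first chunk.
  # Appending "b" would grow that chunk past the limit, so the chunk
  # holding "a" is queued and "b" starts a new chunk; the same happens
  # for "c". Result: the chunks holding "a" and "b" end up queued and
  # the chunk holding "c" stays staged.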

The idea: if the byte size being added is smaller than the chunk limit
size, the chunk should be unstaged and pushed into the queue;
otherwise, the record should be skipped as in #3553.

NOTE: This approach depends on the chunk implementing the adding_bytes
interface.
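
As a minimal sketch of that decision (simplified from
write_step_by_step; variable names follow the diff below, but this is
illustrative, not the exact implementation), where adding_bytes is the
size of the split that was just rolled back:

  if adding_bytes < @chunk_limit_size
    # The record itself fits; the staged chunk is simply full.
    # Queue the current chunk and retry the record in a new chunk.
    synchronize { @stage.delete(modified_metadata) }
    chunk.unstaged!
  else
    # The single record really is larger than the limit: defer raising
    # BufferChunkOverflowError so already-processed splits are not lost.
    errors << "a #{adding_bytes} bytes record is larger than buffer chunk limit size (#{@chunk_limit_size})"
  end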

Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
kenhys committed Dec 7, 2021
1 parent 30132e1 commit 0c57f88
Showing 2 changed files with 99 additions and 8 deletions.
30 changes: 22 additions & 8 deletions lib/fluent/plugin/buffer.rb
@@ -780,10 +780,14 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
chunk.commit
else
big_record_size = formatted_split.bytesize
if chunk.bytesize + big_record_size > @chunk_limit_size
if big_record_size > @chunk_limit_size
# Just skip to next split (current split is ignored)
errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
elsif chunk.bytesize + big_record_size > @chunk_limit_size
# No doubt that the split is expected to cause size over, keep 'split' content here.
chunk.commit
end
end
end
@@ -799,13 +803,23 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
chunk.rollback

if split.size == 1 && original_bytesize == 0
# It is obviously case that BufferChunkOverflowError should be raised here,
# but if it raises here, already processed 'split' or
# the proceeding 'split' will be lost completely.
# so it is a last resort to delay raising such a exception
errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
# FIXME: adding_bytes should be implemented as an official chunk API in the future
if adding_bytes.is_a?(Integer) && adding_bytes < @chunk_limit_size
# already processed content is kept after rollback, then unstaged chunk should be queued.
# After that, re-process current split again (new chunk should be allocated, to do it, modify @stage and so on)
synchronize { @stage.delete(modified_metadata) }
staged_chunk_used = false
chunk.unstaged!
break
else
# It is obviously case that BufferChunkOverflowError should be raised here,
# but if it raises here, already processed 'split' or
# the proceeding 'split' will be lost completely.
# so it is a last resort to delay raising such a exception
errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
end
end

if chunk_size_full?(chunk) || split.size == 1
77 changes: 77 additions & 0 deletions test/plugin/test_buffer.rb
@@ -990,6 +990,51 @@ def create_chunk_es(metadata, es)
assert_equal [@dm0], @p.queue.map(&:metadata)
assert_equal [5000], @p.queue.map(&:size)
end

test "confirm that every message which is smaller than chunk threshold does not raise BufferChunkOverflowError" do
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)
timestamp = event_time('2016-04-11 16:00:02 +0000')
es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "a" * 1_000_000}],
[timestamp, {"message" => "b" * 1_000_000}],
[timestamp, {"message" => "c" * 1_000_000}]])

# https://github.com/fluent/fluentd/issues/1849
# Even though 1_000_000 < 1_280_000 (chunk_limit_size), it raised BufferChunkOverflowError before.
# It should not be raised and message a,b,c should be stored into 3 chunks.
assert_nothing_raised do
@p.write({@dm0 => es}, format: @format)
end
messages = []
# pick up first letter to check whether chunk is queued in expected order
3.times do |index|
chunk = @p.queue[index]
es = Fluent::MessagePackEventStream.new(chunk.chunk)
es.ensure_unpacked!
records = es.instance_eval{ @unpacked_records }
records.each do |record|
messages << record["message"][0]
end
end
es = Fluent::MessagePackEventStream.new(@p.stage[@dm0].chunk)
es.ensure_unpacked!
staged_message = es.instance_eval{ @unpacked_records }.first["message"]
# message a and b are queued, message c is staged
assert_equal([
[@dm0],
"c" * 1_000_000,
[@dm0, @dm0, @dm0],
[5000, 1, 1],
[["x"] * 5000, "a", "b"].flatten
],
[
@p.stage.keys,
staged_message,
@p.queue.map(&:metadata),
@p.queue.map(&:size),
messages
])
end
end

sub_test_case 'custom format with configuration for test with lower chunk limit size' do
@@ -1078,6 +1123,38 @@ def create_chunk_es(metadata, es)
@p.write({@dm0 => es})
end
end

test 'confirm that every array message which is smaller than chunk threshold does not raise BufferChunkOverflowError' do
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)

assert_equal 1_280_000, @p.chunk_limit_size

es = ["a" * 1_000_000, "b" * 1_000_000, "c" * 1_000_000]
assert_nothing_raised do
@p.write({@dm0 => es})
end
queue_messages = @p.queue.collect do |chunk|
# collect first character of each message
chunk.chunk[0]
end
assert_equal([
[@dm0],
1,
"c",
[@dm0, @dm0, @dm0],
[5000, 1, 1],
["x", "a", "b"]
],
[
@p.stage.keys,
@p.stage[@dm0].size,
@p.stage[@dm0].chunk[0],
@p.queue.map(&:metadata),
@p.queue.map(&:size),
queue_messages
])
end
end

sub_test_case 'with configuration for test with lower limits' do
