Skip to content

Commit

Permalink
fix indent
Browse files Browse the repository at this point in the history
I separated commits for clarity.

Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
  • Loading branch information
daipom committed Mar 26, 2024
1 parent f4a4591 commit fedb6d4
Showing 1 changed file with 66 additions and 66 deletions.
132 changes: 66 additions & 66 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -768,91 +768,91 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
chunk.mon_enter
modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors}

raise ShouldRetry unless chunk.writable?
staged_chunk_used = true if chunk.staged?
raise ShouldRetry unless chunk.writable?
staged_chunk_used = true if chunk.staged?

original_bytesize = committed_bytesize = chunk.bytesize
begin
while writing_splits_index < splits.size
split = splits[writing_splits_index]
formatted_split = format ? format.call(split) : nil
original_bytesize = committed_bytesize = chunk.bytesize
begin
while writing_splits_index < splits.size
split = splits[writing_splits_index]
formatted_split = format ? format.call(split) : nil

if split.size == 1 # Check BufferChunkOverflowError
if split.size == 1 # Check BufferChunkOverflowError
determined_bytesize = nil
if @compress != :text
determined_bytesize = nil
if @compress != :text
determined_bytesize = nil
elsif formatted_split
determined_bytesize = formatted_split.bytesize
elsif split.first.respond_to?(:bytesize)
determined_bytesize = split.first.bytesize
end

if determined_bytesize && determined_bytesize > @chunk_limit_size
# It is a obvious case that BufferChunkOverflowError should be raised here.
# But if it raises here, already processed 'split' or
# the proceeding 'split' will be lost completely.
# So it is a last resort to delay raising such a exception
errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
end

if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size
# The split will (might) cause size over so keep already processed
# 'split' content here (allow performance regression a bit).
chunk.commit
committed_bytesize = chunk.bytesize
end
elsif formatted_split
determined_bytesize = formatted_split.bytesize
elsif split.first.respond_to?(:bytesize)
determined_bytesize = split.first.bytesize
end

if format
chunk.concat(formatted_split, split.size)
else
chunk.append(split, compress: @compress)
if determined_bytesize && determined_bytesize > @chunk_limit_size
# It is a obvious case that BufferChunkOverflowError should be raised here.
# But if it raises here, already processed 'split' or
# the proceeding 'split' will be lost completely.
# So it is a last resort to delay raising such a exception
errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
end
adding_bytes = chunk.bytesize - committed_bytesize

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
chunk.rollback
if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size
# The split will (might) cause size over so keep already processed
# 'split' content here (allow performance regression a bit).
chunk.commit
committed_bytesize = chunk.bytesize
end
end

if split.size == 1 # Check BufferChunkOverflowError again
if adding_bytes > @chunk_limit_size
errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
else
# As already processed content is kept after rollback, then unstaged chunk should be queued.
# After that, re-process current split again.
# New chunk should be allocated, to do it, modify @stage and so on.
synchronize { @stage.delete(modified_metadata) }
staged_chunk_used = false
chunk.unstaged!
break
end
end
if format
chunk.concat(formatted_split, split.size)
else
chunk.append(split, compress: @compress)
end
adding_bytes = chunk.bytesize - committed_bytesize

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
chunk.rollback
committed_bytesize = chunk.bytesize

if chunk_size_full?(chunk) || split.size == 1
enqueue_chunk_before_retry = true
if split.size == 1 # Check BufferChunkOverflowError again
if adding_bytes > @chunk_limit_size
errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
else
splits_count *= 10
# As already processed content is kept after rollback, then unstaged chunk should be queued.
# After that, re-process current split again.
# New chunk should be allocated, to do it, modify @stage and so on.
synchronize { @stage.delete(modified_metadata) }
staged_chunk_used = false
chunk.unstaged!
break
end
end

raise ShouldRetry
if chunk_size_full?(chunk) || split.size == 1
enqueue_chunk_before_retry = true
else
splits_count *= 10
end

writing_splits_index += 1
raise ShouldRetry
end

if chunk_size_full?(chunk)
break
end
writing_splits_index += 1

if chunk_size_full?(chunk)
break
end
rescue
chunk.purge if chunk.unstaged? # unstaged chunk will leak unless purge it
raise
end
rescue
chunk.purge if chunk.unstaged? # unstaged chunk will leak unless purge it
raise
end

modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
end
modified_chunks.each do |data|
block.call(data[:chunk], data[:adding_bytesize], data[:errors])
Expand Down

0 comments on commit fedb6d4

Please sign in to comment.