From 13a5199522ee1fc3f55c40a73d30c6036d34570f Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 27 Mar 2024 13:17:45 +0900 Subject: [PATCH] buffer: fix emit error of race condition (#4447) After 95438b2eeb4bb5586a4031c1fbc35756f3c12565 (#4342), there is a section where chunks do not have a lock in `write_step_by_step()`. `write_step_by_step()` must ensure their locks until passing them to the block. Otherwise, race condition can occur and it can cause emit error by IOError. Example of warning messages of emit error: [warn]: #0 emit transaction failed: error_class=IOError error="closed stream" location=... [warn]: #0 send an error event stream to @ERROR: error_class=IOError error="closed stream" location=... Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/buffer.rb | 143 +++++++++++++++++++----------------- test/plugin/test_buffer.rb | 59 +++++++++++++++ 2 files changed, 134 insertions(+), 68 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 0251de3409..80709c12bb 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -764,94 +764,95 @@ def write_step_by_step(metadata, data, format, splits_count, &block) while writing_splits_index < splits.size chunk = get_next_chunk.call errors = [] + # The chunk must be locked until being passed to &block. + chunk.mon_enter modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors} - chunk.synchronize do - raise ShouldRetry unless chunk.writable? - staged_chunk_used = true if chunk.staged? - - original_bytesize = committed_bytesize = chunk.bytesize - begin - while writing_splits_index < splits.size - split = splits[writing_splits_index] - formatted_split = format ? 
format.call(split) : nil - if split.size == 1 # Check BufferChunkOverflowError - determined_bytesize = nil - if @compress != :text - determined_bytesize = nil - elsif formatted_split - determined_bytesize = formatted_split.bytesize - elsif split.first.respond_to?(:bytesize) - determined_bytesize = split.first.bytesize - end + raise ShouldRetry unless chunk.writable? + staged_chunk_used = true if chunk.staged? - if determined_bytesize && determined_bytesize > @chunk_limit_size - # It is a obvious case that BufferChunkOverflowError should be raised here. - # But if it raises here, already processed 'split' or - # the proceeding 'split' will be lost completely. - # So it is a last resort to delay raising such a exception - errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})" - writing_splits_index += 1 - next - end + original_bytesize = committed_bytesize = chunk.bytesize + begin + while writing_splits_index < splits.size + split = splits[writing_splits_index] + formatted_split = format ? format.call(split) : nil - if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size - # The split will (might) cause size over so keep already processed - # 'split' content here (allow performance regression a bit). - chunk.commit - committed_bytesize = chunk.bytesize - end + if split.size == 1 # Check BufferChunkOverflowError + determined_bytesize = nil + if @compress != :text + determined_bytesize = nil + elsif formatted_split + determined_bytesize = formatted_split.bytesize + elsif split.first.respond_to?(:bytesize) + determined_bytesize = split.first.bytesize end - if format - chunk.concat(formatted_split, split.size) - else - chunk.append(split, compress: @compress) + if determined_bytesize && determined_bytesize > @chunk_limit_size + # It is a obvious case that BufferChunkOverflowError should be raised here. 
+ # But if it raises here, already processed 'split' or + # the proceeding 'split' will be lost completely. + # So it is a last resort to delay raising such a exception + errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})" + writing_splits_index += 1 + next end - adding_bytes = chunk.bytesize - committed_bytesize - if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over? - chunk.rollback + if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size + # The split will (might) cause size over so keep already processed + # 'split' content here (allow performance regression a bit). + chunk.commit committed_bytesize = chunk.bytesize + end + end - if split.size == 1 # Check BufferChunkOverflowError again - if adding_bytes > @chunk_limit_size - errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})" - writing_splits_index += 1 - next - else - # As already processed content is kept after rollback, then unstaged chunk should be queued. - # After that, re-process current split again. - # New chunk should be allocated, to do it, modify @stage and so on. - synchronize { @stage.delete(modified_metadata) } - staged_chunk_used = false - chunk.unstaged! - break - end - end + if format + chunk.concat(formatted_split, split.size) + else + chunk.append(split, compress: @compress) + end + adding_bytes = chunk.bytesize - committed_bytesize - if chunk_size_full?(chunk) || split.size == 1 - enqueue_chunk_before_retry = true + if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over? 
+ chunk.rollback + committed_bytesize = chunk.bytesize + + if split.size == 1 # Check BufferChunkOverflowError again + if adding_bytes > @chunk_limit_size + errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})" + writing_splits_index += 1 + next else - splits_count *= 10 + # As already processed content is kept after rollback, then unstaged chunk should be queued. + # After that, re-process current split again. + # New chunk should be allocated, to do it, modify @stage and so on. + synchronize { @stage.delete(modified_metadata) } + staged_chunk_used = false + chunk.unstaged! + break end + end - raise ShouldRetry + if chunk_size_full?(chunk) || split.size == 1 + enqueue_chunk_before_retry = true + else + splits_count *= 10 end - writing_splits_index += 1 + raise ShouldRetry + end - if chunk_size_full?(chunk) - break - end + writing_splits_index += 1 + + if chunk_size_full?(chunk) + break end - rescue - chunk.purge if chunk.unstaged? # unstaged chunk will leak unless purge it - raise end - - modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize + rescue + chunk.purge if chunk.unstaged? # unstaged chunk will leak unless purge it + raise end + + modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize end modified_chunks.each do |data| block.call(data[:chunk], data[:adding_bytesize], data[:errors]) @@ -863,9 +864,15 @@ def write_step_by_step(metadata, data, format, splits_count, &block) if chunk.unstaged? 
chunk.purge rescue nil end + chunk.mon_exit rescue nil end enqueue_chunk(metadata) if enqueue_chunk_before_retry retry + ensure + modified_chunks.each do |data| + chunk = data[:chunk] + chunk.mon_exit + end end STATS_KEYS = [ diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 451358a5b1..14d33af9c7 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -901,6 +901,65 @@ def create_chunk_es(metadata, es) assert_equal 2, purge_count end + + # https://github.com/fluent/fluentd/issues/4446 + test "#write_step_by_step keeps chunks kept in locked in entire #write process" do + assert_equal 8 * 1024 * 1024, @p.chunk_limit_size + assert_equal 0.95, @p.chunk_full_threshold + + mon_enter_counts_by_chunk = {} + mon_exit_counts_by_chunk = {} + + stub.proxy(@p).generate_chunk(anything) do |chunk| + stub(chunk).mon_enter do + enter_count = 1 + mon_enter_counts_by_chunk.fetch(chunk, 0) + exit_count = mon_exit_counts_by_chunk.fetch(chunk, 0) + mon_enter_counts_by_chunk[chunk] = enter_count + + # Assert that chunk is passed to &block of write_step_by_step before exiting the lock. + # (i.e. The lock count must be 2 greater than the exit count). + # Since ShouldRetry occurs once, the staged chunk takes the lock 3 times when calling the block. + if chunk.staged? + lock_in_block = enter_count == 3 + assert_equal(enter_count - 2, exit_count) if lock_in_block + else + lock_in_block = enter_count == 2 + assert_equal(enter_count - 2, exit_count) if lock_in_block + end + end + stub(chunk).mon_exit do + exit_count = 1 + mon_exit_counts_by_chunk.fetch(chunk, 0) + mon_exit_counts_by_chunk[chunk] = exit_count + end + chunk + end + + m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) + small_row = "x" * 1024 * 400 + big_row = "x" * 1024 * 1024 * 8 # just `chunk_size_limit`, it doesn't cause BufferOverFlowError. 
+ + # Write 42 events in 1 event stream, last one is for triggering `ShouldRetry` + @p.write({m => [small_row] * 40 + [big_row] + ["x"]}) + + # Above event stream will be split twice by `Buffer#write_step_by_step` + # + # 1. `write_once`: 42 [events] * 1 [stream] + # 2. `write_step_by_step`: 4 [events]* 10 [streams] + 2 [events] * 1 [stream] + # 3. `write_step_by_step` (by `ShouldRetry`): 1 [event] * 42 [streams] + # + # Example of staged chunk lock behavior: + # + # 1. mon_enter in write_step_by_step + # 2. ShouldRetry occurs + # 3. mon_exit in write_step_by_step + # 4. mon_enter again in write_step_by_step (retry) + # 5. passed to &block of write_step_by_step + # 6. mon_enter in the block (write) + # 7. mon_exit in write_step_by_step + # 8. mon_exit in write + + assert_equal(mon_enter_counts_by_chunk.values, mon_exit_counts_by_chunk.values) + end end sub_test_case 'standard format with configuration for test with lower chunk limit size' do