Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
  • Loading branch information
daipom committed Mar 27, 2024
1 parent fedb6d4 commit 3ea2a2e
Showing 1 changed file with 59 additions and 0 deletions.
59 changes: 59 additions & 0 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,65 @@ def create_chunk_es(metadata, es)

assert_equal 2, purge_count
end

# https://github.com/fluent/fluentd/issues/4446
test "#write_step_by_step keeps chunks kept in locked in entire #write process" do
assert_equal 8 * 1024 * 1024, @p.chunk_limit_size
assert_equal 0.95, @p.chunk_full_threshold

mon_enter_counts_by_chunk = {}
mon_exit_counts_by_chunk = {}

stub.proxy(@p).generate_chunk(anything) do |chunk|
stub(chunk).mon_enter do
enter_count = 1 + mon_enter_counts_by_chunk.fetch(chunk, 0)
exit_count = mon_exit_counts_by_chunk.fetch(chunk, 0)
mon_enter_counts_by_chunk[chunk] = enter_count

# Assert that chunk is passed to &block of write_step_by_step before exiting the lock.
# (i.e. The lock count must be 2 greater than the exit count).
# Since ShouldRetry occurs once, the staged chunk takes the lock 3 times when calling the block.
if chunk.staged?
lock_in_block = enter_count == 3
assert_equal(enter_count - 2, exit_count) if lock_in_block
else
lock_in_block = enter_count == 2
assert_equal(enter_count - 2, exit_count) if lock_in_block
end
end
stub(chunk).mon_exit do
exit_count = 1 + mon_exit_counts_by_chunk.fetch(chunk, 0)
mon_exit_counts_by_chunk[chunk] = exit_count
end
chunk
end

m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
small_row = "x" * 1024 * 400
big_row = "x" * 1024 * 1024 * 8 # just `chunk_size_limit`, it does't cause BufferOverFlowError.

# Write 42 events in 1 event stream, last one is for triggering `ShouldRetry`
@p.write({m => [small_row] * 40 + [big_row] + ["x"]})

# Above event strem will be splitted twice by `Buffer#write_step_by_step`
#
# 1. `write_once`: 42 [events] * 1 [stream]
# 2. `write_step_by_step`: 4 [events]* 10 [streams] + 2 [events] * 1 [stream]
# 3. `write_step_by_step` (by `ShouldRetry`): 1 [event] * 42 [streams]
#
# Example of staged chunk lock behavior:
#
# 1. mon_enter in write_step_by_step
# 2. ShouldRetry occurs
# 3. mon_exit in write_step_by_step
# 4. mon_enter again in write_step_by_step (retry)
# 5. passed to &block of write_step_by_step
# 6. mon_enter in the block (write)
# 7. mon_exit in write_step_by_step
# 8. mon_exit in write

assert_equal(mon_enter_counts_by_chunk.values, mon_exit_counts_by_chunk.values)
end
end

sub_test_case 'standard format with configuration for test with lower chunk limit size' do
Expand Down

0 comments on commit 3ea2a2e

Please sign in to comment.