Skip to content

Commit

Permalink
test chunk IO closed issue
Browse files Browse the repository at this point in the history
  • Loading branch information
daipom committed Jun 15, 2023
1 parent dcc05e5 commit 0f5947b
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 7 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/linux-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: Testing on Ubuntu

on:
push:
branches: [master]
pull_request:
branches: [master]

Expand Down Expand Up @@ -33,4 +32,4 @@ jobs:
- name: Install dependencies
run: bundle install
- name: Run tests
run: bundle exec rake test
run: bundle exec rake test TESTOPTS="-t'BufferedOutputTest::hoge'"
3 changes: 1 addition & 2 deletions .github/workflows/macos-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: Testing on macOS

on:
push:
branches: [master]
pull_request:
branches: [master]

Expand Down Expand Up @@ -31,4 +30,4 @@ jobs:
- name: Install dependencies
run: bundle install
- name: Run tests
run: bundle exec rake test
run: bundle exec rake test TESTOPTS="-t'BufferedOutputTest::hoge'"
3 changes: 1 addition & 2 deletions .github/workflows/windows-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: Testing on Windows

on:
push:
branches: [master]
pull_request:
branches: [master]

Expand Down Expand Up @@ -46,4 +45,4 @@ jobs:
- name: Install dependencies
run: ridk exec bundle install
- name: Run tests
run: bundle exec rake test TESTOPTS=-v ${{ matrix.ruby-lib-opt }}
run: bundle exec rake test TESTOPTS="-t'BufferedOutputTest::hoge'"
4 changes: 4 additions & 0 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end
chunk.mon_exit
rescue => e
p e
chunk.rollback
chunk.mon_exit
errors << e
Expand Down Expand Up @@ -678,6 +679,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)
if format && empty_chunk
if chunk.bytesize > @chunk_limit_size
log.warn "chunk bytes limit exceeds for an emitted event stream: #{adding_bytesize}bytes"
p "chunk bytes limit exceeds for an emitted event stream: #{adding_bytesize}bytes"
else
log.warn "chunk size limit exceeds for an emitted event stream: #{chunk.size}records"
end
Expand Down Expand Up @@ -727,6 +729,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)
# 3. create unstaged chunk and append rest splits -> repeat it for all splits

def write_step_by_step(metadata, data, format, splits_count, &block)
p "write_step_by_step"
splits = []
errors = []
if splits_count > data.size
Expand Down Expand Up @@ -812,6 +815,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
adding_bytes = chunk.bytesize - committed_bytesize

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
p "ShouldRetry"
chunk.rollback
committed_bytesize = chunk.bytesize

Expand Down
51 changes: 50 additions & 1 deletion test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,12 @@ def waiting(seconds)
end
end

setup do
def setup
@i = nil
Dir.mktmpdir do |tmp_dir|
@tmp_dir = Pathname(tmp_dir)
yield
end
end

teardown do
Expand All @@ -223,6 +227,51 @@ def waiting(seconds)
Timecop.return
end

sub_test_case "hoge" do
def test_hoge
events_from_chunk = []
conf = {
"@type" => "file_single",
"path" => @tmp_dir.to_s,
"chunk_limit_size" => 1280,
"chunk_format" => :msgpack,
"flush_thread_count" => 4,
"flush_interval" => 1,
"queue_limit_length" => 100,
}
@i = create_output(:standard)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',conf)]))
@i.register(:prefer_delayed_commit){ false }
@i.register(:write){ |chunk|
e = []
assert chunk.respond_to?(:each)
chunk.each{|t,r| e << [t,r]}
events_from_chunk << [:write, e]
p "write!!"
}
# @i.register(:try_write){ |chunk| e = []; assert chunk.respond_to?(:each); chunk.each{|t,r| e << [t,r]}; events_from_chunk << [:try_write, e] }
@i.start
@i.after_start

1000.times do
es = Fluent::ArrayEventStream.new(
[
[event_time('2016-04-11 16:00:01 +0000'), {"message" => "x" * 500}],
[event_time('2016-04-11 16:00:01 +0000'), {"message" => "x" * 500}],
[event_time('2016-04-11 16:00:01 +0000'), {"message" => "x" * 500}],
[event_time('2016-04-11 16:00:01 +0000'), {"message" => "x" * 500}],
[event_time('2016-04-11 16:00:01 +0000'), {"message" => "x" * 500}],
[event_time('2016-04-11 16:00:01 +0000'), {"message" => "x" * 500}],
]
)
@i.emit_events("test.tag", es)
sleep 1.0
end

# waiting(5){ sleep 0.1 until events_from_chunk.size == 2 }
end
end

test 'queued_chunks_limit_size is same as flush_thread_count by default' do
hash = {'flush_thread_count' => 4}
i = create_output
Expand Down

0 comments on commit 0f5947b

Please sign in to comment.