Skip to content

Commit

Permalink
in_tail: test: Add writing files before checking bytes limits case
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 committed May 13, 2021
1 parent bc11e6c commit bee9125
Showing 1 changed file with 59 additions and 0 deletions.
59 changes: 59 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,65 @@ def test_emit_with_read_bytes_limit_per_second(data)
d.instance.shutdown
end
end

sub_test_case "reads_bytes_per_second with several writing points" do
class Fluent::Plugin::TailInput::TailWatcher::IOHandler
alias_method :orig_limit_bytes_per_second_reached?, :limit_bytes_per_second_reached?
def limit_bytes_per_second_reached?(&block)
yield if block_given?

orig_limit_bytes_per_second_reached?
end
end

data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

mock.proxy(d.instance).io_handler(anything, anything) do |io_handler|
io_handler.limit_bytes_per_second_reached? do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for _x in 0..5
f.puts msg
end
}
end
io_handler
end

# We should not do shutdown here due to hard timeout.
d.run(expect_emits: 2, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for _x in 0..30
f.puts msg
end
}
end

events = d.events
assert_true(events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
assert_equal({"message" => msg}, events[1][2])

# Teardown in_tail plugin instance here.
d.instance.shutdown
end
end
end

data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG,
Expand Down

0 comments on commit bee9125

Please sign in to comment.