Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add suspend option to in dummy plugin #900

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 9 additions & 2 deletions lib/fluent/plugin/in_dummy.rb
Expand Up @@ -26,13 +26,16 @@ class DummyInput < Input
helpers :thread, :storage

BIN_NUM = 10
DEFAULT_STORAGE_TYPE = 'local'

desc "The value is the tag assigned to the generated events."
config_param :tag, :string
desc "It configures how many events to generate per second."
config_param :rate, :integer, default: 1
desc "If specified, each generated event has an auto-incremented key field."
config_param :auto_increment_key, :string, default: nil
desc "The boolean to suspend-and-resume incremental value after restart"
config_param :suspend, :bool, default: false
desc "The dummy data to be generated. An array of JSON hashes or a single JSON hash."
config_param :dummy, default: [{"message"=>"dummy"}] do |val|
begin
Expand All @@ -58,12 +61,16 @@ def initialize
def configure(conf)
super
@dummy_index = 0
config = conf.elements.select{|e| e.name == 'storage' }.first
@storage = storage_create(usage: 'suspend', conf: config, default_type: DEFAULT_STORAGE_TYPE)
end

def start
super

@storage = storage_create(type: 'local')
@storage.put(:increment_value, 0) unless @storage.get(:increment_value)
@storage.put(:dummy_index, 0) unless @storage.get(:dummy_index)

if @auto_increment_key && !@storage.get(:auto_increment_value)
@storage.put(:auto_increment_value, -1)
end
Expand Down Expand Up @@ -100,7 +107,7 @@ def generate
d = @dummy[@dummy_index]
unless d
@dummy_index = 0
d = @dummy[0]
d = @dummy[@dummy_index]
end
@dummy_index += 1
if @auto_increment_key
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/storage_local.rb
Expand Up @@ -58,7 +58,7 @@ def configure(conf)
raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin storage file: '#{@path}'"
end
else
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.writable?(@path)
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{dir}'" unless File.writable?(dir)
end
end
end
Expand Down
95 changes: 95 additions & 0 deletions test/plugin/test_in_dummy.rb
@@ -1,6 +1,7 @@
require_relative '../helper'
require 'fluent/test/driver/input'
require 'fluent/plugin/in_dummy'
require 'fileutils'

class DummyTest < Test::Unit::TestCase
def setup
Expand Down Expand Up @@ -90,4 +91,98 @@ def create_driver(conf)
end
end
end

TEST_PLUGIN_STORAGE_PATH = File.join( File.dirname(File.dirname(__FILE__)), 'tmp', 'in_dummy', 'store' )
FileUtils.mkdir_p TEST_PLUGIN_STORAGE_PATH

sub_test_case "doesn't suspend internal counters in default" do
config1 = {
'tag' => 'dummy',
'rate' => '2',
'dummy' => '[{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]',
'auto_increment_key' => 'id',
'suspend' => false,
}
conf1 = config_element('ROOT', '', config1, [])
test "value of auto increment key is not suspended after stop-and-start" do
assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))

d1 = create_driver(conf1)
d1.run(timeout: 0.5) do
d1.instance.emit(4)
end

first_id1 = d1.events.first[2]['id']
assert_equal 0, first_id1

last_id1 = d1.events.last[2]['id']
assert { last_id1 > 0 }

assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))

d2 = create_driver(conf1)
d2.run(timeout: 0.5) do
d2.instance.emit(4)
end

first_id2 = d2.events.first[2]['id']
assert_equal 0, first_id2

assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))
end
end

sub_test_case "suspend internal counters if suspend is true" do
setup do
FileUtils.rm_rf(TEST_PLUGIN_STORAGE_PATH)
FileUtils.mkdir_p(File.join(TEST_PLUGIN_STORAGE_PATH, 'json'))
FileUtils.chmod_R(0755, File.join(TEST_PLUGIN_STORAGE_PATH, 'json'))
end

config2 = {
'@id' => 'test-02',
'tag' => 'dummy',
'rate' => '2',
'dummy' => '[{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]',
'auto_increment_key' => 'id',
'suspend' => true,
}
conf2 = config_element('ROOT', '', config2, [
config_element(
'storage', '',
{'@type' => 'local',
'@id' => 'test-02',
'path' => File.join(TEST_PLUGIN_STORAGE_PATH,
'json', 'test-02.json'),
'persistent' => true,
})
])
test "value of auto increment key is suspended after stop-and-start" do
assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json'))

d1 = create_driver(conf2)
d1.run(timeout: 0.5) do
d1.instance.emit(4)
end

first_id1 = d1.events.first[2]['id']
assert_equal 0, first_id1

last_id1 = d1.events.last[2]['id']
assert { last_id1 > 0 }

assert File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json'))

d2 = create_driver(conf2)
d2.run(timeout: 0.5) do
d2.instance.emit(4)
end
d2.events

first_id2 = d2.events.first[2]['id']
assert_equal last_id1 + 1, first_id2

assert File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json'))
end
end
end