Skip to content

Commit

Permalink
in_dummy: Follow plugin_helper changes
Browse files Browse the repository at this point in the history
  • Loading branch information
cosmo0920 committed Aug 5, 2016
1 parent c612e64 commit 7a54762
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 66 deletions.
45 changes: 13 additions & 32 deletions lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@

require 'fluent/plugin/input'
require 'fluent/config/error'
require 'fluent/plugin_helper/thread'
require 'fluent/plugin_helper/storage'

module Fluent::Plugin
class DummyInput < Input
include Fluent::PluginHelper::Thread
include Fluent::PluginHelper::Storage

Fluent::Plugin.register_input('dummy', self)

helpers :thread, :storage
Expand Down Expand Up @@ -65,21 +60,16 @@ def initialize
def configure(conf)
super
@dummy_index = 0
unless @suspend
storage.autosave = false
storage.save_at_shutdown = false
end
config = conf.elements.select{|e| e.name == 'storage' }.first
@storage = storage_create(usage: 'suspend', conf: config, type: :local)
end

def start
super

storage.put(:increment_value, 0) unless storage.get(:increment_value)
storage.put(:dummy_index, 0) unless storage.get(:dummy_index)
@storage.put(:increment_value, 0) unless @storage.get(:increment_value)
@storage.put(:dummy_index, 0) unless @storage.get(:dummy_index)

@running = true
@thread = Thread.new(&method(:run))
@storage = storage_create(type: 'local')
if @auto_increment_key && !@storage.get(:auto_increment_value)
@storage.put(:auto_increment_value, -1)
end
Expand Down Expand Up @@ -113,26 +103,17 @@ def emit(num)
end

def generate
storage.synchronize do
d = @dummy[@dummy_index]
unless d
@dummy_index = 0
d = @dummy[@dummy_index]
unless d
@dummy_index = 0
d = @dummy[0]
end
@dummy_index += 1
if @auto_increment_key
d = d.dup
d[@auto_increment_key] = @storage.update(:auto_increment_value){|v| v + 1 }

if @auto_increment_key
d = d.dup
inc_value = storage.get(:increment_value)
d[@auto_increment_key] = inc_value
storage.put(:increment_value, inc_value + 1)
end
d
end
end
@dummy_index += 1
if @auto_increment_key
d = d.dup
d[@auto_increment_key] = @storage.update(:auto_increment_value){|v| v + 1 }
end
d
end

def wait(time)
Expand Down
76 changes: 42 additions & 34 deletions test/plugin/test_in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,33 +96,32 @@ def create_driver(conf)
FileUtils.mkdir_p TEST_PLUGIN_STORAGE_PATH

sub_test_case "doesn't suspend internal counters in default" do
config1 = %[
@id test-01
tag dummy
rate 10
dummy [{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]
auto_increment_key id
]
config1 = {
'@id' => 'test-01',
'tag' => 'dummy',
'rate' => '2',
'dummy' => '[{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]',
'auto_increment_key' => 'id',
}
conf1 = config_element('ROOT', '', config1, [])
test "value of auto increment key is not suspended after stop-and-start" do
assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))

d1 = create_driver(config1, plugin_storage_path: TEST_PLUGIN_STORAGE_PATH)
d1.expected_emits_length = 4
d1.run
d1 = create_driver(conf1)
d1.run(expect_emits: 4, timeout: 1)

first_id1 = d1.emits.first[2]['id']
first_id1 = d1.instance.events.first[2]['id']
assert_equal 0, first_id1

last_id1 = d1.emits.last[2]['id']
last_id1 = d1.events.last[2]['id']
assert { last_id1 > 0 }

assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))

d2 = create_driver(config1, plugin_storage_path: TEST_PLUGIN_STORAGE_PATH)
d2.expected_emits_length = 4
d2.run
d2 = create_driver(conf1)
d2.run(expect_emits: 4, timeout: 1)

first_id2 = d2.emits.first[2]['id']
first_id2 = d2.events.first[2]['id']
assert_equal 0, first_id2

assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json'))
Expand All @@ -132,37 +131,46 @@ def create_driver(conf)
sub_test_case "suspend internal counters if suspend is true" do
setup do
FileUtils.rm_rf(TEST_PLUGIN_STORAGE_PATH)
FileUtils.mkdir_p(File.join(TEST_PLUGIN_STORAGE_PATH, 'json'))
FileUtils.chmod_R(0755, File.join(TEST_PLUGIN_STORAGE_PATH, 'json'))
end

config2 = %[
@id test-02
tag dummy
rate 2
dummy [{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]
auto_increment_key id
suspend true
]
config2 = {
'@id' => 'test-02',
'tag' => 'dummy',
'rate' => '2',
'dummy' => '[{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]',
'auto_increment_key' => 'id',
'suspend' => true
}
conf2 = config_element('ROOT', '', config2, [
config_element(
'storage', '',
{'@type' => 'local',
'@id' => 'test-02',
'path' => File.join(TEST_PLUGIN_STORAGE_PATH,
'json', 'test-02.json'),
'persistent' => true,
})
])
test "value of auto increment key is suspended after stop-and-start" do
assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json'))

d1 = create_driver(config2, plugin_storage_path: TEST_PLUGIN_STORAGE_PATH)

d1.expected_emits_length = 4
d1.run
d1 = create_driver(conf2)
d1.run(expect_emits: 4, timeout: 1)

first_id1 = d1.emits.first[2]['id']
first_id1 = d1.events.first[2]['id']
assert_equal 0, first_id1

last_id1 = d1.emits.last[2]['id']
last_id1 = d1.events.last[2]['id']
assert { last_id1 > 0 }

assert File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json'))

d2 = create_driver(config2, plugin_storage_path: TEST_PLUGIN_STORAGE_PATH)
d2.expected_emits_length = 4
d2.run
d2 = create_driver(conf2)
d2.run(expect_emits: 4, timeout: 1)

first_id2 = d2.emits.first[2]['id']
first_id2 = d2.events.first[2]['id']
assert_equal last_id1 + 1, first_id2

assert File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json'))
Expand Down

0 comments on commit 7a54762

Please sign in to comment.