This repository has been archived by the owner on Mar 22, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
out_ruby_one_liner.rb
91 lines (73 loc) · 2.03 KB
/
out_ruby_one_liner.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
require 'ostruct'

module Fluent
  # Fluentd output plugin that runs a user-supplied Ruby snippet for each event.
  #
  # Events received by #emit are queued and consumed one at a time by a
  # worker thread, which calls the compiled snippet with (tag, time, record)
  # and then sleeps +run_interval+ before taking the next event.
  #
  # SECURITY NOTE: the snippet is compiled with +eval+, so whoever controls
  # +command+ or the contents of +commandfile+ can run arbitrary Ruby code
  # inside the process that hosts this plugin.
  class RubyOneLinerOutput < Output
    Fluent::Plugin.register_output('ruby_one_liner', self)

    # Comma-separated list of libraries to +require+ during #configure.
    config_param :require_libs, :string, :default => ''
    # Ruby source used as the body of the per-event lambda.
    config_param :command, :string, :default => ''
    # Path of a file holding the lambda body; read when +command+ is empty
    # and re-read whenever the process receives SIGINT.
    config_param :commandfile, :string, :default => ''
    # Value passed to +sleep+ after each successful call of the lambda.
    config_param :run_interval, :integer

    def initialize
      super
      # Serializes reloads so that overlapping signals cannot race on
      # @lambda / @thread.
      @reload_mutex = Mutex.new
    end

    # Loads the requested libraries, compiles the snippet and installs the
    # SIGINT reload handler.
    #
    # @raise [ConfigError] when neither +command+ nor +commandfile+ is set
    def configure(config)
      super

      # Tolerate "a, b" style lists and empty entries.
      libs = @require_libs.split(',').map(&:strip).reject(&:empty?)
      libs.each { |lib| require lib }

      command = if !@command.empty?
                  @command
                elsif !@commandfile.empty?
                  # File.read closes the handle and, unlike Kernel#open,
                  # never treats a leading "|" as a command to spawn.
                  File.read(@commandfile)
                else
                  raise ConfigError, "out_ruby_one_liner: command or commandfile is required to be set."
                end

      @config = config
      # Evaluated in the context of this method (see the security note on
      # the class).
      @lambda = eval("lambda {|tag, time, record| #{command}}")
      @q = Queue.new

      # NOTE: this replaces whatever SIGINT handler was installed before.
      # The actual work is done on a separate thread so that logging, file
      # I/O and locking do not happen inside the trap handler, and so that a
      # failing reload cannot raise into the interrupted thread.
      Signal.trap :INT do
        Thread.new do
          begin
            $log.warn 'out_ruby_one_liner: reload commandfile start'
            reload_commandfile!
            $log.warn 'out_ruby_one_liner: reload commandfile end'
          rescue StandardError, ScriptError => e
            # ScriptError covers SyntaxError raised by eval on a bad file.
            $log.warn "out_ruby_one_liner: reload commandfile failed: #{e.class}, '#{e.message}'"
          end
        end
      end
    end

    # Starts the worker thread. Failures are logged rather than raised.
    def start
      super
      @thread = Thread.new(&method(:run))
    rescue => e
      $log.warn "raises exception: #{e.class}, '#{e.message}'"
    end

    # Stops the worker thread if one was started.
    def shutdown
      super
      @thread.kill if @thread
      @thread = nil
    end

    # Queues every event of the stream for the worker thread, then hands
    # control to the next element of the chain.
    def emit(tag, es, chain)
      es.each do |time, record|
        @q.push OpenStruct.new(:tag => tag, :time => time, :record => record)
      end
      chain.next
    end

    private

    # Re-reads +commandfile+, compiles it and swaps it in.
    #
    # The new source is compiled BEFORE the running worker is touched, so a
    # missing file or a syntax error leaves the current lambda and worker in
    # place. The worker is restarted only if one was running.
    def reload_commandfile!
      if @commandfile.empty?
        $log.warn 'out_ruby_one_liner: commandfile is not set, nothing to reload'
        return
      end

      @reload_mutex.synchronize do
        command = File.read(@commandfile)
        compiled = eval("lambda {|tag, time, record| #{command}}")

        running = @thread
        running.kill if running
        @lambda = compiled
        @thread = Thread.new(&method(:run)) if running
      end
    end

    # Worker loop: pops one queued event at a time, calls the lambda with it
    # and sleeps +run_interval+. An exception raised by the lambda is logged
    # and skips the sleep for that event; the loop then continues.
    def run
      loop do
        param = @q.pop
        begin
          @lambda.call param.tag, param.time, param.record
          sleep @run_interval
        rescue => e
          $log.warn "raises exception: #{e.class}, '#{e.message}, #{param}'"
        end
      end
    end
  end
end