-
Notifications
You must be signed in to change notification settings - Fork 43
/
in_systemd.rb
157 lines (133 loc) · 4.78 KB
/
in_systemd.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# frozen_string_literal: true
# Copyright 2015-2018 Edward Robinson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'systemd/journal'
require 'fluent/plugin/input'
require 'fluent/plugin/systemd/entry_mutator'
module Fluent
module Plugin
# Fluentd plugin for reading from the systemd journal
class SystemdInput < Input # rubocop:disable Metrics/ClassLength
Fluent::Plugin.register_input('systemd', self)
helpers :timer, :storage
DEFAULT_STORAGE_TYPE = 'local'
config_param :path, :string, default: '/var/log/journal'
config_param :filters, :array, default: [], deprecated: 'filters has been renamed as matches'
config_param :matches, :array, default: nil
config_param :read_from_head, :bool, default: false
config_param :tag, :string
config_section :storage do
config_set_default :usage, 'positions'
config_set_default :@type, DEFAULT_STORAGE_TYPE
config_set_default :persistent, false
end
config_section :entry, param_name: 'entry_opts', required: false, multi: false do
config_param :field_map, :hash, default: {}
config_param :field_map_strict, :bool, default: false
config_param :fields_strip_underscores, :bool, default: false
config_param :fields_lowercase, :bool, default: false
end
def configure(conf)
super
@journal = nil
@pos_storage = storage_create(usage: 'positions')
@mutator = SystemdEntryMutator.new(**@entry_opts.to_h)
@mutator.warnings.each { |warning| log.warn(warning) }
end
def start
super
@running = true
timer_execute(:in_systemd_emit_worker, 1, &method(:run))
end
def shutdown
@running = false
@journal&.close
@journal = nil
@pos_storage = nil
@mutator = nil
super
end
private
def init_journal
# TODO: ruby 2.3
@journal.close if @journal # rubocop:disable Style/SafeNavigation
@journal = Systemd::Journal.new(path: @path)
@journal.filter(*(@matches || @filters))
seek
true
rescue Systemd::JournalError => e
log.warn("#{e.class}: #{e.message} retrying in 1s")
false
end
def seek
cursor = @pos_storage.get(:journal)
seek_to(cursor || read_from)
rescue Systemd::JournalError
log.warn(
"Could not seek to cursor #{cursor} found in position file: #{@pos_storage.path}, " \
"falling back to reading from #{read_from}"
)
seek_to(read_from)
end
# according to https://github.com/ledbettj/systemd-journal/issues/64#issuecomment-271056644
# and https://bugs.freedesktop.org/show_bug.cgi?id=64614, after doing a seek(:tail),
# you must move back in such a way that the next move_next will return the last
# record
def seek_to(pos)
@journal.seek(pos)
return if pos == :head
if pos == :tail
@journal.move(-2)
else
@journal.move(1)
end
end
def read_from
@read_from_head ? :head : :tail
end
def run
return unless @journal || init_journal
init_journal if @journal.wait(0) == :invalidate
watch do |entry|
emit(entry)
end
end
def emit(entry)
router.emit(@tag, Fluent::EventTime.from_time(entry.realtime_timestamp), formatted(entry))
rescue Fluent::Plugin::Buffer::BufferOverflowError => e
retries ||= 0
raise e if retries > 10
retries += 1
sleep 1.5**retries + rand(0..3)
retry
rescue => e # rubocop:disable Style/RescueStandardError
log.error("Exception emitting record: #{e}")
end
def formatted(entry)
@mutator.run(entry)
end
def watch(&block)
yield_current_entry(&block) while @running && @journal.move_next
rescue Systemd::JournalError => e
log.warn("Error moving to next Journal entry: #{e.class}: #{e.message}")
end
def yield_current_entry
yield @journal.current_entry
@pos_storage.put(:journal, @journal.cursor)
rescue Systemd::JournalError => e
log.warn("Error reading from Journal: #{e.class}: #{e.message}")
end
end
end
end