forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
metricset.go
306 lines (264 loc) · 7.86 KB
/
metricset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
package file_integrity
import (
"bytes"
"os"
"time"
"github.com/boltdb/bolt"
"github.com/pkg/errors"
"github.com/elastic/beats/auditbeat/datastore"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
)
// Identifiers used to register the metricset and to name its persistent
// datastore bucket.
const (
	moduleName    = "file_integrity" // Module name used for registration and logging.
	metricsetName = "file"           // Metricset name within the module.
	bucketName    = "file.v1"        // Bolt bucket holding the last-seen state per file path.

	// Use old namespace for data until we do some field renaming for GA.
	namespace = "."
)
// init registers the metricset with the central registry so that it is
// instantiated through New when configured. It is the module's default
// metricset and uses the empty host parser because it monitors the local
// filesystem rather than a remote host.
func init() {
	mb.Registry.MustAddMetricSet(moduleName, metricsetName, New,
		mb.DefaultMetricSet(),
		mb.WithHostParser(parse.EmptyHostParser),
		mb.WithNamespace(namespace),
	)
}
// EventProducer produces events. Both the fsnotify-based event reader and the
// filesystem scanner implement this interface (see the MetricSet reader and
// scanner fields).
type EventProducer interface {
	// Start starts the event producer and writes events to the returned
	// channel. When the producer is finished it will close the returned
	// channel. If the returned event channel is not drained the producer will
	// block (possibly causing data loss). The producer can be stopped
	// prematurely by closing the provided done channel. An error is returned
	// if the producer fails to start.
	Start(done <-chan struct{}) (<-chan Event, error)
}
// MetricSet for monitoring file integrity.
type MetricSet struct {
	mb.BaseMetricSet
	config  Config        // Unpacked metricset configuration.
	reader  EventProducer // Real-time (fsnotify) event source.
	scanner EventProducer // Startup filesystem scanner; nil unless ScanAtStart is enabled.
	log     *logp.Logger

	// Runtime params that are initialized on Run().
	bucket       datastore.BoltBucket // Persistent store of the last event per file path.
	scanStart    time.Time            // UTC time at which the startup scan began.
	scanChan     <-chan Event         // Scanner events; set to nil when the scan completes.
	fsnotifyChan <-chan Event         // Reader events; set to nil when the reader stops.
}
// New returns a new file.MetricSet. It unpacks the module configuration,
// creates the real-time event reader, and — when scan_at_start is enabled —
// the filesystem scanner as well.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	config := defaultConfig
	if err := base.Module().UnpackConfig(&config); err != nil {
		return nil, err
	}

	reader, err := NewEventReader(config)
	if err != nil {
		return nil, errors.Wrap(err, "failed to initialize file event reader")
	}

	ms := &MetricSet{
		BaseMetricSet: base,
		config:        config,
		reader:        reader,
		log:           logp.NewLogger(moduleName),
	}

	if config.ScanAtStart {
		scanner, err := NewFileSystemScanner(config)
		if err != nil {
			return nil, errors.Wrap(err, "failed to initialize file scanner")
		}
		ms.scanner = scanner
	}

	ms.log.Debugf("Initialized the file event reader. Running as euid=%v", os.Geteuid())
	return ms, nil
}
// Run runs the MetricSet. The method will not return control to the caller
// until it is finished (to stop it close the reporter.Done() channel).
//
// It multiplexes the fsnotify reader and the startup scanner, publishing
// every received event. A source that closes its channel has that channel
// set to nil so its select case never fires again.
func (ms *MetricSet) Run(reporter mb.PushReporterV2) {
	if !ms.init(reporter) {
		return
	}

	for ms.fsnotifyChan != nil || ms.scanChan != nil {
		select {
		case <-reporter.Done():
			return
		case event, ok := <-ms.fsnotifyChan:
			if ok {
				ms.reportEvent(reporter, &event)
			} else {
				ms.fsnotifyChan = nil
			}
		case event, ok := <-ms.scanChan:
			if ok {
				ms.reportEvent(reporter, &event)
			} else {
				ms.scanChan = nil
				// When the scan completes purge datastore keys that no longer
				// exist on disk based on being older than scanStart.
				ms.purgeDeleted(reporter)
			}
		}
	}
}
// Close cleans up the MetricSet when it finishes, releasing the persistent
// datastore bucket when one was opened.
func (ms *MetricSet) Close() error {
	if ms.bucket == nil {
		return nil
	}
	return ms.bucket.Close()
}
// init prepares the metricset for Run: it opens the persistent datastore
// bucket, starts the fsnotify event reader, and starts the filesystem
// scanner when one is configured. Any failure is reported to the reporter
// and logged, and false is returned.
func (ms *MetricSet) init(reporter mb.PushReporterV2) bool {
	// fail wraps and reports a startup error, then signals failure.
	fail := func(cause error, msg string) bool {
		err := errors.Wrap(cause, msg)
		reporter.Error(err)
		ms.log.Errorw("Failed to initialize", "error", err)
		return false
	}

	bucket, err := datastore.OpenBucket(bucketName)
	if err != nil {
		return fail(err, "failed to open persistent datastore")
	}
	ms.bucket = bucket.(datastore.BoltBucket)

	ms.fsnotifyChan, err = ms.reader.Start(reporter.Done())
	if err != nil {
		return fail(err, "failed to start fsnotify event producer")
	}

	ms.scanStart = time.Now().UTC()
	if ms.scanner != nil {
		ms.scanChan, err = ms.scanner.Start(reporter.Done())
		if err != nil {
			return fail(err, "failed to start file scanner")
		}
	}
	return true
}
// reportEvent publishes the event when the file changed since it was last
// seen and persists the event's latest state locally. It returns false when
// the reporter rejects the event (shutdown in progress).
func (ms *MetricSet) reportEvent(reporter mb.PushReporterV2, event *Event) bool {
	switch n := len(event.errors); {
	case n == 1:
		ms.log.Debugw("Error in event", "file_path", event.Path,
			"action", event.Action, "error", event.errors[0])
	case n > 1:
		ms.log.Debugw("Multiple errors in event", "file_path", event.Path,
			"action", event.Action, "errors", event.errors)
	}

	changed, lastEvent := ms.hasFileChangedSinceLastEvent(event)
	if changed {
		// Publish event if it changed.
		if !reporter.Event(buildMetricbeatEvent(event, lastEvent != nil)) {
			return false
		}
	}

	// Persist event locally. An event without file info removes the stored
	// entry for the path; otherwise the latest state is written.
	if event.Info == nil {
		if err := ms.bucket.Delete(event.Path); err != nil {
			ms.log.Errorw("Failed during DB delete", "error", err)
		}
		return true
	}
	if err := store(ms.bucket, event); err != nil {
		ms.log.Errorw("Failed during DB store", "error", err)
	}
	return true
}
// hasFileChangedSinceLastEvent reports whether the file described by event
// differs from the last event persisted for the same path, and returns the
// stored event (nil when the path has not been seen before).
//
// Side effect: when the incoming event carries no action (zero value), its
// Action is set from the computed diff.
func (ms *MetricSet) hasFileChangedSinceLastEvent(event *Event) (changed bool, lastEvent *Event) {
	// Load event from DB.
	lastEvent, err := load(ms.bucket, event.Path)
	if err != nil {
		// On a read failure treat the file as changed so the event is still
		// published rather than silently dropped.
		ms.log.Warnw("Failed during DB load", "error", err)
		return true, lastEvent
	}

	action, changed := diffEvents(lastEvent, event)
	if event.Action == 0 {
		event.Action = action
	}

	if changed {
		ms.log.Debugw("File changed since it was last seen",
			"file_path", event.Path, "took", event.rtt,
			logp.Namespace("event"), "old", lastEvent, "new", event)
	}
	return changed, lastEvent
}
// purgeDeleted removes datastore entries under each configured path that were
// not refreshed by the completed scan and publishes an event for each removed
// entry, skipping paths excluded by the configuration.
func (ms *MetricSet) purgeDeleted(reporter mb.PushReporterV2) {
	for _, prefix := range ms.config.Paths {
		deleted, err := ms.purgeOlder(ms.scanStart, prefix)
		if err != nil {
			ms.log.Errorw("Failure while purging older records", "error", err)
			continue
		}

		for _, e := range deleted {
			if ms.config.IsExcludedPath(e.Path) {
				continue
			}
			// Don't persist!
			reporter.Event(buildMetricbeatEvent(e, true))
		}
	}
}
// Datastore utility functions.

// purgeOlder does a prefix scan of the keys in the datastore and purges items
// older than the specified time. It returns a Deleted event for each purged
// key.
func (ms *MetricSet) purgeOlder(t time.Time, prefix string) ([]*Event, error) {
	var (
		deleted   []*Event
		totalKeys uint64
		p         = []byte(prefix)

		// matchesPrefix reports whether a datastore key falls under prefix.
		matchesPrefix = func(path []byte) bool {
			// XXX: This match may need to be smarter to accommodate multiple
			// metricset instances working on similar paths (e.g. /a and /a/b)
			// or when recursion is allowed.
			return bytes.HasPrefix(path, p)
		}
		startTime = time.Now()
	)

	err := ms.bucket.Update(func(b *bolt.Bucket) error {
		c := b.Cursor()
		for path, v := c.Seek(p); path != nil && matchesPrefix(path); path, v = c.Next() {
			totalKeys++
			if fbIsEventTimestampBefore(v, t) {
				if err := c.Delete(); err != nil {
					return err
				}
				deleted = append(deleted, &Event{
					Timestamp: t,
					Action:    Deleted,
					Path:      string(path),
				})
			}
		}
		return nil
	})

	// Measure the elapsed time once so the structured "took" field and the
	// debug message report the same duration (previously the message
	// recomputed time.Since and could differ from the field).
	took := time.Since(startTime)
	ms.log.With(
		"file_path", prefix,
		"took", took,
		"items_total", totalKeys,
		"items_deleted", len(deleted)).
		Debugf("Purged %v of %v entries in %v for %v", len(deleted), totalKeys,
			took, prefix)
	return deleted, err
}
// store persists an Event in the given Bucket, keyed by the event's path.
func store(b datastore.Bucket, e *Event) error {
	builder, release := fbGetBuilder()
	defer release()

	if err := b.Store(e.Path, fbEncodeEvent(builder, e)); err != nil {
		return errors.Wrapf(err, "failed to locally store event for %v", e.Path)
	}
	return nil
}
// load reads an Event from the datastore. It returns a nil Event when the key
// is not present, and an error when reading from the datastore or decoding
// the data fails.
func load(b datastore.Bucket, path string) (*Event, error) {
	var event *Event
	decode := func(blob []byte) error {
		event = fbDecodeEvent(path, blob)
		return nil
	}

	if err := b.Load(path, decode); err != nil {
		return nil, errors.Wrapf(err, "failed to load locally persisted event for %v", path)
	}
	return event, nil
}