forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventreader_fsnotify.go
90 lines (76 loc) · 1.81 KB
/
eventreader_fsnotify.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// +build linux freebsd openbsd netbsd windows darwin
package file
import (
"syscall"
"time"
"github.com/fsnotify/fsnotify"
"github.com/elastic/beats/libbeat/logp"
)
// reader produces file events from fsnotify notifications. Instances are
// created by NewEventReader.
type reader struct {
// watcher is the fsnotify watcher whose Events and Errors channels are read.
watcher *fsnotify.Watcher
// config supplies the paths to watch and the settings passed to NewEvent.
config Config
// eventC carries the generated events; it is the channel returned by Start.
eventC chan Event
}
// NewEventReader builds an EventProducer that is driven by an fsnotify
// watcher. If the watcher cannot be created, that error is returned and no
// producer is constructed.
func NewEventReader(c Config) (EventProducer, error) {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}

	r := new(reader)
	r.watcher = w
	r.config = c
	// Buffer of one, matching the single-slot channel consumers receive from Start.
	r.eventC = make(chan Event, 1)
	return r, nil
}
// Start registers every configured path with the fsnotify watcher and launches
// a goroutine that converts watcher notifications into Events.
//
// A path that cannot be watched is logged as a warning and skipped; it does
// not abort startup, so the returned error is always nil. The returned channel
// is closed when the background goroutine exits, which happens once done is
// closed or once the watcher's own channels are closed.
func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
	for _, p := range r.config.Paths {
		err := r.watcher.Add(p)
		if err == nil {
			continue
		}

		if err == syscall.EMFILE {
			// Running out of file descriptors is common enough to deserve a hint.
			logp.Warn("%v Failed to watch %v: %v (check the max number of "+
				"open files allowed with 'ulimit -a')", logPrefix, p, err)
		} else {
			logp.Warn("%v Failed to watch %v: %v", logPrefix, p, err)
		}
	}

	go r.consumeEvents(done)
	return r.eventC, nil
}

// consumeEvents forwards fsnotify notifications to r.eventC until done is
// closed or the watcher's channels are closed. On exit it closes the watcher
// and then r.eventC, so receivers observe the end of the stream.
func (r *reader) consumeEvents(done <-chan struct{}) {
	// Deferred calls run in reverse order: the watcher is closed first, then
	// the outgoing event channel.
	defer close(r.eventC)
	defer r.watcher.Close()

	for {
		select {
		case <-done:
			debugf("fsnotify reader terminated")
			return

		case event, ok := <-r.watcher.Events:
			if !ok {
				// The watcher was closed; without this check the loop would
				// spin on zero-value events forever.
				return
			}
			if event.Name == "" {
				continue
			}
			debugf("Received fsnotify event: path=%v action=%v",
				event.Name, event.Op.String())

			start := time.Now()
			e := NewEvent(event.Name, opToAction(event.Op), SourceFSNotify,
				r.config.MaxFileSizeBytes, r.config.HashTypes)
			e.rtt = time.Since(start)

			// Do not block on a consumer that has already gone away.
			select {
			case r.eventC <- e:
			case <-done:
				return
			}

		case err, ok := <-r.watcher.Errors:
			if !ok {
				// Closed error channel means the watcher has shut down.
				return
			}
			logp.Warn("%v fsnotify watcher error: %v", logPrefix, err)
		}
	}
}
// opToAction translates an fsnotify operation into the corresponding Action.
// The operation is compared for exact equality against the five fsnotify
// constants below; any other value (presumably including a combination of
// several operation bits) yields the zero Action.
func opToAction(op fsnotify.Op) Action {
	switch {
	case op == fsnotify.Create:
		return Created
	case op == fsnotify.Write:
		return Updated
	case op == fsnotify.Remove:
		return Deleted
	case op == fsnotify.Rename:
		return Moved
	case op == fsnotify.Chmod:
		return AttributesModified
	}
	// No single-operation match.
	return 0
}