forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
recursive.go
136 lines (119 loc) · 3.12 KB
/
recursive.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package monitor
import (
"os"
"path/filepath"
"github.com/fsnotify/fsnotify"
"github.com/joeshaw/multierror"
"github.com/pkg/errors"
)
// recursiveWatcher wraps an fsnotify.Watcher and keeps a FileTree of the
// paths it has walked, so that a Create or Remove reported for a path can be
// re-emitted as one event per entry recorded under that path.
type recursiveWatcher struct {
// inner is the wrapped fsnotify watcher: directories are registered on it,
// its Events channel is consumed by forwardEvents and its Errors channel is
// what ErrorChannel returns.
inner *fsnotify.Watcher
// tree records the directories and files found while walking added paths.
tree FileTree
// eventC carries the events produced by forwardEvents; it is what
// EventChannel returns and it is closed by close().
eventC chan fsnotify.Event
// done is nil until Start is called; Close closes it to ask the forwarding
// goroutine to stop.
done chan bool
}
// newRecursiveWatcher builds a recursiveWatcher around the given fsnotify
// watcher, with an empty file tree and an event channel of capacity one.
// The done channel is left nil; it is only created by Start.
func newRecursiveWatcher(inner *fsnotify.Watcher) *recursiveWatcher {
	w := new(recursiveWatcher)
	w.inner = inner
	w.tree = FileTree{}
	w.eventC = make(chan fsnotify.Event, 1)
	return w
}
// Start creates the done channel and launches forwardEvents in its own
// goroutine. It always returns nil.
func (watcher *recursiveWatcher) Start() error {
	stop := make(chan bool, 1)
	watcher.done = stop

	go watcher.forwardEvents()

	return nil
}
// Add starts watching path and everything found beneath it by delegating to
// addRecursive, whose error (if any) is returned unchanged.
func (watcher *recursiveWatcher) Add(path string) error {
	err := watcher.addRecursive(path)
	return err
}
// Close shuts the watcher down.
//
// If Start was never called (done is nil) the resources are released directly
// and the result of that cleanup is returned. Otherwise the done channel is
// closed, which makes the forwarding goroutine return and run the cleanup
// itself; in that case Close returns nil.
//
// NOTE(review): done is closed without any guard, so calling Close a second
// time after Start would close an already-closed channel.
func (watcher *recursiveWatcher) Close() error {
	if watcher.done == nil {
		// Never started: nobody else will clean up.
		return watcher.close()
	}

	// Started: signal the goroutine, which performs the cleanup on exit.
	close(watcher.done)
	return nil
}
// EventChannel returns the receive-only side of the channel on which
// forwardEvents publishes events.
func (watcher *recursiveWatcher) EventChannel() <-chan fsnotify.Event {
	var events <-chan fsnotify.Event = watcher.eventC
	return events
}
// ErrorChannel returns the receive-only side of the wrapped watcher's Errors
// channel. forwardEvents also sends its own recursion errors on that channel.
func (watcher *recursiveWatcher) ErrorChannel() <-chan error {
	var errs <-chan error = watcher.inner.Errors
	return errs
}
// addRecursive walks the file tree rooted at path. Every directory found is
// recorded in watcher.tree and registered on the inner fsnotify watcher;
// every other entry is recorded in watcher.tree as a file.
//
// Errors reported by the walk for an entry, and failures to register a watch
// on a directory, are collected and do not interrupt the walk. An error from
// the tree (AddDir / AddFile) is returned to filepath.Walk, and whatever
// filepath.Walk then returns is collected as well.
//
// The return value is nil when nothing was collected, otherwise the
// aggregated multierror.
func (watcher *recursiveWatcher) addRecursive(path string) error {
	var errs multierror.Errors

	err := filepath.Walk(path, func(entry string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			// Record the error the walk itself reported for this entry.
			// Wrapping anything else (for instance the result of a watch
			// registration) would lose the real cause, and errors.Wrapf
			// yields nil for a nil error, which would put a nil entry into
			// errs. info is not used on this path.
			errs = append(errs, errors.Wrapf(walkErr, "recursion into dir '%s' failed", entry))
			return nil
		}

		if !info.IsDir() {
			return watcher.tree.AddFile(entry)
		}

		if err := watcher.tree.AddDir(entry); err != nil {
			return err
		}
		if err := watcher.inner.Add(entry); err != nil {
			// A directory that cannot be watched is reported but must not
			// prevent the rest of the tree from being processed.
			errs = append(errs, errors.Wrapf(err, "failed adding watcher to '%s'", entry))
		}
		return nil
	})
	if err != nil {
		errs = append(errs, errors.Wrapf(err, "failed to walk path '%s'", path))
	}

	return errs.Err()
}
// close releases the watcher's resources: it closes the outgoing event
// channel first and then the wrapped fsnotify watcher, returning the error
// from the latter.
func (watcher *recursiveWatcher) close() error {
	close(watcher.eventC)

	err := watcher.inner.Close()
	return err
}
// forwardEvents is the body of the goroutine launched by Start. It reads
// events from the wrapped watcher and publishes them on eventC until either
// the done channel is signalled or the wrapped watcher's Events channel is
// closed. On exit it runs close() to release the watcher's resources. It
// always returns nil.
//
// Events with an empty Name are dropped. The remaining events are handled
// according to their Op:
//   - Create: the path is added recursively (a failure is sent on the inner
//     watcher's Errors channel), then a Create event is emitted for every
//     entry the tree visits under that path, in pre-order.
//   - Remove: a Remove event is emitted for every entry the tree visits under
//     that path, in post-order, and then the path is dropped from the tree.
//   - Rename: the path is dropped from the tree and the original event is
//     forwarded once.
//   - anything else: the original event is forwarded unchanged.
func (watcher *recursiveWatcher) forwardEvents() error {
	defer watcher.close()

	for {
		select {
		case <-watcher.done:
			return nil

		case ev, open := <-watcher.inner.Events:
			if !open {
				return nil
			}
			if ev.Name == "" {
				continue
			}

			// relay publishes one event for a visited tree entry, carrying
			// the operation of the event currently being handled.
			relay := func(entry string, _ bool) error {
				watcher.eventC <- fsnotify.Event{Name: entry, Op: ev.Op}
				return nil
			}

			switch ev.Op {
			case fsnotify.Create:
				if err := watcher.addRecursive(ev.Name); err != nil {
					watcher.inner.Errors <- errors.Wrapf(err, "unable to recurse path '%s'", ev.Name)
				}
				watcher.tree.Visit(ev.Name, PreOrder, relay)

			case fsnotify.Remove:
				watcher.tree.Visit(ev.Name, PostOrder, relay)
				watcher.tree.Remove(ev.Name)

			case fsnotify.Rename:
				// Rename (move) is handled as a special case to give this
				// recursion the same semantics as macOS FSEvents:
				//   - Removal of a dir notifies removal for all files inside it
				//   - Moving a dir away sends only one notification for this dir
				watcher.tree.Remove(ev.Name)
				watcher.eventC <- ev

			default:
				watcher.eventC <- ev
			}
		}
	}
}