-
Notifications
You must be signed in to change notification settings - Fork 8
/
filesystem.go
109 lines (95 loc) · 2.5 KB
/
filesystem.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package watcher
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"runtime"
"github.com/artefactual-labs/enduro/internal/filenotify"
"github.com/fsnotify/fsnotify"
"gocloud.dev/blob"
"gocloud.dev/blob/fileblob"
)
// filesystemWatcher implements a Watcher for watching paths in a local filesystem.
//
// Instances are created by NewFilesystemWatcher, which also starts the
// background goroutine (loop) that feeds ch.
type filesystemWatcher struct {
// ctx is the context given to NewFilesystemWatcher; when it is done the
// background loop closes fsw and ch and exits.
ctx context.Context
// fsw is the underlying watcher (created with filenotify.New or
// filenotify.NewPollingWatcher, see NewFilesystemWatcher).
fsw filenotify.FileWatcher
// ch carries the creation events selected by loop; Watch receives from it.
// It is created with a buffer of 100 events.
ch chan *fsnotify.Event
// path is the absolute form of the configured directory. Event names are
// made relative to it in Watch and it is the root passed to fileblob in
// OpenBucket.
path string
// commonWatcherImpl holds the settings copied from the configuration:
// name, pipeline, retention period and the strip-top-level-dir flag.
*commonWatcherImpl
}
// Compile-time check that *filesystemWatcher satisfies the Watcher interface.
var _ Watcher = (*filesystemWatcher)(nil)
// NewFilesystemWatcher builds a watcher for the directory config.Path and
// starts the background goroutine that collects its creation events.
//
// config.Path must exist and be a directory; it is stored in absolute form.
// When config.Inotify is set and the platform is not Windows the watcher is
// created with filenotify.New, otherwise with filenotify.NewPollingWatcher.
//
// The background goroutine runs until ctx is done. An error is returned, and
// no goroutine or watcher is left behind, if the path cannot be inspected,
// is not a directory, or the underlying watcher cannot be created or
// configured.
func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*filesystemWatcher, error) {
	stat, err := os.Stat(config.Path)
	if err != nil {
		return nil, fmt.Errorf("error looking up stat info: %w", err)
	}
	if !stat.IsDir() {
		return nil, errors.New("given path is not a directory")
	}

	abspath, err := filepath.Abs(config.Path)
	if err != nil {
		return nil, fmt.Errorf("error generating absolute path of %s: %w", config.Path, err)
	}

	// The inotify API isn't always available, fall back to polling.
	var fsw filenotify.FileWatcher
	if config.Inotify && runtime.GOOS != "windows" {
		fsw, err = filenotify.New()
	} else {
		fsw, err = filenotify.NewPollingWatcher()
	}
	if err != nil {
		return nil, fmt.Errorf("error creating filesystem watcher: %w", err)
	}

	// Register the path before starting the goroutine so that a failure here
	// does not leave a running loop (and an open watcher) that nobody owns.
	if err := fsw.Add(abspath); err != nil {
		// Best-effort cleanup; the Add error is the one worth reporting.
		_ = fsw.Close()
		return nil, fmt.Errorf("error configuring filesystem watcher: %w", err)
	}

	w := &filesystemWatcher{
		ctx:  ctx,
		fsw:  fsw,
		ch:   make(chan *fsnotify.Event, 100),
		path: abspath,
		commonWatcherImpl: &commonWatcherImpl{
			name:             config.Name,
			pipeline:         config.Pipeline,
			retentionPeriod:  config.RetentionPeriod,
			stripTopLevelDir: config.StripTopLevelDir,
		},
	}

	go w.loop()

	return w, nil
}
// loop forwards creation events from the underlying watcher to w.ch until
// w.ctx is done. On exit it closes the underlying watcher and then w.ch,
// which releases any caller blocked in Watch.
//
// It is started in its own goroutine by NewFilesystemWatcher.
func (w *filesystemWatcher) loop() {
	// Deferred calls run in reverse order: the watcher is closed first, then
	// w.ch, matching the shutdown sequence callers have always observed.
	defer close(w.ch)
	defer func() {
		// Nothing useful can be done with a Close error during shutdown.
		_ = w.fsw.Close()
	}()

	// NOTE(review): assumes Events and Errors return the same channel on
	// every call, so they can be captured once and disabled when closed.
	events, errs := w.fsw.Events(), w.fsw.Errors()

	for {
		select {
		case event, ok := <-events:
			if !ok {
				// A closed channel is always ready to receive; a nil channel
				// never is. Disable the case instead of spinning on it.
				events = nil
				continue
			}
			// Op is a bitmask and may carry several operations at once, so
			// test the Create bit rather than comparing for equality.
			if event.Op&fsnotify.Create == 0 {
				continue
			}
			// Do not let a full buffer prevent shutdown.
			select {
			case w.ch <- &event:
			case <-w.ctx.Done():
				return
			}
		case _, ok := <-errs:
			if !ok {
				errs = nil
			}
			// Watcher errors are deliberately dropped: there is no channel
			// to report them on and they do not stop the watch.
		case <-w.ctx.Done():
			return
		}
	}
}
// Watch blocks until a creation event is available and returns it as a
// BlobEvent whose key is the event path relative to the watched directory.
//
// It returns ctx.Err() if ctx is done before an event arrives, and
// ErrWatchTimeout once the event channel has been closed, which happens when
// the context given to NewFilesystemWatcher is done. An error is also
// returned if the event path cannot be made relative to the watched
// directory.
func (w *filesystemWatcher) Watch(ctx context.Context) (*BlobEvent, error) {
	select {
	case <-ctx.Done():
		// Honour the caller's cancellation or deadline instead of blocking
		// until the next filesystem event.
		return nil, ctx.Err()
	case fsevent, ok := <-w.ch:
		if !ok {
			return nil, ErrWatchTimeout
		}
		rel, err := filepath.Rel(w.path, fsevent.Name)
		if err != nil {
			return nil, fmt.Errorf("error generating relative path of fsevent.Name %s - %w", fsevent.Name, err)
		}
		return NewBlobEvent(w, rel), nil
	}
}
// OpenBucket opens a file-backed bucket rooted at the watched directory.
//
// Neither argument is consulted, and no fileblob options are supplied.
func (w *filesystemWatcher) OpenBucket(_ context.Context, _ *BlobEvent) (*blob.Bucket, error) {
	var opts *fileblob.Options
	root := w.path
	return fileblob.OpenBucket(root, opts)
}