-
Notifications
You must be signed in to change notification settings - Fork 781
/
file.go
129 lines (105 loc) · 2.58 KB
/
file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package dependency
import (
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"time"
"github.com/pkg/errors"
)
var (
// Compile-time assertion that *FileQuery satisfies the Dependency interface.
_ Dependency = (*FileQuery)(nil)
// FileQuerySleepTime is how long the file watcher sleeps between successive
// os.Stat polls of the watched path. Polling is used because, per the
// original note, the fsnotify library is not yet compatible with Solaris
// and some other OSes.
FileQuerySleepTime = 2 * time.Second
)
// FileQuery is a dependency backed by a file on the local filesystem.
type FileQuery struct {
	// stopCh is closed by Stop to interrupt an in-flight Fetch.
	stopCh chan struct{}

	// path is the location of the file to read, as stored by NewFileQuery
	// after trimming surrounding whitespace.
	path string

	// stat is the file info recorded by the most recent successful Fetch.
	// It stays nil until a Fetch has completed.
	stat os.FileInfo
}
// NewFileQuery builds a FileQuery for the file at s. Leading and trailing
// whitespace is stripped from s first; if nothing remains, an error is
// returned and the query is nil.
func NewFileQuery(s string) (*FileQuery, error) {
	trimmed := strings.TrimSpace(s)
	if len(trimmed) == 0 {
		return nil, fmt.Errorf("file: invalid format: %q", trimmed)
	}

	query := &FileQuery{
		path:   trimmed,
		stopCh: make(chan struct{}, 1),
	}
	return query, nil
}
// Fetch blocks until the watcher reports that the file differs from the last
// recorded stat, then reads the file and hands its contents (as a string) to
// respWithMetadata, whose result is returned unchanged.
//
// If the dependency is stopped first, Fetch returns ErrStopped. A stat or
// read failure is returned wrapped with this dependency's String() form.
// The clients and opts arguments are not consulted by this implementation.
func (d *FileQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
	log.Printf("[TRACE] %s: READ %s", d, d.path)

	// Wait for whichever comes first: a stop request or a watcher report.
	var res *watchResult
	select {
	case <-d.stopCh:
		log.Printf("[TRACE] %s: stopped", d)
		return "", nil, ErrStopped
	case res = <-d.watch(d.stat):
	}

	if res.err != nil {
		return "", nil, errors.Wrap(res.err, d.String())
	}
	log.Printf("[TRACE] %s: reported change", d)

	contents, readErr := ioutil.ReadFile(d.path)
	if readErr != nil {
		return "", nil, errors.Wrap(readErr, d.String())
	}

	// Only advance the baseline stat once the read has succeeded, so a failed
	// read is retried against the previous baseline on the next Fetch.
	d.stat = res.stat
	return respWithMetadata(string(contents))
}
// CanShare reports whether this dependency is shareable. A FileQuery always
// returns false.
func (d *FileQuery) CanShare() bool {
return false
}
// Stop halts the dependency's Fetch function by closing the stop channel,
// which unblocks any Fetch currently waiting on it.
//
// Calling Stop again after it has already run is a no-op rather than a
// "close of closed channel" panic. The check-then-close is not atomic, so
// Stop must not be invoked from multiple goroutines at the same time.
func (d *FileQuery) Stop() {
	select {
	case _, open := <-d.stopCh:
		if !open {
			// Already closed by an earlier Stop; closing again would panic.
			return
		}
		// A stray buffered value was drained; still close below so that
		// every waiter observes the stop.
	default:
		// Channel is open and empty: the normal first-call path.
	}
	close(d.stopCh)
}
// String returns the human-friendly form of this dependency: the watched
// path wrapped as "file(<path>)". Fetch uses it as the prefix when wrapping
// errors and in its trace log lines.
func (d *FileQuery) String() string {
return fmt.Sprintf("file(%s)", d.path)
}
// Type returns the type of this dependency, which is always TypeLocal for a
// FileQuery.
func (d *FileQuery) Type() Type {
return TypeLocal
}
// watchResult is the message a watch goroutine delivers to its caller:
// either the file info for a detected change, or the error from os.Stat.
type watchResult struct {
	// stat is the os.Stat result that triggered the notification. It is left
	// nil when err is set.
	stat os.FileInfo

	// err is the os.Stat failure, if any. It is left nil when a change is
	// being reported.
	err error
}
// watch polls the file at d.path in a background goroutine and reports the
// first observation that differs from lastStat.
//
// The returned channel receives at most one *watchResult:
//   - err set, if os.Stat fails;
//   - stat set, if lastStat is nil or the file's size or modification time
//     differs from lastStat.
//
// The channel has a buffer of one, so the goroutine never blocks on the send
// even if nobody is receiving. The goroutine exits after delivering a result,
// or as soon as d.stopCh is closed, including while it is waiting between
// polls, so it does not outlive a stopped dependency.
func (d *FileQuery) watch(lastStat os.FileInfo) <-chan *watchResult {
	ch := make(chan *watchResult, 1)

	go func(lastStat os.FileInfo) {
		for {
			stat, err := os.Stat(d.path)
			if err != nil {
				select {
				case <-d.stopCh:
				case ch <- &watchResult{err: err}:
				}
				return
			}

			// Time.Equal compares instants; == / != on time.Time would also
			// compare location and monotonic-clock metadata.
			changed := lastStat == nil ||
				lastStat.Size() != stat.Size() ||
				!lastStat.ModTime().Equal(stat.ModTime())
			if changed {
				select {
				case <-d.stopCh:
				case ch <- &watchResult{stat: stat}:
				}
				return
			}

			// Wait for the next poll, but wake immediately on Stop so the
			// goroutine is not left polling an unchanged file forever.
			timer := time.NewTimer(FileQuerySleepTime)
			select {
			case <-d.stopCh:
				timer.Stop()
				return
			case <-timer.C:
			}
		}
	}(lastStat)

	return ch
}