-
Notifications
You must be signed in to change notification settings - Fork 4
/
file_watch_events.go
138 lines (125 loc) · 3.34 KB
/
file_watch_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package events
import (
	"crypto/sha256"
	"encoding/hex"
	"io"
	"os"
	"sync"

	"github.com/fsnotify/fsnotify"
	log "github.com/sirupsen/logrus"
)
// watchedFilesHashes remembers, per file path, the hash last computed by
// pathHash for that path. A file event is only propagated when the freshly
// computed hash differs from the one stored here (or none is stored yet).
var watchedFilesHashes = make(map[string]string)
// pathHash streams the content of the file at fileToHash through SHA-256 and
// returns the digest as a hex string. When the file cannot be opened or read,
// an empty string is returned instead.
func pathHash(fileToHash string) string {
	source, openErr := os.Open(fileToHash)
	if openErr != nil {
		return ""
	}
	defer source.Close()

	digest := sha256.New()
	if _, copyErr := io.Copy(digest, source); copyErr != nil {
		return ""
	}

	sum := digest.Sum(nil)
	return hex.EncodeToString(sum)
}
// StartFileWatch creates an fsnotify watcher, starts a goroutine that turns
// watcher events into "file:<name>" messages, and registers handlers for the
// "file:watch:start", "file:watch:end", "file:watch:clear" and
// "file:watch:close" topics. An event is only published when the content hash
// of the file differs from the hash remembered in watchedFilesHashes.
//
// Only call this once. If the watcher cannot be created the error is logged
// and nothing is started.
func StartFileWatch() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Errorf("While creating watcher: %s", err)
		// without a watcher the event loop below would dereference nil
		return
	}

	// hashesMu guards watchedFilesHashes: the map is used by the event loop
	// goroutine below as well as by the message handlers, which are not
	// called from that goroutine. The lock is never held across watcher
	// calls, file hashing or Send, so it cannot stall the event loop.
	var hashesMu sync.Mutex
	// closeOnce makes repeated "file:watch:close" messages harmless; running
	// the shutdown twice would send on an already closed channel.
	var closeOnce sync.Once

	// channel to signify end of watch
	endWatch := make(chan bool)

	// pathOf extracts the "path" entry of a message when it is a string
	pathOf := func(message *Message) (string, bool) {
		value, ok := (*message)["path"]
		if !ok {
			return "", false
		}
		path, ok := value.(string)
		return path, ok
	}

	go func() {
		log.Debugf("Started file change loop")
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					return
				}
				if _, err := os.Stat(event.Name); err != nil {
					log.Debugf("Error accessing file, eating event: %s", err)
					continue
				}
				log.Tracef("file event: %s", event.Name)
				// calculate new hash, an empty hash means the file could not be read
				newHash := pathHash(event.Name)
				if newHash == "" {
					continue
				}
				// publish only if there was no hash before or the hash changed
				hashesMu.Lock()
				oldHash, found := watchedFilesHashes[event.Name]
				changed := !found || newHash != oldHash
				if changed {
					watchedFilesHashes[event.Name] = newHash
				}
				hashesMu.Unlock()
				if changed {
					// build and publish message
					message := &Message{"name": event.Name, "op": event.Op}
					Send("file:"+event.Name, message)
					log.Debugf("File event: %v", message)
				}
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				log.Errorf("Error: %s", err)
			case <-endWatch:
				// acknowledge the shutdown request before leaving the loop
				endWatch <- true
				log.Debugf("File change watcher event loop closed")
				return
			}
		}
	}()

	// subscribe for watch requests, this acts as a service which allows
	// uncoupled but interested methods to watch files by using the message bus
	Listen("file:watch:start", func(message *Message) {
		path, ok := pathOf(message)
		if !ok {
			return
		}
		// remove previous hash
		hashesMu.Lock()
		delete(watchedFilesHashes, path)
		hashesMu.Unlock()
		// ensure removed before new watch; an error only means the path
		// was not being watched
		_ = watcher.Remove(path)
		// calculate hash; stored even when empty so that the path stays
		// known to "file:watch:clear"
		hash := pathHash(path)
		hashesMu.Lock()
		watchedFilesHashes[path] = hash
		hashesMu.Unlock()
		// start watching
		if err := watcher.Add(path); err != nil {
			log.Errorf("Failed to watch path: %s", err)
			return
		}
		log.Tracef("Watching file: %s", path)
	})

	// stop listening to specific files
	Listen("file:watch:end", func(message *Message) {
		path, ok := pathOf(message)
		if !ok {
			return
		}
		// forget the hash instead of blanking it so the map does not grow forever
		hashesMu.Lock()
		delete(watchedFilesHashes, path)
		hashesMu.Unlock()
		// an error only means the path was not being watched
		_ = watcher.Remove(path)
	})

	// clear all watches
	Listen("file:watch:clear", func(message *Message) {
		// take the known paths out of the map first, then remove the watches
		// without holding the lock
		hashesMu.Lock()
		paths := make([]string, 0, len(watchedFilesHashes))
		for key := range watchedFilesHashes {
			paths = append(paths, key)
			delete(watchedFilesHashes, key)
		}
		hashesMu.Unlock()
		for _, key := range paths {
			// an error only means the path was not being watched
			_ = watcher.Remove(key)
		}
	})

	// close/end all watches
	Listen("file:watch:close", func(message *Message) {
		closeOnce.Do(func() {
			// ask the event loop to stop and wait for its acknowledgement
			endWatch <- true
			<-endWatch
			close(endWatch)
			// reset to an empty map rather than nil: the handlers above stay
			// registered and writing to a nil map would panic
			hashesMu.Lock()
			watchedFilesHashes = make(map[string]string)
			hashesMu.Unlock()
			if err := watcher.Close(); err != nil {
				log.Errorf("Could not close watcher: %s", err)
				return
			}
			log.Debugf("File change watching service ended")
		})
	})
}
func StopFileWatch() {
Send("file:watch:close", &Message{})
}