/
directory_tracker.go
278 lines (255 loc) · 6.65 KB
/
directory_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
package medorg
import (
"errors"
"fmt"
"io/fs"
"log"
"path/filepath"
"sync"
"sync/atomic"
)
// errorMissingDe is wrapped into the error returned when a directory entry
// turns out to be nil while the tree is being walked.
var (
	errorMissingDe = errors.New("missing de when evaluating directory")
)
// DirectoryTrackerInterface is implemented by the per-directory entries that a
// DirTracker creates through the newEntry function given to NewDirTracker.
type DirectoryTrackerInterface interface {
	// Start is run on its own goroutine as soon as the entry is created. It
	// may take a long time; an error it returns is forwarded to the tracker's
	// error channel.
	Start() error

	// VisitFile is called for each file found in directory dir.
	// You must call the callback after you have finished whatever you are
	// doing that might be resource intensive: it hands a token back to the
	// tracker, which blocks before dispatching further files once all of its
	// tokens are out.
	VisitFile(dir, file string, d fs.DirEntry, callback func())

	// Revisit is called by DirTracker.Revisit with this entry's directory
	// path and the file visitor supplied to it.
	Revisit(dir string, visitor func(dm DirectoryEntryInterface, directory string, file string, fileStruct FileStruct) error)

	// ErrChan returns this entry's error channel. The tracker forwards every
	// non-nil error from it and does not finish until the channel is closed.
	ErrChan() <-chan error

	// Close is called on every entry once the directory walk has returned.
	Close()
}
// finishedB is a resettable "finished" flag paired with a channel that is
// closed when the flag is set. Callers can either poll it (Get) or wait on the
// channel. Clear re-arms it with a fresh channel.
//
// mu serialises Set and Clear, so fc is never closed twice and a concurrent
// Clear cannot swap the channel out from under Set.
// NOTE(review): DirTracker.FinishedChan reads fc directly without holding mu.
type finishedB struct {
	mu sync.Mutex
	// cnt is non-zero once finished; it is accessed atomically so Get never
	// needs the lock.
	cnt uint32
	// fc is closed by Set and replaced by Clear.
	fc chan struct{}
}

// Get reports whether Set has been called since the last effective Clear.
func (f *finishedB) Get() bool {
	return atomic.LoadUint32(&f.cnt) > 0
}

// Set marks the flag finished and closes the current channel. Calling Set
// again before the next Clear is a no-op rather than a double-close panic.
func (f *finishedB) Set() {
	f.mu.Lock()
	defer f.mu.Unlock()
	if atomic.LoadUint32(&f.cnt) > 0 {
		return
	}
	// Publish the flag before closing fc, so a goroutine woken by the close
	// already observes Get() == true.
	atomic.StoreUint32(&f.cnt, 1)
	// The zero value has no channel yet; closing nil would panic.
	if f.fc != nil {
		close(f.fc)
	}
}

// Clear re-arms the flag with a new, open channel. If the flag is already
// armed but not yet set, the existing channel is kept, so goroutines already
// waiting on it are still woken by the next Set instead of being orphaned.
func (f *finishedB) Clear() {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.fc != nil && atomic.LoadUint32(&f.cnt) == 0 {
		return
	}
	f.fc = make(chan struct{})
	atomic.StoreUint32(&f.cnt, 0)
}
// DirTracker walks a directory tree, keeping one DirectoryTrackerInterface
// entry per directory and handing each entry the files of its directory.
// Create one with NewDirTracker.
type DirTracker struct {
	// Progress counters, only touched through sync/atomic.
	// NOTE(review): keep these two int64 fields first. On 32-bit platforms
	// sync/atomic only guarantees 64-bit alignment for the first word of an
	// allocated struct, so moving them could break the atomic operations.
	// directoryCountTotal: directories found by the counting pass; -1 if it failed.
	directoryCountTotal int64
	// directoryCountVisited: directories visited so far; Revisit resets it to 0.
	directoryCountVisited int64
	// We do not lock the dm map as we only access it in a single threaded manner
	// i.e. only the directory walker or things it calls have access
	// dm: entries keyed by directory path.
	dm map[string]DirectoryTrackerInterface
	// newEntry: constructor supplied to NewDirTracker, called once per new directory.
	newEntry func(dir string) (DirectoryTrackerInterface, error)
	// lastPath: told about each directory the walker visits (its closer is currently a no-op).
	lastPath lastPath
	// tokenChan: counting semaphore; a token is taken before each VisitFile
	// and returned by the callback passed to it.
	tokenChan chan struct{}
	// wg: counts populateDircount plus the runChild and serviceChild goroutines of every entry.
	wg *sync.WaitGroup
	// errChan: unbuffered error stream exposed by ErrChan; closed when all background work is done.
	errChan chan error
	// finished: set when the walk, or a Revisit, completes.
	finished finishedB
}
// makeTokenChan returns a channel pre-filled with n tokens, so it can be used
// as a counting semaphore: receive to acquire a token, send to release one.
func makeTokenChan(n int) chan struct{} {
	tokens := make(chan struct{}, n)
	for len(tokens) < cap(tokens) {
		tokens <- struct{}{}
	}
	return tokens
}

// NumTrackerOutstanding is how many tokens a DirTracker's semaphore holds,
// i.e. how many files may have been handed to VisitFile without their
// callback having been called yet.
const NumTrackerOutstanding = 4
// NewDirTracker walks dir and, for each directory it finds, creates an entry
// with newEntry. Every file in that directory is then passed to the entry's
// VisitFile. At some later time the entry is closed; there are no guarantees
// about exactly when that happens.
//
// The walk runs in the background and NewDirTracker returns immediately. A
// second, concurrent walk counts the directories so that Total and Value can
// be used to report progress.
//
// Each entry's Start runs on its own goroutine, and its ErrChan is forwarded
// to the tracker's ErrChan. That channel is unbuffered, so callers must drain
// it; it is closed once all background work has finished.
func NewDirTracker(dir string, newEntry func(string) (DirectoryTrackerInterface, error)) *DirTracker {
	numOutstanding := NumTrackerOutstanding // FIXME expose this
	var dt DirTracker
	dt.dm = make(map[string]DirectoryTrackerInterface)
	dt.newEntry = newEntry
	dt.tokenChan = makeTokenChan(numOutstanding)
	dt.wg = new(sync.WaitGroup)
	dt.errChan = make(chan error)
	dt.wg.Add(1) // add one for populateDircount
	dt.finished.Clear()
	go dt.populateDircount(dir)
	go func() {
		walkErr := filepath.WalkDir(dir, dt.directoryWalker)
		if walkErr != nil {
			dt.errChan <- walkErr
		}
		for _, val := range dt.dm {
			val.Close()
		}
		dt.wg.Wait()
		// Only a walk that ran to completion is expected to have visited
		// every directory the counting pass found. These cases legitimately
		// leave the counts different:
		//   - a failed walk (already reported above);
		//   - a failed count (Total() == -1);
		//   - the filesystem changing between the two passes.
		// So a mismatch is reported as an error rather than crashing the
		// process with a panic.
		if walkErr == nil {
			if total, visited := dt.Total(), dt.Value(); total >= 0 && total != visited {
				dt.errChan <- fmt.Errorf("hadn't actually finished: visited %d of %d directories", visited, total)
			}
		}
		dt.finished.Set()
		close(dt.errChan)
		close(dt.tokenChan)
	}()
	return &dt
}
// ErrChan returns the channel on which the tracker reports errors. Errors are
// delivered on a channel rather than returned because most of them do not
// stop the walk. The channel is unbuffered, so it must be drained.
func (dt *DirTracker) ErrChan() <-chan error {
	var errs <-chan error = dt.errChan
	return errs
}

// Total reports how many directories the counting pass has found so far, or
// -1 if that pass failed.
func (dt *DirTracker) Total() int64 {
	n := atomic.LoadInt64(&dt.directoryCountTotal)
	return n
}

// Value reports how many directories have been visited so far, either by the
// initial walk or by the current Revisit (which restarts the count at zero).
func (dt *DirTracker) Value() int64 {
	n := atomic.LoadInt64(&dt.directoryCountVisited)
	return n
}

// Finished reports whether the walk, or the most recent Revisit, has completed.
func (dt *DirTracker) Finished() bool {
	done := dt.finished.Get()
	return done
}

// FinishedChan returns a channel that is closed when Finished becomes true.
// A Revisit may install a new channel, so fetch it again after starting one.
func (dt *DirTracker) FinishedChan() <-chan struct{} {
	var done <-chan struct{} = dt.finished.fc
	return done
}
// runChild runs an entry's Start method, forwarding any error it returns to
// the tracker's error channel, and releases one wg count when done.
// Start is allowed to consume significant time; it may even be the entry's
// main runner.
func (dt *DirTracker) runChild(de DirectoryTrackerInterface) {
	defer dt.wg.Done()
	if err := de.Start(); err != nil {
		dt.errChan <- err
	}
}
// serviceChild forwards every non-nil error from an entry's ErrChan to the
// tracker's error channel until the entry closes it. It then releases one
// wg count.
func (dt *DirTracker) serviceChild(de DirectoryTrackerInterface) {
	defer dt.wg.Done()
	errs := de.ErrChan()
	for {
		err, open := <-errs
		if !open {
			return
		}
		if err == nil {
			continue
		}
		dt.errChan <- err
	}
}
// getDirectoryEntry returns the entry for path, creating it with newEntry on
// first use. A newly created entry is recorded in dm and started: one
// goroutine runs its Start method and another forwards its ErrChan. Both are
// counted in wg.
//
// dm is not locked, so this must not be called concurrently.
//
// Should we export this, so that clients do not have to recreate entries?
func (dt *DirTracker) getDirectoryEntry(path string) (DirectoryTrackerInterface, error) {
	// Fast path - does it already exist? If so, use it!
	if de, ok := dt.dm[path]; ok && de != nil {
		return de, nil
	}
	// Call out to the external function to return a new entry
	de, err := dt.newEntry(path)
	if err != nil {
		return nil, err
	}
	// Never register or start a nil entry. runChild and serviceChild would
	// call methods on it and panic inside a goroutine, as would the final
	// Close loop over dm.
	if de == nil {
		return nil, errorMissingDe
	}
	dt.dm[path] = de
	dt.wg.Add(2)
	go dt.runChild(de)
	go dt.serviceChild(de)
	return de, nil
}
// populateDircount counts the directories under dir into directoryCountTotal,
// using directoryWalkerPopulateDircount, so that Total can be reported while
// the main walk is still running. If the counting walk fails, the total is set
// to -1. It releases one wg count when done.
func (dt *DirTracker) populateDircount(dir string) {
	defer dt.wg.Done()
	if err := filepath.WalkDir(dir, dt.directoryWalkerPopulateDircount); err != nil {
		// Total() reads this field atomically from other goroutines, so a
		// plain assignment here would be a data race.
		atomic.StoreInt64(&dt.directoryCountTotal, -1)
	}
}
// directoryWalkerPopulateDircount is the fs.WalkDirFunc for the counting pass.
// Each directory not rejected by isHiddenDirectory bumps directoryCountTotal;
// hidden directories are skipped entirely. Any entry named ".mdSkipDir" makes
// the walk return filepath.SkipDir.
func (dt *DirTracker) directoryWalkerPopulateDircount(path string, d fs.DirEntry, err error) error {
	if err != nil {
		return err
	}
	isDir := d.IsDir()
	if isDir && isHiddenDirectory(path) {
		return filepath.SkipDir
	}
	if isDir {
		log.Println("popping dir", path, dt.Total())
		atomic.AddInt64(&dt.directoryCountTotal, 1)
	}
	if _, name := filepath.Split(path); name == ".mdSkipDir" {
		return filepath.SkipDir
	}
	return nil
}
// directoryWalker is the fs.WalkDirFunc for the main walk.
//
// For a directory (hidden ones are skipped), it:
//   - bumps the visited count;
//   - notifies lastPath;
//   - makes sure an entry exists for the directory, creating and starting
//     one if needed.
//
// For a file, it takes a token from tokenChan and then passes the file to its
// directory's entry via VisitFile, together with a callback that returns the
// token. A file named ".mdSkipDir" instead makes WalkDir skip the remaining
// files of the directory it is in.
func (dt *DirTracker) directoryWalker(path string, d fs.DirEntry, err error) error {
	if err != nil {
		return err
	}
	if d.IsDir() {
		if isHiddenDirectory(path) {
			return filepath.SkipDir
		}
		log.Println("visiting dir", path, dt.Value())
		atomic.AddInt64(&dt.directoryCountVisited, 1)
		closerFunc := func(pt string) {
			// FIXME we will want this back when we are not revisiting
			// de, ok := dt.dm[pt]
			// if ok {
			// 	de.Close()
			// }
			// delete(dt.dm, pt)
		}
		dt.lastPath.Closer(path, closerFunc)
		de, err := dt.getDirectoryEntry(path)
		if err != nil {
			return fmt.Errorf("%w::%s", err, path)
		}
		if de == nil {
			return fmt.Errorf("%w::%s", errorMissingDe, path)
		}
		return nil
	}
	parent, file := filepath.Split(path)
	if file == ".mdSkipDir" {
		log.Println("Skipping:", parent)
		return filepath.SkipDir
	}
	// Key of the containing directory in dm. filepath.Dir yields "." for a
	// bare file name and keeps "/" for files directly under the filesystem
	// root. Trimming the trailing separator off parent would turn "/" into "".
	dir := filepath.Dir(path)

	// Take a token; callback (which VisitFile implementations must call)
	// hands it back, bounding how many files are in flight at once.
	<-dt.tokenChan
	callback := func() {
		dt.tokenChan <- struct{}{}
	}
	de, err := dt.getDirectoryEntry(dir)
	if err != nil {
		// The file never reaches VisitFile, so return the token here.
		callback()
		return fmt.Errorf("%w::%s", err, path)
	}
	if de == nil {
		callback()
		return fmt.Errorf("%w::%s", errorMissingDe, path)
	}
	de.VisitFile(dir, file, d, callback)
	return nil
}
// Revisit runs over the directory entries recorded by the walk again, calling
// each entry's Revisit with its path and fileVisitor. The finished flag is
// cleared for the duration and set again on return, and Value counts up from
// zero as entries are revisited.
//
// The dir argument is currently unused. dirVisitor, if non-nil, is called once
// with the tracker before any entry is revisited. If closer is non-nil and gets
// closed, Revisit stops before the next entry; a value sent on closer is
// consumed and otherwise ignored. Entries are visited in map iteration order,
// i.e. in no particular order.
func (dt *DirTracker) Revisit(
	dir string,
	dirVisitor func(dt *DirTracker),
	fileVisitor func(dm DirectoryEntryInterface, dir, fn string, fileStruct FileStruct) error,
	closer <-chan struct{},
) {
	dt.finished.Clear()
	defer dt.finished.Set()
	atomic.StoreInt64(&dt.directoryCountVisited, 0)
	if dirVisitor != nil {
		dirVisitor(dt)
	}
	// closerShut polls closer without blocking. A nil closer is never ready,
	// so the default branch is taken for it.
	closerShut := func() bool {
		select {
		case _, open := <-closer:
			return !open
		default:
			return false
		}
	}
	for path, de := range dt.dm {
		if closerShut() {
			log.Println("Revisit saw a closer")
			return
		}
		atomic.AddInt64(&dt.directoryCountVisited, 1)
		de.Revisit(path, fileVisitor)
	}
}