/
collectfiles.go
79 lines (62 loc) · 1.57 KB
/
collectfiles.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package util
import (
"fmt"
"sort"
"sync"
"github.com/archivekeep/archivekeep/archive"
)
// WorkerCount is the upper bound on the number of goroutines that statFiles
// starts to call FileInfo concurrently.
const WorkerCount = 128
// FileCollectibleArchive is the subset of archive behavior that CollectFiles
// needs: listing stored file names and looking up the info of a single file.
type FileCollectibleArchive interface {
// StoredFiles returns the names of the files held by the archive, or an
// error. Each returned name is later passed to FileInfo.
StoredFiles() ([]string, error)
// FileInfo returns the *archive.FileInfo for filename, or an error.
// statFiles calls this method from several goroutines at once, so
// implementations must tolerate concurrent calls.
FileInfo(filename string) (*archive.FileInfo, error)
}
// CollectFiles lists every file stored in a and resolves the
// *archive.FileInfo of each one through statFiles.
//
// If listing the stored files fails, that error is returned unchanged and the
// result slice is nil. Otherwise the result and error of statFiles are
// returned as-is.
func CollectFiles(a FileCollectibleArchive) ([]*archive.FileInfo, error) {
	storedNames, listErr := a.StoredFiles()
	if listErr == nil {
		return statFiles(a, storedNames)
	}
	return nil, listErr
}
// statFiles resolves the *archive.FileInfo of every name in allFiles by
// calling a.FileInfo concurrently from a bounded pool of goroutines.
//
// At most WorkerCount goroutines are started (fewer when allFiles is shorter).
// On success the returned slice holds one entry per input name, sorted by
// Path; for empty input it is an empty, non-nil slice.
//
// If any a.FileInfo call fails, the result is nil and the returned error wraps
// the first failure that was recorded, prefixed with the offending file name.
// The remaining files are still processed before returning.
func statFiles(a FileCollectibleArchive, allFiles []string) ([]*archive.FileInfo, error) {
	workers := WorkerCount
	if len(allFiles) < workers {
		workers = len(allFiles)
	}

	// Every worker writes only to the slot of the index it received, so the
	// slice itself needs no locking.
	files := make([]*archive.FileInfo, len(allFiles))
	indexes := make(chan int, workers)

	// TODO: improve error collection
	var (
		errMu    sync.Mutex
		firstErr error
	)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for i := range indexes {
				filename := allFiles[i]
				info, err := a.FileInfo(filename)
				if err != nil {
					// Guard the shared error so concurrent failures do not race.
					errMu.Lock()
					if firstErr == nil {
						firstErr = fmt.Errorf("get file info %s: %w", filename, err)
					}
					errMu.Unlock()
					continue
				}
				files[i] = info
			}
		}()
	}

	for i := range allFiles {
		indexes <- i
	}
	// Closing the channel lets every worker leave its range loop; Wait then
	// guarantees all writes to files and firstErr are visible here.
	close(indexes)
	wg.Wait()

	if firstErr != nil {
		return nil, firstErr
	}

	sort.Slice(files, func(i, j int) bool {
		return files[i].Path < files[j].Path
	})
	return files, nil
}