/
dedup.go
210 lines (192 loc) · 5.31 KB
/
dedup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// Package dedup exposes primitives for detecting files with duplicate checksums
// from a list of file paths.
package dedup
import (
	"bufio"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"

	"github.com/bdragon/dedup/filesys"
)
// Options groups configuration options for Filter and FilterDir.
//
// Every field is optional. With the zero value, no writer receives output
// and evaluation is never stopped early on behalf of the caller.
type Options struct {
	// FollowSymlinks requests that symbolic links be followed rather than
	// evaluated as links. NOTE(review): presumably forwarded to lstat by the
	// filter implementations, which are not visible here.
	FollowSymlinks bool // Follow symbolic links.
	// Recursive requests descent into subdirectories when paths are read
	// from a directory (FilterDir).
	Recursive bool // Recurse if reading from a directory.
	// ExitOnError makes run cancel the filter after the first error, so the
	// returned Errors holds exactly that one error.
	ExitOnError bool // Stop if an error occurs.
	// ExitOnDup makes run cancel the filter after the first path whose
	// checksum was already seen.
	ExitOnDup bool // Stop if a file with a previously-seen checksum is found.
	// Cancel, when closed, makes run cancel the filter and return. A nil
	// channel never fires, so evaluation then runs to completion.
	Cancel <-chan struct{} // Close to signal cancellation.
	// UniqWriter, if non-nil, receives one line per path whose checksum had
	// not been seen before. Write failures are ignored.
	UniqWriter io.Writer // Write paths of files with previously-unseen checksums.
	// DupWriter, if non-nil, receives one line per path whose checksum had
	// already been seen. Write failures are ignored.
	DupWriter io.Writer // Write paths of files with previously-seen checksums.
	// ErrWriter, if non-nil, receives one line per error as it occurs, in
	// addition to the error being collected into the returned Errors.
	// Write failures are ignored.
	ErrWriter io.Writer // Write errors.
	// fs is the file system that files are evaluated against. Filter and
	// FilterDir fall back to filesys.OS() when it is nil.
	fs filesys.FileSystem
}
// Errors is a list of errors that is itself an error. Filter, FilterDir and
// run use it to report every failure seen during evaluation as one value.
type Errors []error

// Error returns the message of each contained error, in order, separated by
// single newlines. An empty (or nil) Errors produces the empty string. Every
// element must be non-nil.
func (el Errors) Error() string {
	var b strings.Builder
	for i, e := range el {
		if i > 0 {
			b.WriteByte('\n')
		}
		b.WriteString(e.Error())
	}
	return b.String()
}
// Filter reads newline-delimited file paths from r, evaluates each file in
// search of duplicate checksums, and returns a *Sums and any error(s) that
// may have occurred during evaluation. If err is non-nil, its type will be
// Errors.
//
// A nil opts is treated like a pointer to a zero Options. The caller's
// Options value is never modified: if it has no file system configured, the
// OS file system is used for this call only.
func Filter(r io.Reader, opts *Options) (*Sums, error) {
	// Work on a private copy so defaulting does not write into (or race on)
	// a caller-owned Options, and so nil is safe to pass.
	var o Options
	if opts != nil {
		o = *opts
	}
	if o.fs == nil {
		o.fs = filesys.OS()
	}
	f := newChanFilter(readLines(r), maxProcs, &o)
	return run(f, &o)
}
// FilterDir is like Filter except it reads file paths from the directory
// located at path.
//
// A nil opts is treated like a pointer to a zero Options. The caller's
// Options value is never modified: if it has no file system configured, the
// OS file system is used for this call only.
func FilterDir(path string, opts *Options) (*Sums, error) {
	// Work on a private copy so defaulting does not write into (or race on)
	// a caller-owned Options, and so nil is safe to pass.
	var o Options
	if opts != nil {
		o = *opts
	}
	if o.fs == nil {
		o.fs = filesys.OS()
	}
	f := newDirFilter(path, &o)
	return run(f, &o)
}
// run starts f and relays everything it produces to the writers in opts,
// then returns f.Sums() together with the errors collected along the way.
//
// The loop ends in any of these cases:
//   - opts.Cancel is closed;
//   - any one of f.Err(), f.Dup() or f.Uniq() is closed;
//   - an error arrives and opts.ExitOnError is set;
//   - a duplicate arrives and opts.ExitOnDup is set.
//
// In the cancellation, ExitOnError and ExitOnDup cases, f.Cancel() is called
// before returning.
//
// NOTE(review): the loop stops at the first channel close, so any values
// still pending on the other two channels are never read. Whether that can
// drop output depends on how the filter closes its channels, which is not
// visible here. Confirm against the filter implementations.
//
// If err is non-nil it has type Errors. It holds exactly the first error when
// opts.ExitOnError is set, and every error received otherwise.
func run(f filter, opts *Options) (sums *Sums, err error) {
	var collected Errors

	// report prints v on its own line to w, and does nothing if w is nil.
	// Write failures are deliberately discarded: reporting is best-effort.
	report := func(w io.Writer, v interface{}) {
		if w != nil {
			_, _ = fmt.Fprintln(w, v)
		}
	}

	f.Start()
	for done := false; !done; {
		select {
		case <-opts.Cancel:
			f.Cancel()
			done = true

		case e, open := <-f.Err():
			if !open {
				done = true
				continue
			}
			report(opts.ErrWriter, e)
			collected = append(collected, e)
			if opts.ExitOnError {
				f.Cancel()
				done = true
			}

		case p, open := <-f.Dup():
			if !open {
				done = true
				continue
			}
			report(opts.DupWriter, p)
			if opts.ExitOnDup {
				f.Cancel()
				done = true
			}

		case p, open := <-f.Uniq():
			if !open {
				done = true
				continue
			}
			report(opts.UniqWriter, p)
		}
	}

	sums = f.Sums()
	// Only assign when something was collected. A nil Errors stored in the
	// error interface would otherwise compare non-nil at the caller.
	if len(collected) > 0 {
		err = collected
	}
	return sums, err
}
// signal is a one-shot broadcast. Any number of goroutines can wait on the
// channel returned by C, and all of them are released together the first
// time Once is called. The channel is closed at most once.
type signal struct {
	c    chan struct{}
	once *sync.Once
}

// newSignal returns a signal that has not fired yet (its channel is open).
func newSignal() *signal {
	s := new(signal)
	s.c = make(chan struct{})
	s.once = &sync.Once{}
	return s
}

// C exposes the signal's channel for receiving only. Before Once is called a
// receive blocks. Afterwards every receive completes at once with the zero
// value, because the channel is closed.
func (s *signal) C() <-chan struct{} {
	return s.c
}

// Once fires the signal by closing its channel. Only the first call closes
// the channel. sync.Once guarantees that later or concurrent calls never
// close it a second time.
func (s *signal) Once() {
	fire := func() { close(s.c) }
	s.once.Do(fire)
}
// readLines returns an unbuffered channel on which the non-empty lines read
// from r are sent, in order. Empty lines are skipped. The channel is closed
// once r has been exhausted.
//
// Lines are split with bufio.Scanner's default line splitter, so a trailing
// "\r" (CRLF input) is removed. Reading stops early if r returns an error or a
// line exceeds bufio.MaxScanTokenSize. Such errors cannot be reported through
// the returned channel, so they are dropped.
//
// Each line is sent on an unbuffered channel, so the reading goroutine exits
// only after its last send has been received. A caller that stops receiving
// early leaves it blocked.
func readLines(r io.Reader) <-chan string {
	lines := make(chan string)
	go func() {
		defer close(lines)
		sc := bufio.NewScanner(r)
		for sc.Scan() {
			line := sc.Text()
			if line == "" {
				// A blank line (e.g. a separator between paths) names no
				// file. Skip it instead of abandoning the rest of the input.
				continue
			}
			lines <- line
		}
	}()
	return lines
}
// lstat wraps fs.Lstat, resolving symbolic links if followSymlinks is true.
//
// If followSymlinks is false, or path is not a symbolic link, info is the
// result of fs.Lstat(path) and newPath equals path.
//
// If followSymlinks is true and path is a symbolic link, the link is followed
// until a file that is not itself a link is reached. At most maxHops links
// are followed. info and newPath then describe that final file.
//
// A relative link target is interpreted relative to the directory containing
// the link, as symlink resolution does, not relative to the process working
// directory. A chain longer than maxHops (for example, a cycle of links)
// produces an error.
//
// When err is non-nil, info and newPath must not be relied upon.
func lstat(fs filesys.FileSystem, path string, followSymlinks bool) (info os.FileInfo, newPath string, err error) {
	const maxHops = 40

	info, err = fs.Lstat(path)
	if err != nil {
		return info, "", err
	}
	newPath = path
	if !followSymlinks {
		return info, newPath, nil
	}

	for hops := 0; info.Mode()&os.ModeSymlink != 0; hops++ {
		if hops == maxHops {
			return nil, "", fmt.Errorf("lstat %s: too many levels of symbolic links", path)
		}
		target, rerr := fs.Readlink(newPath)
		if rerr != nil {
			return info, "", rerr
		}
		// Readlink returns the link's literal contents. A relative target
		// must be anchored at the link's own directory before it is
		// stat'ed.
		if !filepath.IsAbs(target) {
			target = filepath.Join(filepath.Dir(newPath), target)
		}
		newPath = target
		if info, err = fs.Lstat(newPath); err != nil {
			return info, newPath, err
		}
	}
	return info, newPath, nil
}
// mergeErrors fans in the errors from every channel in ins onto a single
// unbuffered channel, running one forwarding goroutine per input. The
// returned channel is closed after every input has been closed and fully
// drained. With no inputs, it is closed as soon as the closer goroutine
// runs.
//
// The relative order of errors from different inputs is not defined. A nil
// input never closes, so the output would never close either. The forwarding
// goroutines block on each send until it is received.
func mergeErrors(ins ...<-chan error) <-chan error {
	merged := make(chan error)

	var pending sync.WaitGroup
	pending.Add(len(ins))
	for _, src := range ins {
		go func(src <-chan error) {
			defer pending.Done()
			for e := range src {
				merged <- e
			}
		}(src)
	}

	// Close the output only after every forwarder has finished sending.
	go func() {
		pending.Wait()
		close(merged)
	}()
	return merged
}
// maxProcs is runtime.GOMAXPROCS(0) as observed at package initialization.
// Later changes to GOMAXPROCS are not reflected.
var maxProcs = runtime.GOMAXPROCS(0)

// ratioMaxProcs scales maxProcs by the fraction n/d using integer arithmetic
// (maxProcs*n is divided by d, truncating toward zero). Any result below 1 is
// raised to 1. d must be non-zero, otherwise the division panics.
func ratioMaxProcs(n, d int) int {
	scaled := maxProcs * n / d
	if scaled < 1 {
		scaled = 1
	}
	return scaled
}