This repository has been archived by the owner on Jan 12, 2023. It is now read-only.
/
reader.go
276 lines (233 loc) · 7.5 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
package io
import (
"errors"
"fmt"
"io"
"os"
"sync/atomic"
)
const (
	// bufSize is the size, in bytes, of allocated read buffers.
	bufSize = 32 * 1024

	// readChannelSize is the buffer capacity given to every ChannelReader's
	// results channel. According to the original author it must remain 0 for
	// the current synchronisation to work, as long as no buffer pool is used.
	readChannelSize = 0
)
// RootedReadController associates a set of trees with the read controller
// that serves them.
// NOTE(review): an earlier note called this type "very similar to
// RootedReadController", i.e. to itself; it presumably meant a write-side
// counterpart defined elsewhere in the package — confirm and fix the reference.
type RootedReadController struct {
	// Trees are the tree roots whose files are read through Ctrl.
	// NewDeviceReadControllers fills it with one group returned by DeviceMap.
	Trees []string
	// Ctrl is the controller that reads from Trees. Per the original comment
	// it may be shared with other RootedReadControllers.
	Ctrl ReadChannelController
}
// readResult is one chunk produced by a reader worker. It mirrors the values
// returned by io.Reader.Read.
type readResult struct {
	// buf holds the bytes read. For chunks produced from a Read call it is a
	// slice of the ChannelReader's own buffer, so it is only valid until the
	// consumer signals ready again.
	buf []byte
	// n is the number of bytes read.
	n int
	// err is the read error; any non-nil value (io.EOF included) is the last
	// result of a stream.
	err error
}
// ReadChannelController hands ChannelReaders to the worker goroutines started
// by NewReadChannelController.
type ReadChannelController struct {
	// c queues ChannelReaders for the workers. When created by
	// NewReadChannelController its capacity equals the number of workers.
	c chan *ChannelReader
}
// ChannelReader describes one file or io.Reader to be read by a worker of a
// ReadChannelController. It also carries the channels used to hand the data
// over to the consumer (see WriteTo).
type ChannelReader struct {
	// path is opened for reading when reader is nil. If mode has
	// os.ModeSymlink set, the link target is read instead of opening it.
	path string
	// mode of the file at path; the worker only inspects its os.ModeSymlink bit.
	mode os.FileMode
	// reader, when non-nil, is read directly and path is ignored. Use it to
	// supply an already opened file or your own custom reader. While reading
	// from path, the worker stores the opened file here and resets it to nil
	// once done.
	reader io.Reader
	// results transports each chunk to the consumer. The worker closes it once
	// reading has finished, failed or been cancelled.
	results chan readResult
	// ready protects buf from simultaneous access. The consumer sends on it
	// whenever it can accept the next chunk, and the worker waits for that
	// signal before every result it sends. The worker closes it when done.
	ready chan bool
	// buf is the read buffer, reused for every chunk. It must not be shared
	// among multiple ChannelReaders.
	buf []byte
}
// Streams reports how many streams this controller handles in parallel,
// measured as the capacity of its queue. For controllers built by
// NewReadChannelController this equals the nprocs worker count.
func (r *ReadChannelController) Streams() int {
	queue := r.c
	return cap(queue)
}
// NewChannelReaderFromPath creates a ChannelReader for the file at path and
// queues it for this controller's workers. It blocks while the queue is full.
//
// mode tells the worker whether path is a symlink (os.ModeSymlink), in which
// case the link target is delivered instead of the file's contents. buf is
// the read buffer; it must not be shared among multiple ChannelReaders.
// Consume the returned reader with WriteTo.
func (r *ReadChannelController) NewChannelReaderFromPath(path string, mode os.FileMode, buf []byte) *ChannelReader {
	// readChannelSize controls how many results may be cached in memory
	// before the worker blocks because the consumer does not keep up.
	pending := &ChannelReader{
		results: make(chan readResult, readChannelSize),
		ready:   make(chan bool),
		path:    path,
		mode:    mode,
		buf:     buf,
	}
	r.c <- pending
	return pending
}
// NewChannelReaderFromReader creates a ChannelReader that reads from reader
// and queues it for this controller's workers. It blocks while the queue is
// full. buf is the read buffer and must not be shared among multiple
// ChannelReaders. Consume the returned reader with WriteTo.
func (r *ReadChannelController) NewChannelReaderFromReader(reader io.Reader, buf []byte) *ChannelReader {
	pending := new(ChannelReader)
	pending.reader = reader
	pending.buf = buf
	pending.results = make(chan readResult, readChannelSize)
	pending.ready = make(chan bool)
	r.c <- pending
	return pending
}
// WriteTo implements io.WriterTo: it streams everything the worker serving
// this ChannelReader reads into w.
//
// The actual reading is done by one of the worker goroutines started by
// NewReadChannelController; WriteTo only receives the chunks and passes them
// on. It blocks until a worker picks this reader up. Each chunk aliases the
// reader's buffer, which the worker refills as soon as WriteTo signals ready
// again. Therefore w must be done with the bytes when Write returns;
// non-blocking writers must copy them.
//
// It returns the number of bytes written and the first error encountered:
// a failed or short write, or a read error other than io.EOF (which includes
// cancellation). After a write error nothing more is written. The remaining
// results are still drained so the worker goroutine is not left blocked.
//
// It panics if a chunk reports 0 bytes with a nil error (for example when
// the buffer has zero length).
func (p *ChannelReader) WriteTo(w io.Writer) (n int64, err error) {
	// Request the first chunk; the worker waits for this before reading.
	p.ready <- true

	// The worker closes results after its final (err != nil) result.
	for res := range p.results {
		// Forward data only while nothing has failed. Empty chunks are
		// written too: for empty files, the Write call is what creates them.
		if err == nil {
			written, werr := w.Write(res.buf)
			n += int64(written)
			if werr == nil && written != len(res.buf) {
				werr = io.ErrShortWrite
			}
			err = werr
		}

		if res.err == nil {
			// The worker sends exactly one more result per ready signal. Keep
			// requesting even after a write error so it can run to completion
			// instead of blocking forever.
			p.ready <- true
			if res.n == 0 {
				panic("If 0 bytes have been read, there should at least be an EOF (in case of empty files)")
			}
		} else if res.err != io.EOF && err == nil {
			err = res.err
		}
	}
	return n, err
}
// NewReadChannelController starts nprocs worker goroutines and returns the
// controller that feeds them.
//
// Hand ChannelReaders to the controller (NewChannelReaderFromPath,
// NewChannelReaderFromReader) and consume each one with WriteTo. A worker
// serves one ChannelReader at a time. It waits for the consumer's ready
// signal before producing every result. Once the source is exhausted, has
// failed, or reading was cancelled, it closes the reader's ready and results
// channels.
//
// Sources are handled as follows:
//   - reader != nil: read from it until it returns an error (io.EOF included).
//   - mode has os.ModeSymlink: the link target (os.Readlink) is the whole
//     content, delivered as a single chunk together with io.EOF. A target
//     longer than the buffer yields an error result.
//   - otherwise: path is opened with os.Open, read like a reader, then closed.
//
// Closing done cancels reading: the next chunk request of every active
// reader is answered with a cancellation error. A nil done never cancels.
//
// stats must be non-nil; the workers update BytesRead, FilesBeingRead and
// TotalFilesRead atomically. It panics if nprocs < 1.
//
// The workers range over the controller's channel, which this function never
// closes, so nothing here stops them.
func NewReadChannelController(nprocs int, stats *Stats, done <-chan bool) ReadChannelController {
	if nprocs < 1 {
		panic("nprocs must be >= 1")
	}
	ctrl := ReadChannelController{
		make(chan *ChannelReader, nprocs),
	}

	// reader serves a single ChannelReader from start to finish.
	reader := func(info *ChannelReader) {
		// Deferred calls run last-in-first-out: ready is closed first, then
		// results, whose closing tells the consumer that nothing more follows.
		defer close(info.results)
		defer close(info.ready)

		// sendError delivers a terminal error. The consumer signals ready
		// before it expects any result, so that signal is taken first.
		sendError := func(err error) {
			<-info.ready
			info.results <- readResult{nil, 0, err}
		}

		ourReader := false
		if info.reader == nil {
			if info.mode&os.ModeSymlink == os.ModeSymlink {
				ldest, err := os.Readlink(info.path)
				if err != nil {
					sendError(err)
					return
				}
				// Report an oversized target to the consumer rather than
				// panicking inside the worker, which would crash the process.
				if len(ldest) > len(info.buf) {
					sendError(fmt.Errorf("symlink target of %q is %d bytes, larger than the %d byte read buffer",
						info.path, len(ldest), len(info.buf)))
					return
				}
				// The link target is the entire content: one chunk, then EOF.
				<-info.ready
				atomic.AddUint64(&stats.BytesRead, uint64(len(ldest)))
				n := copy(info.buf, ldest)
				info.results <- readResult{info.buf[:n], n, io.EOF}
				return
			}

			// Assign info.reader only on success: on error os.Open returns a
			// nil *os.File, which would leave a non-nil io.Reader behind.
			f, err := os.Open(info.path)
			if err != nil {
				sendError(err)
				return
			}
			info.reader = f
			ourReader = true
			// Runs before ready/results are closed, on every exit path.
			defer func() {
				f.Close() // opened read-only; the Close error is ignored
				info.reader = nil
			}()
		}

		for {
			// info.buf is shared with the consumer, so wait until it is done
			// with the previous chunk. Every result, including the
			// cancellation one, must be preceded by this, or the consumer
			// would stall.
			<-info.ready

			select {
			case <-done:
				var err error
				if ourReader {
					err = fmt.Errorf("Reading of '%s' cancelled", info.path)
				} else {
					err = errors.New("Reading cancelled by user")
				}
				info.results <- readResult{err: err}
				return
			default:
			}

			nread, err := info.reader.Read(info.buf)
			atomic.AddUint64(&stats.BytesRead, uint64(nread))
			info.results <- readResult{info.buf[:nread], nread, err}
			// Every result is forwarded; any error (io.EOF included) ends the stream.
			if err != nil {
				return
			}
		}
	}

	for i := 0; i < nprocs; i++ {
		go func() {
			for info := range ctrl.c {
				atomic.AddUint32(&stats.FilesBeingRead, 1)
				reader(info)
				atomic.AddUint32(&stats.FilesBeingRead, ^uint32(0)) // atomic decrement
				atomic.AddUint32(&stats.TotalFilesRead, 1)
			}
		}()
	}
	return ctrl
}
// RootedReadControllers is a list of RootedReadController, as returned by
// NewDeviceReadControllers (one entry per device group of trees).
type RootedReadControllers []RootedReadController
// NewDeviceReadControllers groups trees by device using DeviceMap and returns
// one RootedReadController per group. Each group gets its own
// ReadChannelController with nprocs workers; all of them share stats and done.
//
// NOTE(review): the result is sized by len(DeviceMap(trees)) and indexed by
// the keys it ranges over, which assumes those keys run from 0 to len-1 —
// confirm against DeviceMap.
func NewDeviceReadControllers(nprocs int, trees []string, stats *Stats, done <-chan bool) RootedReadControllers {
	groups := DeviceMap(trees)
	controllers := make(RootedReadControllers, len(groups))
	for key, group := range groups {
		ctrl := NewReadChannelController(nprocs, stats, done)
		controllers[key] = RootedReadController{Trees: group, Ctrl: ctrl}
	}
	return controllers
}
// Streams returns the total number of streams handled in parallel, i.e. the
// sum of Streams over every controller in the list. It panics if the list is
// empty.
func (rctrls RootedReadControllers) Streams() int {
	if len(rctrls) == 0 {
		panic("Input map was empty")
	}
	var total int
	for i := range rctrls {
		total += rctrls[i].Ctrl.Streams()
	}
	return total
}