This repository has been archived by the owner on Jul 27, 2022. It is now read-only.
/
readahead.go
265 lines (244 loc) · 7.86 KB
/
readahead.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
// Copyright 2016 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package readahead provides readers which enable concurrent read from seekable or compressed files.
// It's useful when reading from a network file system (like Google Cloud Storage).
package readahead // import "github.com/google/readahead"
import (
"errors"
"io"
"sync"
"github.com/golang/glog"
)
// reader takes a ReaderAt and reads ahead concurrently, while providing io.Reader functionality.
type reader struct {
	name string // stream name; used only in log messages
	err  error  // sticky: first error received from a chunk, returned once buffered data is drained
	// buf is the unread remainder of the current chunk; bufToReturn is the
	// chunk's full-capacity backing slice, put back into chunkPool once buf
	// is fully consumed.
	buf         []byte
	bufToReturn []byte
	chunkPool   sync.Pool // recycles chunk buffers between the producer goroutines and Read
	chunkRespc  <-chan *chunkResp // chunks delivered in stream order by the producer
	closedc     chan<- struct{}   // closed by Close to tell the producer goroutines to stop
	done        sync.WaitGroup    // tracks the producer goroutine; Close waits on it
}
// makeReader builds the shared reader state and returns the channel the
// producer will deliver chunks on, the channel that signals Close, and the
// reader itself. The caller is expected to start one producer goroutine and
// call res.done.Done when it exits.
func makeReader(name string, chunkSize int, chunkAhead int) (chan<- *chunkResp, <-chan struct{}, *reader) {
	respc := make(chan *chunkResp, chunkAhead)
	stopc := make(chan struct{})
	rd := &reader{name: name, chunkRespc: respc, closedc: stopc}
	// Fresh pool entries are full-size chunk buffers.
	rd.chunkPool.New = func() interface{} {
		return make([]byte, chunkSize)
	}
	rd.done.Add(1)
	return respc, stopc, rd
}
// NewConcurrentReader creates a new reader with the specified chunk size and number of workers.
// Name is only used for logging. It reads ahead up to chunkAhead chunks of chunkSize with numWorkers
// and tries to maintain the readahead buffer.
func NewConcurrentReader(name string, r io.ReaderAt, chunkSize int, chunkAhead int, numWorkers int) io.ReadCloser {
	respc, stopc, rd := makeReader(name, chunkSize, chunkAhead)
	go func() {
		// Signal Close that the producer pipeline has fully shut down.
		defer rd.done.Done()
		runAt(name, r, respc, stopc, chunkSize, chunkAhead, numWorkers, &rd.chunkPool)
	}()
	return rd
}
// NewReader creates a readahead reader. It will read up to chunkAhead chunks of
// chunkSize bytes each and use a separate goroutine for that. It is useful when reading
// from a compressed stream or from network. If an incoming stream supports io.ReaderAt,
// NewConcurrentReader is a faster option. Name is only used for logging.
// The resulting reader must be read to EOF or Close must be called to
// prevent memory leaks.
func NewReader(name string, r io.Reader, chunkSize, chunkAhead int) io.ReadCloser {
	respc, stopc, rd := makeReader(name, chunkSize, chunkAhead)
	go func() {
		// Signal Close that the producer goroutine has exited.
		defer rd.done.Done()
		run(name, r, respc, stopc, &rd.chunkPool)
	}()
	return rd
}
// chunkResp is one read-ahead result: a chunk of data read at a given
// stream offset, plus the error (if any, io.EOF included) from that read.
type chunkResp struct {
	off   int64  // byte offset of chunk within the stream
	err   error  // error from the read that produced this chunk, if any
	chunk []byte // data read at off; may be shorter than the pool's chunk size
}
// run is the producer for NewReader: it reads sequential chunks from in and
// sends them on chunkRespc until a read error (including io.EOF) occurs or
// closedc is closed. It always closes chunkRespc on exit.
func run(name string, in io.Reader, chunkRespc chan<- *chunkResp, closedc <-chan struct{}, chunkPool *sync.Pool) {
	defer close(chunkRespc)
	for pos := int64(0); ; {
		buf := chunkPool.Get().([]byte)
		n, err := in.Read(buf)
		glog.V(2).Infof("Read %d bytes, got error %v", n, err)
		if err != nil && err != io.EOF {
			glog.Errorf("readahead %s: about to report an error %v", name, err)
		}
		resp := &chunkResp{chunk: buf[:n], off: pos, err: err}
		select {
		case chunkRespc <- resp:
		case <-closedc:
			// Reader was closed; stop producing.
			return
		}
		if err != nil {
			// The final response carried the error; we are done.
			return
		}
		pos += int64(n)
	}
}
// runAt is the producer for NewConcurrentReader: it reads chunks of in
// concurrently with numWorkers workers and delivers them to chunkRespc in
// stream order. It always closes chunkRespc on exit.
func runAt(name string, in io.ReaderAt, chunkRespc chan<- *chunkResp, closedc <-chan struct{}, chunkSize, chunkAhead, numWorkers int, chunkPool *sync.Pool) {
	// runAt sets up a pipeline of Goroutines:
	// - A goroutine to write read offsets to reqCh
	// - numWorkers goroutines to perform reads in parallel, sending results to respCh
	// - A goroutine to close respCh when the worker goroutines finish
	// - This goroutine reads from respCh until it is closed,
	// closing eofCh to halt the first goroutine on the first error.
	defer close(chunkRespc)
	reqCh := make(chan int64, numWorkers)
	respCh := make(chan *chunkResp, numWorkers)
	eofCh := make(chan struct{}) // eofCh is closed when we reach EOF or we encounter an error
	// Write read offsets to reqCh until eofCh or closedc are closed.
	go func() {
		defer close(reqCh)
		var off int64
		for {
			glog.V(2).Infof("Attempting to read %d", off)
			select {
			case reqCh <- off:
			case <-eofCh:
				glog.V(2).Info("eofCh closed, closing reqCh")
				return
			case <-closedc:
				glog.V(2).Info("closedc closed, closing reqCh")
				return
			}
			// Offsets advance by a full chunk; workers read exactly chunkSize
			// bytes at each offset.
			off += int64(chunkSize)
		}
	}()
	// Worker goroutines that read from reqCh and write to respCh until reqCh is closed.
	var readers sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		readers.Add(1)
		go func() {
			defer readers.Done()
			chunkReader(in, reqCh, chunkPool, respCh, closedc)
		}()
	}
	// Close respCh when the worker goroutines are done.
	go func() {
		readers.Wait()
		glog.V(2).Info("Readers finished, closing read response channel")
		close(respCh)
	}()
	// Read from respCh until it or closedc is closed.
	reorderChunks(name, respCh, chunkRespc, eofCh, closedc)
	// Wait for any remaining chunkReader goroutines to exit to avoid racing
	// between ReadAt and Close.
	readers.Wait()
}
// reorderChunks consumes out-of-order chunks from respCh and emits
// them in order to chunkRespc. When a chunk is received with an
// error, it closes eofCh to prevent new chunks from being read.
// It returns when respCh is closed or when closedc signals Close.
func reorderChunks(name string, respCh <-chan *chunkResp, chunkRespc chan<- *chunkResp, eofCh chan<- struct{}, closedc <-chan struct{}) {
	// pending holds chunks that arrived ahead of the next offset to emit.
	pending := make(map[int64]*chunkResp)
	var off int64 // next stream offset to emit
	for resp := range respCh {
		// Fixed: the debug format strings previously opened "(off=…" without
		// a matching ")".
		glog.V(2).Infof("Received chunk (off=%d chunk=<%d bytes> err=%v)", resp.off, len(resp.chunk), resp.err)
		pending[resp.off] = resp
		// First error (io.EOF included) halts the offset generator; nil out
		// eofCh so it is closed at most once.
		if resp.err != nil && eofCh != nil {
			close(eofCh)
			eofCh = nil
		}
		// Emit every consecutive chunk that is now available.
		for {
			resp, ok := pending[off]
			if !ok {
				break
			}
			if resp.err != nil && resp.err != io.EOF {
				glog.Errorf("readahead %s: about to report an error %v", name, resp.err)
			}
			glog.V(2).Infof("Returning chunk (off=%d chunk=<%d bytes> err=%v)", resp.off, len(resp.chunk), resp.err)
			select {
			case chunkRespc <- resp:
			case <-closedc:
				// N.B. We are aborting here before draining respCh; the other
				// goroutines will abort asynchronously on closedc as well.
				return
			}
			delete(pending, off)
			off += int64(len(resp.chunk))
		}
	}
}
// chunkReader is a worker: for each offset received on offs it reads one
// chunk from in and delivers the result on res. It returns when offs is
// closed or when closedc signals that the reader has been closed.
func chunkReader(in io.ReaderAt, offs <-chan int64, chunkPool *sync.Pool, res chan<- *chunkResp, closedc <-chan struct{}) {
	for off := range offs {
		buf := chunkPool.Get().([]byte)
		n, err := in.ReadAt(buf, off)
		if n < len(buf) && err == nil {
			// io.ReaderAt promises err != nil whenever n < len(buf).
			glog.Errorf("ReaderAt semantics violation: len(chunk): %d, nread: %d, err: %v", len(buf), n, err)
		}
		resp := &chunkResp{off: off, chunk: buf}
		if err != nil {
			// On error, deliver only the bytes actually read.
			resp.chunk = buf[:n]
			resp.err = err
		}
		select {
		case res <- resp:
		case <-closedc:
			return
		}
	}
}
// Read implements io.Reader. It serves bytes from the current chunk and,
// when the chunk is exhausted, recycles its buffer into chunkPool and pulls
// the next chunk from chunkRespc. Once an error (io.EOF included) has been
// delivered alongside the last bytes, it is returned on every later call.
func (r *reader) Read(b []byte) (int, error) {
	if len(r.buf) == 0 {
		// Current chunk fully consumed: return its backing buffer to the pool.
		if r.bufToReturn != nil {
			r.chunkPool.Put(r.bufToReturn)
			r.bufToReturn = nil
		}
		if r.err != nil {
			// Make sure any goroutines are done. Ignore
			// the error since it will just be r.err.
			r.Close()
			return 0, r.err
		}
		resp, ok := <-r.chunkRespc
		if !ok {
			// The producer sends a final response carrying an error (io.EOF
			// at minimum) before closing the channel, so a bare close here
			// indicates a protocol violation.
			glog.Fatal("chunkRespc channel has been closed without a final error")
		}
		if resp.err != nil {
			// Record the error but keep serving the chunk's data first.
			r.err = resp.err
		}
		r.buf = resp.chunk
		// Remember the full-capacity slice so it can be recycled later, even
		// after r.buf has been sliced down by partial reads.
		r.bufToReturn = resp.chunk[:cap(resp.chunk)]
	}
	n := copy(b, r.buf)
	r.buf = r.buf[n:]
	if len(r.buf) == 0 {
		// Report any pending error together with the last bytes of the chunk.
		return n, r.err
	}
	return n, nil
}
// Close implements io.Closer. It signals the producer goroutines to stop,
// waits for them to exit, and returns any error already recorded. Further
// reads (and a second Close) report that the reader is already closed.
func (r *reader) Close() error {
	if stopc := r.closedc; stopc != nil {
		r.closedc = nil
		close(stopc)
	}
	r.done.Wait()
	err := r.err
	if err == nil {
		r.err = errors.New("readahead reader already closed")
		r.buf = nil
	}
	return err
}