-
Notifications
You must be signed in to change notification settings - Fork 1
/
filereader.go
177 lines (145 loc) · 3.98 KB
/
filereader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package storage
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
// fileReaderBufferSize is the size, in bytes, of the bufio.Reader that
// fileReader places in front of the remote stream (4 MiB).
//
// TODO(stevvooe): Set an optimal buffer size here. We'll have to understand
// the latency characteristics of the underlying network to set this
// correctly, so we may want to leave it to the driver. For out of process
// drivers, we'll have to optimize this buffer size for local communication.
const fileReaderBufferSize = 4 * 1024 * 1024
// fileReader provides a read seeker interface to files stored in
// storagedriver. Used to implement part of layer interface and will be used
// to implement read side of LayerUpload.
//
// The remote stream is opened lazily by reader() and discarded by reset()
// whenever a seek moves the offset, so a fileReader holds at most one open
// stream at a time.
type fileReader struct {
// driver is the storage backend the remote stream is opened from.
driver storagedriver.StorageDriver
// ctx is passed to the driver each time a stream is opened.
ctx context.Context
// identifying fields
// path is the location of the file within the driver.
path string
// size is the total size, must be set. Read reports io.EOF once offset
// reaches it, and seeks relative to the end are computed from it.
size int64
// mutable fields
// rc is the remote read closer; nil until a stream has been opened and
// again after reset.
rc io.ReadCloser
// brd is the internal buffered reader wrapped around rc; it is reused
// across streams once allocated.
brd *bufio.Reader
// offset is the current read offset; advanced by Read and set by Seek.
offset int64
// err is the terminal error; if set, the reader is closed and Read and
// Seek return it.
err error
}
// newFileReader builds a fileReader for the remote file at path.
//
// The caller supplies both path and size, typically obtained from a prior
// stat call; neither is verified here. No stream is opened by the
// constructor: the reader works optimistically and only contacts the driver
// when data is first requested. The returned error is always nil.
func newFileReader(ctx context.Context, driver storagedriver.StorageDriver, path string, size int64) (*fileReader, error) {
	fr := new(fileReader)
	fr.driver = driver
	fr.ctx = ctx
	fr.path = path
	fr.size = size
	return fr, nil
}
// Read fills p from the remote file starting at the current offset and
// advances the offset by the number of bytes read.
//
// If the reader carries a terminal error, or a stream cannot be prepared,
// that error is returned with n == 0. When the underlying read succeeds and
// the offset has reached the declared size, io.EOF is returned together with
// the bytes read.
func (fr *fileReader) Read(p []byte) (n int, err error) {
	if fr.err != nil {
		return 0, fr.err
	}

	src, err := fr.reader()
	if err != nil {
		return 0, err
	}

	n, err = src.Read(p)
	fr.offset += int64(n)

	// Pass through real errors and reads that stop short of the end.
	if err != nil || fr.offset < fr.size {
		return n, err
	}

	// Simulate io.EOF once the declared file size has been reached.
	return n, io.EOF
}
// Seek sets the offset for the next Read, interpreting offset according to
// whence: os.SEEK_SET is relative to the start of the file, os.SEEK_CUR to
// the current offset and os.SEEK_END to the declared size.
//
// It returns the resulting offset. Seeking to a negative position or passing
// an unrecognized whence is an error; in both cases the reader is left
// untouched and the unchanged current offset is returned alongside the error.
// If the reader carries a terminal error, that error is returned with an
// offset of 0.
//
// Seeking past the declared size is allowed. When the offset actually
// changes, the open remote stream (if any) is dropped so that the next Read
// reopens it at the new position.
func (fr *fileReader) Seek(offset int64, whence int) (int64, error) {
	if fr.err != nil {
		return 0, fr.err
	}

	var newOffset int64
	switch whence {
	case os.SEEK_SET:
		newOffset = offset
	case os.SEEK_CUR:
		newOffset = fr.offset + offset
	case os.SEEK_END:
		newOffset = fr.size + offset
	default:
		// An unknown whence must not be reported as a successful seek.
		return fr.offset, fmt.Errorf("cannot seek with invalid whence %d", whence)
	}

	if newOffset < 0 {
		return fr.offset, fmt.Errorf("cannot seek to negative position")
	}

	// Only discard the stream when the position really moves; a no-op seek
	// keeps the existing connection and buffer.
	if newOffset != fr.offset {
		fr.reset()
	}
	fr.offset = newOffset

	return fr.offset, nil
}
// Close releases the remote stream, if one is open, and marks the reader as
// closed so that subsequent Read and Seek calls fail.
//
// The first call returns the error reported by closing the remote stream, or
// nil when there was nothing to close or the close succeeded. Every later
// call returns the stored terminal error ("fileReader: closed").
func (fr *fileReader) Close() error {
	if fr.err != nil {
		return fr.err
	}

	// Close the stream here so that its error can be handed to the caller
	// instead of being dropped.
	var closeErr error
	if fr.rc != nil {
		closeErr = fr.rc.Close()
		fr.rc = nil
	}

	// closeWithErr stores the terminal error and releases the remaining
	// reader state. It returns that same terminal error, which is not a
	// failure of this call, so it is intentionally ignored.
	_ = fr.closeWithErr(fmt.Errorf("fileReader: closed"))

	return closeErr
}
// reader returns a buffered reader positioned at the current offset, opening
// a stream through the driver when none is open yet.
//
// A terminal error on fr is returned unchanged. If a stream is already open,
// the existing buffered reader is handed back. Any driver error other than
// storagedriver.PathNotFoundError is returned to the caller.
func (fr *fileReader) reader() (io.Reader, error) {
	if fr.err != nil {
		return nil, fr.err
	}

	if fr.rc != nil {
		return fr.brd, nil
	}

	// No stream yet: ask the driver for one starting at the current offset.
	stream, err := fr.driver.ReadStream(fr.ctx, fr.path, fr.offset)
	if err != nil {
		if _, notFound := err.(storagedriver.PathNotFoundError); notFound {
			// NOTE(stevvooe): If the path is not found, we simply return a
			// reader that returns io.EOF. However, we do not set fr.rc,
			// allowing future attempts at getting a reader to possibly
			// succeed if the file turns up later.
			return ioutil.NopCloser(bytes.NewReader(nil)), nil
		}
		return nil, err
	}

	fr.rc = stream

	// Reuse the buffered reader across streams once it has been allocated.
	if fr.brd != nil {
		fr.brd.Reset(stream)
	} else {
		fr.brd = bufio.NewReaderSize(stream, fileReaderBufferSize)
	}

	return fr.brd, nil
}
// reset drops the open remote stream, if any, so that the next read opens a
// new one. It is meant to be called when the offset and the stream are about
// to go out of sync, such as during a seek.
//
// It does nothing once the reader carries a terminal error. The buffered
// reader is left in place, and any error from closing the stream is ignored.
func (fr *fileReader) reset() {
	if fr.err != nil || fr.rc == nil {
		return
	}

	fr.rc.Close()
	fr.rc = nil
}
// closeWithErr marks the reader as terminated with err and releases the
// reader chain: the remote stream is closed if one is open (its close error
// is ignored) and both the stream and the buffered reader are dropped.
//
// Only the first call has any effect. The return value is always the stored
// terminal error, which is err on the first call and the earlier error on
// any later call.
func (fr *fileReader) closeWithErr(err error) error {
	if fr.err == nil {
		fr.err = err

		if fr.rc != nil {
			fr.rc.Close()
		}
		fr.rc, fr.brd = nil, nil
	}

	return fr.err
}