forked from distribution/distribution
-
Notifications
You must be signed in to change notification settings - Fork 0
/
filewriter.go
198 lines (163 loc) · 4.8 KB
/
filewriter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package storage
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
// fileWriterBufferSize is the capacity in bytes (5 MiB) handed to
// bufio.NewWriterSize when newFileWriter wraps a fileWriter.
const fileWriterBufferSize = 5 * 1024 * 1024
// fileWriter implements a remote file writer backed by a storage driver.
// Data is handed to the driver's WriteStream; the struct itself only tracks
// the write position, the known file size and a terminal error.
type fileWriter struct {
// driver is the storage backend every write is forwarded to.
driver storagedriver.StorageDriver
// identifying fields
// path is the location of the file within the driver.
path string
// mutable fields
size int64 // size of the file, aka the current end; base for os.SEEK_END seeks
offset int64 // offset is the current write offset
err error // terminal error; once set (Close sets it) the writer is closed and every operation returns it
}
// bufferedFileWriter puts a bufio.Writer in front of a fileWriter so that
// small writes are buffered before they reach the storage driver.
//
// NOTE(review): fileWriter is embedded, so any fileWriter method that
// bufferedFileWriter does not override (WriteAt and ReadFrom in the code
// visible here) is promoted and reaches the driver without going through bw;
// pending buffered data is not flushed first. Confirm callers never mix those
// calls with buffered Write calls.
type bufferedFileWriter struct {
fileWriter
// bw buffers Write calls; newFileWriter points it at the embedded fileWriter.
bw *bufio.Writer
}
// fileWriterInterface spells out the io contracts the file writer is expected
// to honour: sequential writes, seeking, positional writes, streaming from a
// reader, and closing.
type fileWriterInterface interface {
	io.Writer
	io.Seeker
	io.WriterAt
	io.ReaderFrom
	io.Closer
}

// Compile-time check that *fileWriter satisfies fileWriterInterface.
var _ fileWriterInterface = (*fileWriter)(nil)
// newFileWriter builds a buffered writer for path on the given driver, much
// like an "open" call on a regular filesystem.
//
// The path is stat'ed first: an existing regular file seeds the writer's size,
// a missing path leaves the size at zero, a directory is rejected, and any
// other stat failure is returned unchanged.
func newFileWriter(driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) {
	bfw := &bufferedFileWriter{
		fileWriter: fileWriter{
			driver: driver,
			path:   path,
		},
	}

	info, statErr := driver.Stat(path)
	switch statErr.(type) {
	case nil:
		if info.IsDir() {
			return nil, fmt.Errorf("cannot write to a directory")
		}
		bfw.size = info.Size()
	case storagedriver.PathNotFoundError:
		// Nothing exists at path yet: start from an empty file.
	default:
		return nil, statErr
	}

	// The buffer drains into the embedded fileWriter, not into bfw itself.
	bfw.bw = bufio.NewWriterSize(&bfw.fileWriter, fileWriterBufferSize)
	return bfw, nil
}
// Write stores p in the buffer instead of sending it to the underlying
// fileWriter directly, so that small writes are batched. It returns whatever
// the buffer reports.
func (bfw *bufferedFileWriter) Write(p []byte) (int, error) {
	written, err := bfw.bw.Write(p)
	return written, err
}
// Close flushes any buffered data and then closes the underlying fileWriter.
// If the flush fails, its error is returned and the underlying writer is left
// untouched.
func (bfw *bufferedFileWriter) Close() (err error) {
	if flushErr := bfw.Flush(); flushErr != nil {
		return flushErr
	}
	return bfw.fileWriter.Close()
}
// Seek flushes pending buffered data before repositioning the underlying
// fileWriter, so that data already accepted by Write is written at the old
// position. A flush failure is returned with a zero offset and no seek is
// attempted.
func (bfw *bufferedFileWriter) Seek(offset int64, whence int) (int64, error) {
	flushErr := bfw.Flush()
	if flushErr == nil {
		return bfw.fileWriter.Seek(offset, whence)
	}
	return 0, flushErr
}
// Flush pushes everything currently held in the buffer down to the underlying
// fileWriter. It is exposed so callers can force intermediate flushes.
func (bfw *bufferedFileWriter) Flush() error {
	err := bfw.bw.Flush()
	return err
}
// Write writes the buffer p at the current write offset and advances the
// offset by the number of bytes written.
//
// It returns the number of bytes written. As io.Writer requires, a write that
// stores fewer than len(p) bytes never returns a nil error: if the backend
// reports no error of its own, io.ErrShortWrite is returned.
func (fw *fileWriter) Write(p []byte) (n int, err error) {
	// A negative offset tells readFromAt to use, and then advance, fw.offset.
	nn, err := fw.readFromAt(bytes.NewReader(p), -1)
	n = int(nn)
	if err == nil && n < len(p) {
		err = io.ErrShortWrite
	}
	return n, err
}
// WriteAt writes p at the specified offset. The underlying write offset does
// not change.
//
// A negative offset is rejected with an error: readFromAt treats negative
// offsets as "use and advance the current offset", which would silently break
// the guarantee above. As io.WriterAt requires, a write that stores fewer than
// len(p) bytes never returns a nil error; io.ErrShortWrite is used when the
// backend reports none.
func (fw *fileWriter) WriteAt(p []byte, offset int64) (n int, err error) {
	if fw.err != nil {
		return 0, fw.err
	}
	if offset < 0 {
		return 0, fmt.Errorf("cannot write at negative offset")
	}

	nn, err := fw.readFromAt(bytes.NewReader(p), offset)
	n = int(nn)
	if err == nil && n < len(p) {
		err = io.ErrShortWrite
	}
	return n, err
}
// ReadFrom streams r to the backend starting at the current write offset and
// advances that offset by the number of bytes written. It returns that byte
// count together with any error from the write.
func (fw *fileWriter) ReadFrom(r io.Reader) (n int64, err error) {
	// A negative offset selects "current offset" mode in readFromAt.
	n, err = fw.readFromAt(r, -1)
	return n, err
}
// Seek moves the write position to the requested offset based on the whence
// argument, which can be os.SEEK_CUR, os.SEEK_END, or os.SEEK_SET.
//
// It returns the resulting write position. If the writer is closed, the
// terminal error is returned with a zero position. An unrecognised whence or
// a target before the start of the file is an error; in both cases the write
// position is left unchanged and returned alongside the error.
func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) {
	if fw.err != nil {
		return 0, fw.err
	}

	var target int64
	switch whence {
	case os.SEEK_SET:
		target = offset
	case os.SEEK_CUR:
		target = fw.offset + offset
	case os.SEEK_END:
		target = fw.size + offset
	default:
		// Previously an unknown whence fell through and was reported as a
		// successful seek that did not move.
		return fw.offset, fmt.Errorf("invalid whence: %d", whence)
	}

	if target < 0 {
		return fw.offset, fmt.Errorf("cannot seek to negative position")
	}

	fw.offset = target
	return target, nil
}
// Close marks the fileWriter as closed for writing.
// The first call succeeds and returns nil; it records a terminal "closed"
// error on the writer. Any later call, or a call made after another terminal
// error was already recorded, returns that recorded error instead.
func (fw *fileWriter) Close() error {
	if prior := fw.err; prior != nil {
		return prior
	}

	fw.err = fmt.Errorf("filewriter@%v: closed", fw.path)
	return nil
}
// readFromAt writes to fw from r at the specified offset. If offset is less
// than zero, the value of fw.offset is used and updated after the operation.
//
// It returns the number of bytes the driver reports as written together with
// the driver's error. If the writer already carries a terminal error, that
// error is returned and the driver is not called.
//
// fw.size is extended whenever the write ends past the known end of the file,
// for explicit offsets as well as for the current-offset mode, so that a later
// seek relative to the end starts from the right place.
func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error) {
	if fw.err != nil {
		return 0, fw.err
	}

	useCursor := offset < 0
	if useCursor {
		offset = fw.offset
	}

	nn, err := fw.driver.WriteStream(fw.path, offset, r)

	if useCursor {
		// Forward the offset whether or not there was an error, keeping the
		// writer in sync with the reader's head. If an error is encountered,
		// the whole thing should be retried, but we proceed from an expected
		// offset even if the data did not make it to the backend.
		fw.offset += nn
	}

	// In current-offset mode offset+nn equals the updated fw.offset.
	if end := offset + nn; end > fw.size {
		fw.size = end
	}

	return nn, err
}