uploader-stages.go

package pipeline

import (
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/ibmjstart/swiftlygo/auth"
)

// BuildChunks sends back a channel of FileChunk structs,
// each with a Size of chunkSize or less and each with its
// Number field set sequentially from 0 upward. It also returns
// the number of chunks that it will yield on the channel. Only the
// final chunk's Size will be less than chunkSize, when chunkSize
// doesn't evenly divide the data.
// Both dataSize and chunkSize need to be greater than zero, and
// chunkSize must not be larger than dataSize; otherwise BuildChunks
// returns an already-closed channel and a chunk count of zero.
func BuildChunks(dataSize, chunkSize uint) (<-chan FileChunk, uint) {
	chunks := make(chan FileChunk)
	if dataSize < 1 || chunkSize < 1 || chunkSize > dataSize {
		close(chunks)
		return chunks, 0
	}
	numChunks := dataSize / chunkSize
	if dataSize%chunkSize != 0 {
		numChunks++
	}
	go func() {
		defer close(chunks)
		var currentChunkNumber uint
		for currentChunkNumber*chunkSize < dataSize {
			chunks <- FileChunk{
				Number: currentChunkNumber,
				Size:   min(dataSize-currentChunkNumber*chunkSize, chunkSize),
				Offset: currentChunkNumber * chunkSize,
			}
			currentChunkNumber++
		}
	}()
	return chunks, numChunks
}

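// A minimal usage sketch (illustrative; the sizes are arbitrary assumptions):
// ten mebibytes split into one-mebibyte chunks yields exactly ten chunks.
//
//	chunks, count := BuildChunks(10*1024*1024, 1024*1024)
//	fmt.Println(count, "chunks") // prints "10 chunks"
//	for chunk := range chunks {
//		fmt.Printf("chunk %d: offset=%d size=%d\n", chunk.Number, chunk.Offset, chunk.Size)
//	}
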
// min returns the smaller of two uints.
func min(a, b uint) uint {
	if a < b {
		return a
	}
	return b
}

// ReadData populates the FileChunk structs that come in on the chunks channel
// with the data from the dataSource corresponding to that chunk's region
// of the file and sends its errors back on the errors channel. In order to work,
// ReadData needs chunks with the Size and Offset properties set.
//
// Deprecated: This consumes unnecessary memory. Use ReadHashAndUpload instead.
func ReadData(chunks <-chan FileChunk, errors chan<- error, dataSource io.ReaderAt) <-chan FileChunk {
	return Map(chunks, errors, func(chunk FileChunk) (FileChunk, error) {
		if chunk.Size < 1 {
			return chunk, fmt.Errorf("ReadData needs chunks with the Size and Offset properties set. Encountered chunk %d with no size", chunk.Number)
		}
		dataBuffer := make([]byte, chunk.Size)
		bytesRead, err := dataSource.ReadAt(dataBuffer, int64(chunk.Offset))
		if err != nil {
			return chunk, err
		} else if uint(bytesRead) != chunk.Size {
			return chunk, fmt.Errorf("Expected to read %d bytes, but only read %d for chunk %d", chunk.Size, bytesRead, chunk.Number)
		}
		chunk.Data = dataBuffer
		return chunk, nil
	})
}

// ObjectNamer assigns names to objects based on their Size and Number.
// Use a Printf-style string to format the names, and use %[1]d to refer
// to the Number and %[2]d to refer to the Size.
func ObjectNamer(chunks <-chan FileChunk, errors chan<- error, nameFormat string) <-chan FileChunk {
	return Map(chunks, errors, func(chunk FileChunk) (FileChunk, error) {
		chunk.Object = fmt.Sprintf(nameFormat, chunk.Number, chunk.Size)
		// Sprintf appends "%!(EXTRA ...)" when a format string uses fewer
		// verbs than the arguments supplied; trim that suffix so that name
		// formats may ignore either argument.
		if strings.Contains(chunk.Object, "%!(EXTRA") {
			chunk.Object = strings.Split(chunk.Object, "%!(EXTRA")[0]
		}
		return chunk, nil
	})
}

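// Illustrative name formats (assumptions, not mandated by the package):
// "chunk-%[1]d-size-%[2]d" uses both the Number and the Size, while
// "chunk-%[1]d" ignores the Size entirely; the "%!(EXTRA" trimming above is
// what makes the latter legal.
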
// Containerizer assigns each FileChunk the provided container.
func Containerizer(chunks <-chan FileChunk, errors chan<- error, container string) <-chan FileChunk {
	return Map(chunks, errors, func(chunk FileChunk) (FileChunk, error) {
		chunk.Container = container
		return chunk, nil
	})
}

// HashData attaches the md5 hash of a FileChunk's Data. Do not give it
// FileChunks without Data attached; it returns errors if you do.
//
// Deprecated: This consumes unnecessary memory. Use ReadHashAndUpload instead.
func HashData(chunks <-chan FileChunk, errors chan<- error) <-chan FileChunk {
	return Map(chunks, errors, func(chunk FileChunk) (FileChunk, error) {
		if len(chunk.Data) < 1 {
			return chunk, fmt.Errorf("Chunks should have data before being hashed, chunk %d lacks data", chunk.Number)
		}
		sum := md5.Sum(chunk.Data)
		chunk.Hash = hex.EncodeToString(sum[:])
		return chunk, nil
	})
}

// UploadData sends FileChunks to object storage via the provided destination. It places
// the objects in their Container with their Object name and checks the md5 of the upload,
// retrying on failure. It requires all fields of the FileChunk to be filled out before
// attempting an upload, and will send errors if it encounters FileChunks with missing
// fields. The retryWait parameter is the base wait before a retry is attempted.
//
// Deprecated: This consumes unnecessary memory. Use ReadHashAndUpload instead.
func UploadData(chunks <-chan FileChunk, errors chan<- error, dest auth.Destination, retryWait time.Duration) <-chan FileChunk {
	const maxAttempts = 5
	dataChunks := make(chan FileChunk)
	// attempt makes a single pass at uploading the data from a chunk and returns an error
	// if it fails.
	attempt := func(chunk *FileChunk) error {
		upload, err := dest.CreateFile(chunk.Container, chunk.Object, true, chunk.Hash)
		if err != nil {
			return fmt.Errorf("Err creating upload for chunk %d: %s", chunk.Number, err)
		}
		written, err := upload.Write(chunk.Data)
		if err != nil {
			return fmt.Errorf("Err uploading data for chunk %d: %s", chunk.Number, err)
		}
		if uint(written) != chunk.Size {
			return fmt.Errorf("Problem uploading chunk %d, uploaded %d bytes but chunk is %d bytes long", chunk.Number, written, chunk.Size)
		}
		err = upload.Close()
		if err != nil {
			return fmt.Errorf("Err closing upload for chunk %d: %s", chunk.Number, err)
		}
		return nil
	}
	// retry reattempts uploads on an exponential backoff and aggregates the
	// errors that occur. If all upload attempts fail, all errors are concatenated
	// together and sent. If the retryWait parameter of UploadData is set to zero,
	// there is no wait between retries (this is useful for testing).
	retry := func(chunk *FileChunk) {
		defer func() {
			chunk.Data = nil // Allow the chunk's data to be garbage-collected
		}()
		var sleep uint = 1
		for err := attempt(chunk); err != nil; sleep++ { // retry
			errors <- err
			if sleep >= maxAttempts {
				errors <- fmt.Errorf("Final upload attempt for chunk %d failed after %d retries", chunk.Number, sleep)
				return
			}
			time.Sleep(retryWait * (1 << sleep))
			err = attempt(chunk)
		}
	}
	go func() {
		defer close(dataChunks)
		for chunk := range chunks {
			if chunk.Size < 1 || uint(len(chunk.Data)) != chunk.Size ||
				chunk.Object == "" || chunk.Container == "" || chunk.Hash == "" {
				errors <- fmt.Errorf("Chunk %d is missing required data", chunk.Number)
				continue
			}
			retry(&chunk)
			dataChunks <- chunk
		}
	}()
	return dataChunks
}

// ManifestBuilder accepts FileChunks and creates SLO manifests out of them. If there are more than
// 1000 chunks, it will emit multiple FileChunks, each of which contains an SLO manifest for that region
// of the file. The FileChunks that are emitted have a Number (which is their manifest number), Data
// (the JSON of the manifest), and a Size (number of bytes in the manifest JSON). They will need to be
// assigned an Object and Container before they can be uploaded.
func ManifestBuilder(chunks <-chan FileChunk, errors chan<- error) <-chan FileChunk {
	manifestOut := make(chan FileChunk)
	go func() {
		defer close(manifestOut)
		var masterManifest []FileChunk
		for chunk := range chunks {
			// chunk numbers are zero-based, but lengths are one-based
			for chunk.Number+1 > uint(len(masterManifest)) {
				temp := make([]FileChunk, chunk.Number+1)
				copy(temp, masterManifest)
				masterManifest = temp
			}
			masterManifest[chunk.Number] = chunk
		}
		for i := 0; i*1000 < len(masterManifest); i++ {
			var (
				data         []FileChunk
				apparentSize uint
				etags        string
			)
			if (i+1)*1000 >= len(masterManifest) {
				data = masterManifest[i*1000:]
			} else {
				data = masterManifest[i*1000 : (i+1)*1000]
			}
			for _, chunk := range data {
				etags += chunk.Hash
				apparentSize += chunk.Size
			}
			// The manifest's Hash is the md5 of the concatenated chunk hashes.
			sum := md5.Sum([]byte(etags))
			jsonBytes, err := json.Marshal(data)
			if err != nil {
				errors <- fmt.Errorf("Error generating JSON manifest for manifest %d: %s", i, err)
				continue
			}
			manifestOut <- FileChunk{
				Hash:   hex.EncodeToString(sum[:]),
				Number: uint(i),
				Data:   jsonBytes,
				Size:   apparentSize,
			}
		}
	}()
	return manifestOut
}

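// A worked example (illustrative): a file split into 2,500 chunks yields three
// manifest FileChunks: Number 0 covering chunks 0-999, Number 1 covering
// 1000-1999, and Number 2 covering 2000-2499, matching the 1000-entry cap
// used in the loop above.
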
// UploadManifests treats the incoming FileChunks as manifests and uploads them with the special
// SLO manifest headers.
func UploadManifests(manifests <-chan FileChunk, errors chan<- error, dest auth.Destination) <-chan FileChunk {
	return Map(manifests, errors, func(manifest FileChunk) (FileChunk, error) {
		err := dest.CreateSLO(manifest.Container, manifest.Object, manifest.Hash, manifest.Data)
		if err != nil {
			return manifest, fmt.Errorf("Problem uploading manifest file: %s", err)
		}
		return manifest, nil
	})
}

// Json converts the incoming FileChunks into JSON, sending any conversion errors
// back on its errors channel.
func Json(chunks <-chan FileChunk, errors chan<- error) <-chan []byte {
	jsonOut := make(chan []byte)
	go func() {
		defer close(jsonOut)
		for chunk := range chunks {
			data, err := json.Marshal(chunk)
			if err != nil {
				errors <- fmt.Errorf("Problem converting chunk %d to JSON: %s", chunk.Number, err)
				continue // don't send a nil slice for a chunk that failed to marshal
			}
			jsonOut <- data
		}
	}()
	return jsonOut
}

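// A minimal consumption sketch (illustrative; chunks and errs are assumed to
// exist): log each chunk's JSON as it completes, e.g. for progress reporting.
//
//	for data := range Json(chunks, errs) {
//		fmt.Println(string(data))
//	}
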
// Counter provides basic information on the data that passes through it.
// Be careful to read the outbound Count channel to prevent blocking
// the flow of data through it.
func Counter(chunks <-chan FileChunk) (<-chan FileChunk, <-chan Count) {
	outChunks := make(chan FileChunk)
	outCount := make(chan Count, 1)
	started := time.Now()
	current := Count{
		Bytes:  0,
		Chunks: 0,
	}
	go func() {
		defer close(outChunks)
		defer close(outCount)
		for chunk := range chunks {
			current.Bytes += chunk.Size
			current.Chunks++
			current.Elapsed = time.Since(started)
			outChunks <- chunk
			outCount <- current
		}
	}()
	return outChunks, outCount
}

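// A minimal consumption sketch (illustrative): drain the Count channel in its
// own goroutine so the pipeline never blocks on an unread Count.
//
//	counted, counts := Counter(chunks)
//	go func() {
//		for c := range counts {
//			fmt.Printf("%d chunks, %d bytes in %v\n", c.Chunks, c.Bytes, c.Elapsed)
//		}
//	}()
//	// ...keep consuming the counted channel downstream...
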
// UploadBufferSize is the size of the data buffer that each ReadHashAndUpload goroutine
// will use to read data from the hard drive. This works best as a multiple of the hard
// drive sector size for an internal hard drive. If using a network-mounted hard drive,
// some experimentation may be needed to find an optimal value.
var UploadBufferSize uint = 1024 * 4

// UploadMaxAttempts is the number of times that each ReadHashAndUpload goroutine will
// retry a failing upload before moving on to the next one.
var UploadMaxAttempts uint = 5

// UploadRetryBaseWait is the shortest time unit that each ReadHashAndUpload goroutine will
// wait between upload attempts.
var UploadRetryBaseWait time.Duration = time.Second

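// A tuning sketch (illustrative; the values are assumptions, not
// recommendations), as seen from a consuming package:
//
//	pipeline.UploadBufferSize = 64 * 1024 // larger reads for a network mount
//	pipeline.UploadMaxAttempts = 3
//	pipeline.UploadRetryBaseWait = 500 * time.Millisecond
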
// ReadHashAndUpload reads the data, performs the hash, and uploads it. Its monolithic design isn't very
// modular, but it reads the file and discards the data within a single function, which saves a lot of
// memory. Use this if memory footprint is a major concern.
//
// ReadHashAndUpload requires that incoming chunks have the Size, Number, Offset, Object, and Container
// properties already set.
func ReadHashAndUpload(chunks <-chan FileChunk, errors chan<- error, dataSource io.ReaderAt, dest auth.Destination) <-chan FileChunk {
	// Pre-allocate variables to reduce memory overhead
	var (
		dataBuffer = make([]byte, UploadBufferSize)
		upload     auth.WriteCloseHeader
		err        error
	)
	return Map(chunks, errors, func(chunk FileChunk) (FileChunk, error) {
		// Reject invalid chunks
		switch {
		case chunk.Size < 1:
			return chunk, fmt.Errorf("ReadHashAndUpload needs chunks with the Size and Number properties set. Encountered chunk %d with no size", chunk.Number)
		case chunk.Object == "":
			return chunk, fmt.Errorf("ReadHashAndUpload encountered chunk %d with no Object Name", chunk.Number)
		case chunk.Container == "":
			return chunk, fmt.Errorf("ReadHashAndUpload encountered chunk %d with no Container Name", chunk.Number)
		}
		// Loop until an upload succeeds
	RetryLoop:
		for attempts := uint(0); true; attempts++ {
			// Exit the loop if we've retried the maximum number of times or if we've succeeded
			if attempts > UploadMaxAttempts {
				break RetryLoop
			} else if attempts > 0 {
				if err != nil {
					time.Sleep(UploadRetryBaseWait << attempts)
				} else {
					break RetryLoop
				}
			}
			// Track how many bytes we've read for the current chunk
			var bytesReadTotal int64
			// Create the upload for this chunk. Ask the uploader to check the MD5 sum
			// itself. We also read the final hash back from the object headers because
			// we have no way to access the one that the upload computes internally, and
			// we need it to generate the manifest file.
			upload, err = dest.CreateFile(chunk.Container, chunk.Object, true, "")
			if err != nil {
				errors <- fmt.Errorf("ReadHashAndUpload encountered an error trying to initialize the upload for chunk %d: %s", chunk.Number, err)
				continue RetryLoop
			}
			// Loop until we've read all of the bytes for this chunk
			for uint(bytesReadTotal) < chunk.Size {
				// Assign to the outer err rather than shadowing it, so that the
				// retry check at the top of RetryLoop sees read failures.
				var bytesRead int
				bytesRead, err = dataSource.ReadAt(dataBuffer, int64(chunk.Offset)+bytesReadTotal)
				if err != nil && err != io.EOF {
					errors <- fmt.Errorf("Error reading chunk %d: %s", chunk.Number, err)
					continue RetryLoop
				}
				if bytesRead == 0 && err == io.EOF {
					// A premature EOF means the data source is shorter than the chunk
					// claims; without this check the loop would spin forever.
					err = fmt.Errorf("Error reading chunk %d: unexpected EOF", chunk.Number)
					errors <- err
					continue RetryLoop
				}
				chunkEndDepth := int64(bytesRead)
				// If this is the last buffer of data for this chunk, ensure that future slices
				// don't catch garbage data at the end of the buffer.
				if bytesRemaining := int64(chunk.Size) - bytesReadTotal; bytesRemaining < chunkEndDepth {
					chunkEndDepth = bytesRemaining
				}
				_, err = upload.Write(dataBuffer[:chunkEndDepth]) // Add data to the running upload
				if err != nil {
					errors <- fmt.Errorf("Error uploading chunk %d: %s", chunk.Number, err)
					continue RetryLoop
				}
				// Update the total bytes read
				bytesReadTotal += int64(bytesRead)
			}
			// Finalize the upload
			err = upload.Close()
			if err != nil {
				errors <- fmt.Errorf("Error closing upload for chunk %d: %s", chunk.Number, err)
				continue RetryLoop
			}
			// Get the final hash of the data. Assign the failure to the outer err
			// so that the retry check at the top of RetryLoop sees it.
			headers, headersErr := upload.Headers()
			if headersErr != nil {
				err = headersErr
				errors <- fmt.Errorf("Unable to get object headers, can't get hash for chunk %d: %s", chunk.Number, headersErr)
				continue RetryLoop
			}
			chunk.Hash = headers["Etag"]
		}
		return chunk, nil
	})
}
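
// An end-to-end wiring sketch (illustrative; the chunk size, name formats,
// container names, and the fileSize/file/dest/errs variables are assumptions
// for the example, and errs must be drained in its own goroutine):
//
//	chunks, count := BuildChunks(fileSize, 100*1000*1000)
//	fmt.Println("uploading", count, "chunks")
//	chunks = ObjectNamer(chunks, errs, "backup.dat-part-%[1]d")
//	chunks = Containerizer(chunks, errs, "backups_segments")
//	chunks = ReadHashAndUpload(chunks, errs, file, dest)
//	manifests := ManifestBuilder(chunks, errs)
//	manifests = ObjectNamer(manifests, errs, "backup.dat-manifest-%[1]d")
//	manifests = Containerizer(manifests, errs, "backups")
//	for range UploadManifests(manifests, errs, dest) {
//	}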