Skip to content

Commit 05174b6

Browse files
committed
core: object chunks; chunk manifest
* update on-disk structure: - add manifest flags and per-chunk flags - the former to indicate 'completed' - per-chunk checksum is now `cos.Cksum` - remove checksum type from the manifest * rename fields * pass unit test * part five, prev. commit: d6c34c5 Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent 3ca6382 commit 05174b6

File tree

5 files changed

+223
-101
lines changed

5 files changed

+223
-101
lines changed

ais/s3/mpt.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func ListUploads(all []*core.Ufest, bckName, idMarker string, maxUploads int) *L
2222
results = append(results, UploadInfoResult{
2323
Key: manifest.Lom.ObjName,
2424
UploadID: manifest.ID,
25-
Initiated: manifest.StartTime,
25+
Initiated: manifest.Created,
2626
})
2727
}
2828
}

ais/tgts3mpt.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri
180180
ecode int
181181
partSHA = r.Header.Get(cos.S3HdrContentSHA256)
182182
checkPartSHA = partSHA != "" && partSHA != cos.S3UnsignedPayload
183-
cksumSHA = &cos.CksumHash{}
183+
cksumSHA *cos.CksumHash
184184
cksumMD5 = &cos.CksumHash{}
185185
remote = bck.IsRemoteS3() || bck.IsRemoteOCI()
186186
)
@@ -238,7 +238,7 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri
238238

239239
size := mw.Size()
240240
if size != expectedSize {
241-
err := fmt.Errorf("put-part size mismatch (%d vs %d)", size, expectedSize)
241+
err := fmt.Errorf("part %d size mismatch (%d vs %d)", partNum, size, expectedSize)
242242
s3.WriteMptErr(w, r, err, ecode, lom, uploadID)
243243
return
244244
}
@@ -254,19 +254,21 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri
254254
cksumSHA.Finalize()
255255
recvSHA := cos.NewCksum(cos.ChecksumSHA256, partSHA)
256256
if !cksumSHA.Equal(recvSHA) {
257-
detail := fmt.Sprintf("upload %q, %s, part %d", uploadID, lom, partNum)
257+
detail := fmt.Sprintf("part %d", partNum)
258258
err = cos.NewErrDataCksum(&cksumSHA.Cksum, recvSHA, detail)
259259
s3.WriteMptErr(w, r, err, http.StatusInternalServerError, lom, uploadID)
260260
return
261261
}
262262
}
263263

264264
chunk := &core.Uchunk{
265-
MD5: md5,
266-
Path: chunkPath,
267-
Siz: size,
268-
Num: uint16(partNum),
269-
CksumVal: md5,
265+
MD5: md5,
266+
Path: chunkPath,
267+
Siz: size,
268+
Num: uint16(partNum),
269+
}
270+
if checkPartSHA {
271+
chunk.Cksum = &cksumSHA.Cksum
270272
}
271273

272274
// - see NOTE above in re "active uploads in memory"

core/lchunk.go

Lines changed: 94 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,29 @@ const (
3131
)
3232

3333
const (
34-
chunkNameSepa = "."
35-
defaultChunkCap = 16
34+
chunkNameSepa = "."
35+
36+
iniChunksCap = 16
37+
38+
completed = 1 // Ufest.Flags
3639
)
3740

3841
type (
3942
Uchunk struct {
40-
Siz int64
41-
Path string // (may become v2 _location_)
42-
CksumVal string
43-
Num uint16 // chunk/part number
44-
MD5 string // S3-specific MD5 hash
43+
Siz int64 // size
44+
Path string // (may become v2 _remote location_)
45+
Cksum *cos.Cksum // nil means none; otherwise non-empty
46+
Num uint16 // chunk/part number
47+
Flags uint16 // bit flags (future use)
48+
MD5 string // S3/legacy
4549
}
4650
Ufest struct {
47-
ID string // upload/manifest ID
48-
StartTime time.Time // creation time
49-
Num uint16
50-
CksumTyp string
51-
Chunks []Uchunk
52-
Metadata map[string]string // remote object metadata
51+
ID string // upload/manifest ID
52+
Created time.Time // creation time
53+
Num uint16 // number of chunks (so far)
54+
Flags uint16 // bit flags { completed, ...}
55+
Chunks []Uchunk
56+
Metadata map[string]string // remote object metadata
5357

5458
// runtime state
5559
Lom *LOM
@@ -79,15 +83,16 @@ func NewUfest(id string, lom *LOM) *Ufest {
7983
id = cos.GenTAID(startTime)
8084
}
8185
return &Ufest{
82-
ID: id,
83-
StartTime: startTime,
84-
Chunks: make([]Uchunk, 0, defaultChunkCap),
85-
CksumTyp: cos.ChecksumOneXxh,
86-
Lom: lom,
87-
suffix: chunkNameSepa + id + chunkNameSepa,
86+
ID: id,
87+
Created: startTime,
88+
Chunks: make([]Uchunk, 0, iniChunksCap),
89+
Lom: lom,
90+
suffix: chunkNameSepa + id + chunkNameSepa,
8891
}
8992
}
9093

94+
func (u *Ufest) Completed() bool { return u.Flags&completed == completed }
95+
9196
func (u *Ufest) Lock() { u.mu.Lock() }
9297
func (u *Ufest) Unlock() { u.mu.Unlock() }
9398

@@ -266,6 +271,8 @@ func (u *Ufest) Store(lom *LOM) error {
266271
return fmt.Errorf("%s: failed to store, err: %v", _utag(lom.Cname()), err)
267272
}
268273

274+
u.Flags = completed
275+
269276
// pack
270277
sgl := g.pmm.NewSGL(xattrChunkDflt)
271278
defer sgl.Free()
@@ -311,29 +318,31 @@ func (u *Ufest) validNums() error {
311318
}
312319

313320
func (u *Ufest) pack(w io.Writer) {
321+
var (
322+
b64 [cos.SizeofI64]byte
323+
b16 [cos.SizeofI16]byte
324+
)
325+
314326
// meta-version
315327
w.Write([]byte{umetaver})
316328

317-
// upload ID
329+
// ID
318330
_packStr(w, u.ID)
319331

320-
// start time (Unix nano)
321-
var timeBuf [cos.SizeofI64]byte
322-
binary.BigEndian.PutUint64(timeBuf[:], uint64(u.StartTime.UnixNano()))
323-
w.Write(timeBuf[:])
332+
// creation time
333+
binary.BigEndian.PutUint64(b64[:], uint64(u.Created.UnixNano()))
334+
w.Write(b64[:])
324335

325336
// number of chunks
326-
var buf [cos.SizeofI16]byte
327-
binary.BigEndian.PutUint16(buf[:], u.Num)
328-
w.Write(buf[:])
329-
330-
// checksum type
331-
_packStr(w, u.CksumTyp)
337+
binary.BigEndian.PutUint16(b16[:], u.Num)
338+
w.Write(b16[:])
339+
// flags
340+
binary.BigEndian.PutUint16(b16[:], u.Flags)
341+
w.Write(b16[:])
332342

333343
// metadata map
334-
var metaBuf [cos.SizeofI16]byte
335-
binary.BigEndian.PutUint16(metaBuf[:], uint16(len(u.Metadata)))
336-
w.Write(metaBuf[:])
344+
binary.BigEndian.PutUint16(b16[:], uint16(len(u.Metadata)))
345+
w.Write(b16[:])
337346
for k, v := range u.Metadata {
338347
_packStr(w, k)
339348
_packStr(w, v)
@@ -342,33 +351,42 @@ func (u *Ufest) pack(w io.Writer) {
342351
// chunks
343352
for _, c := range u.Chunks {
344353
// chunk size
345-
var sizeBuf [cos.SizeofI64]byte
346-
binary.BigEndian.PutUint64(sizeBuf[:], uint64(c.Siz))
347-
w.Write(sizeBuf[:])
354+
binary.BigEndian.PutUint64(b64[:], uint64(c.Siz))
355+
w.Write(b64[:])
348356

349-
// chunk path
357+
// path
350358
_packStr(w, c.Path)
351359

352-
// chunk checksum
353-
_packStr(w, c.CksumVal)
360+
// checksum
361+
_packCksum(w, c.Cksum)
354362

355-
// chunk number
356-
var numBuf [cos.SizeofI16]byte
357-
binary.BigEndian.PutUint16(numBuf[:], c.Num)
358-
w.Write(numBuf[:])
363+
// chunk number and flags
364+
binary.BigEndian.PutUint16(b16[:], c.Num)
365+
w.Write(b16[:])
366+
binary.BigEndian.PutUint16(b16[:], c.Flags)
367+
w.Write(b16[:])
359368

360-
// chunk MD5 (S3-specific)
369+
// MD5 (legacy)
361370
_packStr(w, c.MD5)
362371
}
363372
}
364373

365374
func _packStr(w io.Writer, s string) {
366-
var lenBuf [cos.SizeofI16]byte
367-
binary.BigEndian.PutUint16(lenBuf[:], uint16(len(s)))
368-
w.Write(lenBuf[:])
375+
var b16 [cos.SizeofI16]byte
376+
binary.BigEndian.PutUint16(b16[:], uint16(len(s)))
377+
w.Write(b16[:])
369378
w.Write(cos.UnsafeB(s))
370379
}
371380

381+
func _packCksum(w io.Writer, cksum *cos.Cksum) {
382+
if cksum == nil || cksum.IsEmpty() {
383+
_packStr(w, "")
384+
return
385+
}
386+
_packStr(w, cksum.Ty())
387+
_packStr(w, cksum.Val())
388+
}
389+
372390
func (u *Ufest) unpack(data []byte) (err error) {
373391
if len(data) < 1 {
374392
return errors.New(tooShort)
@@ -393,7 +411,7 @@ func (u *Ufest) unpack(data []byte) (err error) {
393411
return errors.New(tooShort)
394412
}
395413
timeNano := int64(binary.BigEndian.Uint64(data[offset:]))
396-
u.StartTime = time.Unix(0, timeNano)
414+
u.Created = time.Unix(0, timeNano)
397415
offset += cos.SizeofI64
398416

399417
// number of chunks
@@ -402,11 +420,12 @@ func (u *Ufest) unpack(data []byte) (err error) {
402420
}
403421
u.Num = binary.BigEndian.Uint16(data[offset:])
404422
offset += cos.SizeofI16
405-
406-
// checksum type
407-
if u.CksumTyp, offset, err = _unpackStr(data, offset); err != nil {
408-
return err
423+
// flags
424+
if len(data) < offset+cos.SizeofI16 {
425+
return errors.New(tooShort)
409426
}
427+
u.Flags = binary.BigEndian.Uint16(data[offset:])
428+
offset += cos.SizeofI16
410429

411430
// metadata map
412431
if len(data) < offset+cos.SizeofI16 {
@@ -449,16 +468,21 @@ func (u *Ufest) unpack(data []byte) (err error) {
449468
}
450469

451470
// chunk checksum
452-
if c.CksumVal, offset, err = _unpackStr(data, offset); err != nil {
471+
if c.Cksum, offset, err = _unpackCksum(data, offset); err != nil {
453472
return err
454473
}
455474

456-
// chunk number
475+
// chunk number and flags
457476
if len(data) < offset+cos.SizeofI16 {
458477
return errors.New(tooShort)
459478
}
460479
c.Num = binary.BigEndian.Uint16(data[offset:])
461480
offset += cos.SizeofI16
481+
if len(data) < offset+cos.SizeofI16 {
482+
return errors.New(tooShort)
483+
}
484+
c.Flags = binary.BigEndian.Uint16(data[offset:])
485+
offset += cos.SizeofI16
462486

463487
// chunk MD5
464488
if c.MD5, offset, err = _unpackStr(data, offset); err != nil {
@@ -483,6 +507,22 @@ func _unpackStr(data []byte, offset int) (string, int, error) {
483507
return str, offset, nil
484508
}
485509

510+
func _unpackCksum(data []byte, offset int) (cksum *cos.Cksum, off int, err error) {
511+
var (
512+
ty, val string
513+
)
514+
ty, off, err = _unpackStr(data, offset)
515+
if err != nil || ty == "" {
516+
return nil, off, err
517+
}
518+
offset = off
519+
val, off, err = _unpackStr(data, offset)
520+
if err == nil {
521+
cksum = cos.NewCksum(ty, val)
522+
}
523+
return
524+
}
525+
486526
//
487527
// additional lom
488528
//

0 commit comments

Comments
 (0)