Skip to content

Commit 2c2afc1

Browse files
committed
core: object chunks; chunk manifest
* manifest metadata: deterministic ordering when serializing * enforce uint16 max * reset 'completed' on error * part size, prev. commit: 05174b6 Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent 05174b6 commit 2c2afc1

File tree

4 files changed

+109
-44
lines changed

4 files changed

+109
-44
lines changed

ais/tgtmpt.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,18 @@ type (
3939
}
4040
)
4141

42-
func (ups *uploads) init(id string, lom *core.LOM, metadata map[string]string) {
42+
func (ups *uploads) init(id string, lom *core.LOM, metadata map[string]string) error {
4343
manifest := core.NewUfest(id, lom)
44-
manifest.Metadata = metadata
45-
44+
if err := manifest.SetMeta(metadata); err != nil {
45+
return err
46+
}
4647
ups.Lock()
4748
if ups.m == nil {
4849
ups.m = make(map[string]*core.Ufest, iniCapUploads)
4950
}
5051
ups._add(id, manifest)
5152
ups.Unlock()
53+
return nil
5254
}
5355

5456
func (ups *uploads) _add(id string, manifest *core.Ufest) {

ais/tgts3mpt.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ func (t *target) startMpt(w http.ResponseWriter, r *http.Request, items []string
101101
if err != nil {
102102
return
103103
}
104-
t.ups.init(uploadID, lom, metadata)
104+
if err := t.ups.init(uploadID, lom, metadata); err != nil {
105+
s3.WriteErr(w, r, err, 0)
106+
return
107+
}
105108
result := &s3.InitiateMptUploadResult{Bucket: bck.Name, Key: objName, UploadID: uploadID}
106109

107110
sgl := t.gmm.NewSGL(0)
@@ -264,16 +267,14 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri
264267
chunk := &core.Uchunk{
265268
MD5: md5,
266269
Path: chunkPath,
267-
Siz: size,
268-
Num: uint16(partNum),
269270
}
270271
if checkPartSHA {
271272
chunk.Cksum = &cksumSHA.Cksum
272273
}
273274

274275
// - see NOTE above in re "active uploads in memory"
275276
// - TODO: this is the place to call Ufest.Store(partial manifest)
276-
if err := manifest.Add(chunk); err != nil {
277+
if err := manifest.Add(chunk, size, int64(partNum)); err != nil {
277278
s3.WriteMptErr(w, r, err, 0, lom, uploadID)
278279
return
279280
}
@@ -400,7 +401,7 @@ func (t *target) completeMpt(w http.ResponseWriter, r *http.Request, items []str
400401
nparts[i] = c
401402
}
402403

403-
metadata := manifest.Metadata
404+
metadata := manifest.GetMeta()
404405
manifest.Unlock()
405406

406407
// 2. Create final work file

core/lchunk_test.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/NVIDIA/aistore/api/apc"
1212
"github.com/NVIDIA/aistore/cmn"
1313
"github.com/NVIDIA/aistore/cmn/cos"
14+
"github.com/NVIDIA/aistore/cmn/debug"
1415
"github.com/NVIDIA/aistore/core"
1516
"github.com/NVIDIA/aistore/core/meta"
1617
"github.com/NVIDIA/aistore/core/mock"
@@ -26,6 +27,13 @@ const (
2627
xattrChunk = "user.ais.chunk"
2728
)
2829

30+
var manifestMeta = map[string]string{
31+
"content-type": "application/octet-stream",
32+
"storage-class": "STANDARD",
33+
"cache-control": "no-cache",
34+
"user-agent": "ais-client/1.0",
35+
}
36+
2937
var _ = Describe("Chunk Manifest Xattrs", func() {
3038
const (
3139
tmpDir = "/tmp/chunk_xattr_test"
@@ -73,15 +81,17 @@ var _ = Describe("Chunk Manifest Xattrs", func() {
7381
manifest.Num = numChunks
7482
manifest.Chunks = make([]core.Uchunk, numChunks)
7583

84+
err := manifest.SetMeta(manifestMeta)
85+
debug.AssertNoErr(err)
86+
7687
for i := range numChunks {
77-
// Create a proper checksum instead of random string
7888
cksumVal := trand.String(16)
7989
manifest.Chunks[i] = core.Uchunk{
8090
Siz: chunkSizes[i],
8191
Num: i + 1,
8292
Path: trand.String(7),
83-
Cksum: cos.NewCksum(cos.ChecksumCesXxh, cksumVal), // Use proper checksum type
84-
MD5: trand.String(32), // S3 legacy MD5
93+
Cksum: cos.NewCksum(cos.ChecksumCesXxh, cksumVal),
94+
MD5: trand.String(32),
8595
}
8696
}
8797
return manifest
@@ -148,6 +158,12 @@ var _ = Describe("Chunk Manifest Xattrs", func() {
148158
err = loadedManifest.Load(lom)
149159
Expect(err).NotTo(HaveOccurred())
150160

161+
loadedMeta := loadedManifest.GetMeta()
162+
Expect(loadedMeta).To(HaveLen(len(manifestMeta)))
163+
for k, v := range manifestMeta {
164+
Expect(loadedMeta[k]).To(Equal(v), "metadata key %q mismatch", k)
165+
}
166+
151167
// Verify manifest contents including new fields
152168
Expect(loadedManifest.ID).To(Equal("test-session-001"))
153169
Expect(loadedManifest.Created.Unix()).To(Equal(manifest.Created.Unix()))
@@ -222,6 +238,12 @@ var _ = Describe("Chunk Manifest Xattrs", func() {
222238
err = loadedManifest.Load(lom)
223239
Expect(err).NotTo(HaveOccurred())
224240

241+
loadedMeta := loadedManifest.GetMeta()
242+
Expect(loadedMeta).To(HaveLen(len(manifestMeta)))
243+
for k, v := range manifestMeta {
244+
Expect(loadedMeta[k]).To(Equal(v), "metadata key %q mismatch", k)
245+
}
246+
225247
Expect(loadedManifest.ID).To(Equal("many-chunks-session"))
226248
Expect(loadedManifest.Num).To(Equal(numChunks))
227249
Expect(loadedManifest.Chunks).To(HaveLen(int(numChunks)))

core/lchunk.go renamed to core/ufest.go

Lines changed: 73 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"errors"
1111
"fmt"
1212
"io"
13+
"math"
1314
"os"
1415
"sort"
1516
"sync"
@@ -35,7 +36,26 @@ const (
3536

3637
iniChunksCap = 16
3738

38-
completed = 1 // Ufest.Flags
39+
maxChunkSize = 5 * cos.GiB
40+
maxMetaKeys = 1000
41+
)
42+
43+
const (
44+
completed uint16 = 1 << 0 // Ufest.Flags
45+
)
46+
47+
// on-disk xattr
48+
const (
49+
xattrChunk = "user.ais.chunk"
50+
51+
xattrChunkDflt = memsys.DefaultBufSize
52+
xattrChunkMax = memsys.DefaultBuf2Size // NOTE: 64K hard limit
53+
)
54+
55+
const (
56+
utag = "chunk-manifest"
57+
itag = "invalid " + utag
58+
tooShort = "failed to unpack: too short"
3959
)
4060

4161
type (
@@ -47,13 +67,14 @@ type (
4767
Flags uint16 // bit flags (future use)
4868
MD5 string // S3/legacy
4969
}
50-
Ufest struct {
51-
ID string // upload/manifest ID
52-
Created time.Time // creation time
53-
Num uint16 // number of chunks (so far)
54-
Flags uint16 // bit flags { completed, ...}
55-
Chunks []Uchunk
56-
Metadata map[string]string // remote object metadata
70+
mdpair struct{ k, v string }
71+
Ufest struct {
72+
ID string // upload/manifest ID
73+
Created time.Time // creation time
74+
Num uint16 // number of chunks (so far)
75+
Flags uint16 // bit flags { completed, ...}
76+
Chunks []Uchunk
77+
mdmap map[string]string // remote object metadata (gets sorted when packing)
5778

5879
// runtime state
5980
Lom *LOM
@@ -63,20 +84,6 @@ type (
6384
}
6485
)
6586

66-
// on-disk xattr
67-
const (
68-
xattrChunk = "user.ais.chunk"
69-
70-
xattrChunkDflt = memsys.DefaultBufSize
71-
xattrChunkMax = memsys.DefaultBuf2Size // maximum 64K
72-
)
73-
74-
const (
75-
utag = "chunk-manifest"
76-
itag = "invalid " + utag
77-
tooShort = "failed to unpack: too short"
78-
)
79-
8087
func NewUfest(id string, lom *LOM) *Ufest {
8188
startTime := time.Now()
8289
if id == "" {
@@ -93,10 +100,32 @@ func NewUfest(id string, lom *LOM) *Ufest {
93100

94101
func (u *Ufest) Completed() bool { return u.Flags&completed == completed }
95102

103+
// NOTE:
104+
// - GetMeta() returns a reference that callers must not mutate
105+
// - alternatively, clone it and call SetMeta
106+
// - on-disk order is deterministic (sorted at pack time)
107+
func (u *Ufest) SetMeta(md map[string]string) error {
108+
if l := len(md); l > maxMetaKeys {
109+
return fmt.Errorf("%s: number of metadata entries %d exceeds %d limit", utag, l, maxMetaKeys)
110+
}
111+
u.mdmap = md
112+
return nil
113+
}
114+
func (u *Ufest) GetMeta() map[string]string { return u.mdmap }
115+
96116
func (u *Ufest) Lock() { u.mu.Lock() }
97117
func (u *Ufest) Unlock() { u.mu.Unlock() }
98118

99-
func (u *Ufest) Add(c *Uchunk) error {
119+
func (u *Ufest) Add(c *Uchunk, size, num int64) error {
120+
if size > maxChunkSize {
121+
return fmt.Errorf("%s [add] chunk size %d exceeds %d limit", utag, size, maxChunkSize)
122+
}
123+
c.Siz = size
124+
if num > math.MaxUint16 || len(u.Chunks) >= math.MaxUint16 {
125+
return fmt.Errorf("%s [add] chunk number (%d, %d) exceeds %d limit", utag, num, len(u.Chunks), math.MaxUint16)
126+
}
127+
c.Num = uint16(num)
128+
100129
u.mu.Lock()
101130
defer u.mu.Unlock()
102131

@@ -115,7 +144,7 @@ func (u *Ufest) Add(c *Uchunk) error {
115144
dup := &u.Chunks[idx]
116145
if idx < l && dup.Num == c.Num {
117146
if err := cos.RemoveFile(dup.Path); err != nil {
118-
return fmt.Errorf("failed to replace chunk [%d, %s]: %v", c.Num, dup.Path, err)
147+
return fmt.Errorf("%s [add] failed to replace chunk [%d, %s]: %v", utag, c.Num, dup.Path, err)
119148
}
120149
u.Size += c.Siz - dup.Siz
121150
*dup = *c
@@ -164,7 +193,7 @@ func (u *Ufest) Abort(lom *LOM) error {
164193
func (u *Ufest) ChunkName(num int) (string, error) {
165194
lom := u.Lom
166195
if lom == nil {
167-
return "", errors.New("nil lom")
196+
return "", errors.New(utag + ": nil lom")
168197
}
169198
if num <= 0 {
170199
return "", fmt.Errorf("%s: invalid chunk number (%d)", _utag(lom.Cname()), num)
@@ -293,7 +322,9 @@ func (u *Ufest) Store(lom *LOM) error {
293322
}
294323

295324
// write
296-
if err := lom.setXchunk(sgl.Bytes()); err != nil {
325+
b := sgl.Bytes()
326+
if err := lom.setXchunk(b); err != nil {
327+
u.Flags &^= completed
297328
return err
298329
}
299330
lom.md.lid.setlmfl(lmflChunk) // TODO -- FIXME: persist
@@ -340,12 +371,21 @@ func (u *Ufest) pack(w io.Writer) {
340371
binary.BigEndian.PutUint16(b16[:], u.Flags)
341372
w.Write(b16[:])
342373

343-
// metadata map
344-
binary.BigEndian.PutUint16(b16[:], uint16(len(u.Metadata)))
374+
// metadata
375+
l := len(u.mdmap)
376+
binary.BigEndian.PutUint16(b16[:], uint16(l))
345377
w.Write(b16[:])
346-
for k, v := range u.Metadata {
347-
_packStr(w, k)
348-
_packStr(w, v)
378+
if l > 0 {
379+
mdslice := make([]mdpair, 0, l)
380+
for k, v := range u.mdmap {
381+
mdslice = append(mdslice, mdpair{k, v})
382+
}
383+
sort.Slice(mdslice, func(i, j int) bool { return mdslice[i].k < mdslice[j].k }) // deterministic
384+
for _, kv := range mdslice {
385+
_packStr(w, kv.k)
386+
_packStr(w, kv.v)
387+
}
388+
clear(mdslice)
349389
}
350390

351391
// chunks
@@ -435,7 +475,7 @@ func (u *Ufest) unpack(data []byte) (err error) {
435475
offset += cos.SizeofI16
436476

437477
if metaCount > 0 {
438-
u.Metadata = make(map[string]string, metaCount)
478+
u.mdmap = make(map[string]string, metaCount)
439479
for range metaCount {
440480
var k, v string
441481
if k, offset, err = _unpackStr(data, offset); err != nil {
@@ -444,7 +484,7 @@ func (u *Ufest) unpack(data []byte) (err error) {
444484
if v, offset, err = _unpackStr(data, offset); err != nil {
445485
return err
446486
}
447-
u.Metadata[k] = v
487+
u.mdmap[k] = v
448488
}
449489
}
450490

0 commit comments

Comments
 (0)