
Commit 61a0ba6

core: object chunks; chunk manifest

* lz4-compress chunk manifest with trailing xxhash checksum
* each chunk entry now includes its own path
* remove byte-packer; use SGL buffers instead
* set maximum manifest size to 64K (vs. LOM's 4K)
* part two, prev. commit: c413336

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent 63faaf4 commit 61a0ba6
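
Per the bullets above (and the core/lchunk.go diff below), the manifest is now persisted as lz4(packed payload) followed by an 8-byte big-endian xxhash64 computed over the compressed bytes. The following is a minimal, self-contained sketch of that framing using the same lz4 and xxhash libraries; the seal/open helper names are illustrative and not part of the commit, and the hash is left unseeded here (the commit seeds it with cos.MLCG32):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"

	onexxh "github.com/OneOfOne/xxhash"
	"github.com/pierrec/lz4/v4"
)

// seal lz4-compresses the packed manifest and appends a big-endian xxhash64
// of the compressed bytes (illustrative helper; the real code uses a seeded hash).
func seal(packed []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := lz4.NewWriter(&buf)
	if _, err := zw.Write(packed); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	var trailer [8]byte
	binary.BigEndian.PutUint64(trailer[:], onexxh.Checksum64(buf.Bytes()))
	return append(buf.Bytes(), trailer[:]...), nil
}

// open verifies the trailing checksum, strips it, and decompresses the payload.
func open(data []byte) ([]byte, error) {
	if len(data) <= 8 {
		return nil, fmt.Errorf("too short: %d bytes", len(data))
	}
	payload, trailer := data[:len(data)-8], data[len(data)-8:]
	if binary.BigEndian.Uint64(trailer) != onexxh.Checksum64(payload) {
		return nil, fmt.Errorf("manifest checksum mismatch")
	}
	return io.ReadAll(lz4.NewReader(bytes.NewReader(payload)))
}

func main() {
	sealed, _ := seal([]byte("packed chunk manifest"))
	packed, err := open(sealed)
	fmt.Println(string(packed), err)
}

Verifying the trailing checksum before decompressing, as Ufest.Load does below, keeps a corrupt xattr from ever reaching the lz4 reader.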


9 files changed: +529 -160 lines changed


cmn/cos/unsafe.go

Lines changed: 2 additions & 0 deletions
@@ -18,6 +18,8 @@ const (
 	SizeofI64 = int(unsafe.Sizeof(uint64(0)))
 	SizeofI32 = int(unsafe.Sizeof(uint32(0)))
 	SizeofI16 = int(unsafe.Sizeof(uint16(0)))
+
+	SizeXXHash64 = SizeofI64
 )
 
 // Unsafe cast (string => []byte) and ([]byte => string)

cmn/jsp/io.go

Lines changed: 3 additions & 4 deletions
@@ -20,7 +20,6 @@ import (
 )
 
 const (
-	sizeXXHash64  = cos.SizeofI64
 	lz4BufferSize = lz4.Block64Kb
 )
 
@@ -57,7 +56,7 @@ func Encode(ws cos.WriterAt, v any, opts Options) error {
 		debug.Assert(off == prefLen)
 	}
 	if opts.Checksum {
-		var cksum [sizeXXHash64]byte
+		var cksum [cos.SizeXXHash64]byte
 		w.Write(cksum[:]) // reserve for checksum
 	}
 	if opts.Compress {
@@ -69,7 +68,7 @@ func Encode(ws cos.WriterAt, v any, opts Options) error {
 	}
 	if opts.Checksum {
 		h = onexxh.New64()
-		cos.Assert(h.Size() == sizeXXHash64)
+		cos.Assert(h.Size() == cos.SizeXXHash64)
 		w = io.MultiWriter(h, w)
 	}
 
@@ -136,7 +135,7 @@ func Decode(r io.Reader, v any, opts Options, tag string) (*cos.Cksum, error) {
 }
 
 func _decksum(r io.Reader, v any, opts Options, tag string) (*cos.Cksum, error) {
-	var cksum [cos.SizeXXHash64]byte
	if _, err := r.Read(cksum[:]); err != nil {
 		return nil, err
 	}
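
For context: as the hunks above show, the jsp encoder reserves a cos.SizeXXHash64-wide slot up front and hashes the payload via io.MultiWriter while writing it out. Below is a simplified, self-contained sketch of that reserve-then-backfill pattern; it omits jsp's versioned prefix and optional compression, and the encode/decode helpers (plus the zero backfill offset) are assumptions for illustration, not the jsp API:

package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"

	onexxh "github.com/OneOfOne/xxhash"
)

// encode reserves an 8-byte checksum slot, streams the payload through the
// hash and the file in one pass, then backfills the slot via WriteAt.
func encode(f *os.File, payload []byte) error {
	var slot [8]byte
	if _, err := f.Write(slot[:]); err != nil { // reserve for checksum
		return err
	}
	h := onexxh.New64()
	w := io.MultiWriter(h, f) // hash and persist in one pass
	if _, err := w.Write(payload); err != nil {
		return err
	}
	binary.BigEndian.PutUint64(slot[:], h.Sum64())
	_, err := f.WriteAt(slot[:], 0) // backfill the reserved slot
	return err
}

// decode reads the leading checksum, then verifies it against the payload.
func decode(r io.Reader) ([]byte, error) {
	var slot [8]byte
	if _, err := io.ReadFull(r, slot[:]); err != nil {
		return nil, err
	}
	payload, err := io.ReadAll(r)
	if err != nil {
		return nil, err
	}
	if onexxh.Checksum64(payload) != binary.BigEndian.Uint64(slot[:]) {
		return nil, fmt.Errorf("payload checksum mismatch")
	}
	return payload, nil
}

func main() {
	f, _ := os.CreateTemp("", "jsp-sketch-*")
	defer os.Remove(f.Name())
	_ = encode(f, []byte(`{"hello":"world"}`))
	_, _ = f.Seek(0, io.SeekStart)
	payload, err := decode(f)
	fmt.Println(string(payload), err)
}

jsp's actual Encode additionally writes a versioned prefix before the checksum slot and can lz4-compress the payload, as the diff above indicates.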

core/lchunk.go

Lines changed: 174 additions & 71 deletions
@@ -5,14 +5,20 @@
 package core
 
 import (
+	"bytes"
+	"encoding/binary"
+	"errors"
 	"fmt"
+	"io"
 	"os"
 
 	"github.com/NVIDIA/aistore/cmn"
 	"github.com/NVIDIA/aistore/cmn/cos"
 	"github.com/NVIDIA/aistore/fs"
+	"github.com/NVIDIA/aistore/memsys"
 
 	onexxh "github.com/OneOfOne/xxhash"
+	"github.com/pierrec/lz4/v4"
 )
 
 const (
@@ -22,148 +28,245 @@ const (
 type (
 	Uchunk struct {
 		Siz      int64
+		Path     string // (may become v2 _location_)
 		CksumVal string
 	}
 	Ufest struct {
-		Size     int64
 		Num      uint16
 		CksumTyp string
 		Chunks   []Uchunk
+		// runtime state
+		Size int64
 	}
 )
 
 // on-disk xattr
 const (
 	xattrChunk = "user.ais.chunk"
-)
 
-const (
-	utag = "chunk-manifest"
+	xattrChunkDflt = memsys.DefaultBufSize
+	xattrChunkMax  = memsys.DefaultBuf2Size // maximum 64K
 )
 
-// interface guard
-var (
-	_ cos.Unpacker = (*Ufest)(nil)
-	_ cos.Packer   = (*Ufest)(nil)
+const (
+	utag     = "chunk-manifest"
+	itag     = "invalid " + utag
+	tooShort = "failed to unpack: too short"
 )
 
-// TODO -- FIXME: 4K only; reuse memsys
 func (u *Ufest) Load(lom *LOM) error {
-	b, err := fs.GetXattr(lom.FQN, xattrChunk)
+	cbuf, cslab := g.pmm.AllocSize(xattrChunkMax)
+	defer cslab.Free(cbuf)
+
+	data, err := lom.getXchunk(cbuf)
 	if err != nil {
 		return _ucerr(lom.Cname(), err)
 	}
-	unpacker := cos.NewUnpacker(b)
-	if err := unpacker.ReadAny(u); err != nil {
+
+	// validate and strip/remove the trailing checksum
+	checksumOffset := len(data) - cos.SizeXXHash64
+	expectedChecksum := binary.BigEndian.Uint64(data[checksumOffset:])
+	compressedData := data[:checksumOffset]
+	actualChecksum := onexxh.Checksum64S(compressedData, cos.MLCG32)
+	if expectedChecksum != actualChecksum {
+		return cos.NewErrMetaCksum(expectedChecksum, actualChecksum, utag)
+	}
+
+	// decompress into a 2nd buffer
+	dbuf, dslab := g.pmm.AllocSize(xattrChunkMax)
+	err = u._load(lom, compressedData, dbuf)
+	dslab.Free(dbuf)
+	return err
+}
+
+func (u *Ufest) _load(lom *LOM, compressedData, buf []byte) error {
+	data := buf[:0]
+	zr := lz4.NewReader(bytes.NewReader(compressedData))
+
+	for {
+		if len(data) == cap(data) {
+			return fmt.Errorf("%s too large", utag)
+		}
+		n, err := zr.Read(data[len(data):cap(data)])
+		data = data[:len(data)+n]
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return fmt.Errorf("%s decompression failed: %v", utag, err)
+		}
+	}
+
+	if err := u.unpack(data); err != nil {
 		return _ucerr(lom.Cname(), err)
 	}
+	if u.Size != lom.Lsize(true) {
+		return fmt.Errorf("%s load size mismatch: %d vs %d", _utag(lom.Cname()), u.Size, lom.Lsize(true))
+	}
 	return nil
 }
 
 func _ucerr(cname string, err error) error {
-	tag := utag + "[" + cname + "]"
 	if cos.IsErrXattrNotFound(err) {
-		return cmn.NewErrLmetaNotFound(tag, err)
+		return cmn.NewErrLmetaNotFound(_utag(cname), err)
 	}
-	return os.NewSyscallError(getxattr, fmt.Errorf("%s, err: %w", tag, err))
+	return os.NewSyscallError(getxattr, fmt.Errorf("%s, err: %w", _utag(cname), err))
 }
 
+func _utag(cname string) string { return itag + "[" + cname + "]" }
+
 func (u *Ufest) Store(lom *LOM) error {
 	// validate
+	var total int64
 	if u.Num == 0 || int(u.Num) != len(u.Chunks) {
-		return fmt.Errorf("%s[%s]: invalid manifest num=%d, len=%d", utag, lom.Cname(), u.Num, len(u.Chunks))
+		return fmt.Errorf("%s: num %d vs %d", _utag(lom.Cname()), u.Num, len(u.Chunks))
+	}
+	lsize := lom.Lsize(true)
+	if u.Size != 0 && u.Size != lsize {
+		return fmt.Errorf("%s store size: %d vs %d", _utag(lom.Cname()), u.Size, lsize)
 	}
-	var total int64
 	for i := range u.Num {
 		total += u.Chunks[i].Siz
 	}
-	if u.Size == 0 {
-		u.Size = total
-	} else if total != u.Size {
-		return fmt.Errorf("%s total size mismatch: %d vs %d", utag, total, u.Size)
+	if total != lsize {
		return fmt.Errorf("%s: total size mismatch (%d vs %d)", _utag(lom.Cname()), total, lsize)
 	}
 
 	// pack
-	psize := u.PackedSize()
-	if psize > xattrMaxSize { // TODO -- FIXME: see 4K above
-		return fmt.Errorf("%s[%s]: manifest too large (%d > %d)", utag, lom.Cname(), psize, xattrMaxSize)
-	}
+	sgl := g.pmm.NewSGL(xattrChunkDflt)
+	defer sgl.Free()
+
+	zw := lz4.NewWriter(sgl)
+	u.pack(zw)
+	zw.Close()
+
+	// compute and write trailing checksum
+	data := sgl.Bytes()
+	h := onexxh.Checksum64S(data, cos.MLCG32)
+	var checksumBuf [cos.SizeXXHash64]byte
+	binary.BigEndian.PutUint64(checksumBuf[:], h)
+	sgl.Write(checksumBuf[:])
 
-	packer := cos.NewPacker(nil, psize)
-	u.Pack(packer)
+	if sgl.Len() > xattrChunkMax {
+		return fmt.Errorf("%s: too large (%d > %d max)", _utag(lom.Cname()), sgl.Len(), xattrChunkMax)
+	}
 
 	// write
-	if err := fs.SetXattr(lom.FQN, xattrChunk, packer.Bytes()); err != nil {
+	if err := lom.setXchunk(sgl.Bytes()); err != nil {
 		return err
 	}
-
 	lom.md.lid.setlmfl(lmflChunk) // TODO -- FIXME: persist
 	return nil
 }
 
-func (u *Ufest) Pack(packer *cos.BytePack) {
-	packer.WriteByte(umetaver)
-	packer.WriteInt64(u.Size)
-	packer.WriteUint16(u.Num)
-	packer.WriteString(u.CksumTyp)
+func (u *Ufest) pack(w io.Writer) {
+	// meta-version
+	w.Write([]byte{umetaver})
+
+	// number of chunks
+	var buf [cos.SizeofI16]byte
+	binary.BigEndian.PutUint16(buf[:], u.Num)
+	w.Write(buf[:])
+
+	// checksum type
+	_packStr(w, u.CksumTyp)
+
+	// chunks
 	for _, c := range u.Chunks {
-		packer.WriteInt64(c.Siz)
-		packer.WriteString(c.CksumVal)
+		// chunk size
+		var sizeBuf [cos.SizeofI64]byte
+		binary.BigEndian.PutUint64(sizeBuf[:], uint64(c.Siz))
+		w.Write(sizeBuf[:])
+
+		// chunk path
+		_packStr(w, c.Path)
+
+		// chunk checksum
+		_packStr(w, c.CksumVal)
 	}
-	h := onexxh.Checksum64S(packer.Bytes(), cos.MLCG32)
-	packer.WriteUint64(h)
 }
 
-func (u *Ufest) PackedSize() (ll int) {
-	for _, c := range u.Chunks {
-		ll += cos.SizeofI64 + cos.PackedStrLen(c.CksumVal)
-	}
-	return ll + 1 + cos.SizeofI64 + cos.SizeofI16 + cos.PackedStrLen(u.CksumTyp) + cos.SizeofI64
+func _packStr(w io.Writer, s string) {
+	var lenBuf [cos.SizeofI16]byte
+	binary.BigEndian.PutUint16(lenBuf[:], uint16(len(s)))
+	w.Write(lenBuf[:])
+	w.Write(cos.UnsafeB(s))
 }
 
-func (u *Ufest) Unpack(unpacker *cos.ByteUnpack) (err error) {
-	var metaver byte
-	if metaver, err = unpacker.ReadByte(); err != nil {
-		return fmt.Errorf("invalid %s (too short)", utag)
+func (u *Ufest) unpack(data []byte) (err error) {
+	if len(data) < 1 {
+		return errors.New(tooShort)
 	}
+
+	var offset int
+
+	// meta-version
+	metaver := data[offset]
+	offset++
 	if metaver != umetaver {
 		return fmt.Errorf("unsupported %s meta-version %d (expecting %d)", utag, metaver, umetaver)
 	}
-	if u.Size, err = unpacker.ReadInt64(); err != nil {
-		return fmt.Errorf("invalid %s size: %v", utag, err)
-	}
-	if u.Num, err = unpacker.ReadUint16(); err != nil {
-		return fmt.Errorf("invalid %s num: %v", utag, err)
+
+	// number of chunks
+	if len(data) < offset+cos.SizeofI16 {
+		return errors.New(tooShort)
 	}
-	if u.CksumTyp, err = unpacker.ReadString(); err != nil {
-		return fmt.Errorf("invalid %s cksum type: %v", utag, err)
+	u.Num = binary.BigEndian.Uint16(data[offset:])
+	offset += cos.SizeofI16
+
+	// checksum type
+	if u.CksumTyp, offset, err = _unpackStr(data, offset); err != nil {
+		return err
 	}
 
+	// Read chunks
 	u.Chunks = make([]Uchunk, u.Num)
-	var total int64
+	u.Size = 0
 	for i := range u.Num {
 		c := &u.Chunks[i]
-		if c.Siz, err = unpacker.ReadInt64(); err != nil {
-			return fmt.Errorf("invalid %s chunk idx %d size: %v", utag, i, err)
+
+		// chunk size
+		if len(data) < offset+cos.SizeofI64 {
+			return errors.New(tooShort)
 		}
-		if c.CksumVal, err = unpacker.ReadString(); err != nil {
-			return fmt.Errorf("invalid %s chunk idx %d cksum val: %v", utag, i, err)
+		c.Siz = int64(binary.BigEndian.Uint64(data[offset:]))
+		offset += cos.SizeofI64
+		u.Size += c.Siz
+		// chunk path
+		if c.Path, offset, err = _unpackStr(data, offset); err != nil {
+			return err
+		}
+		// chunk checksum
+		if c.CksumVal, offset, err = _unpackStr(data, offset); err != nil {
+			return err
 		}
-		total += c.Siz
-	}
-	if total != u.Size {
-		return fmt.Errorf("%s total size mismatch: %d vs %d", utag, total, u.Size)
 	}
+	return nil
+}
 
-	var h uint64
-	if h, err = unpacker.ReadUint64(); err != nil {
-		return fmt.Errorf("%s corrupted: %v", utag, err)
+func _unpackStr(data []byte, offset int) (string, int, error) {
+	if len(data) < offset+cos.SizeofI16 {
+		return "", offset, errors.New(tooShort)
 	}
-	b := unpacker.Bytes()
-	hh := onexxh.Checksum64S(b[:len(b)-cos.SizeofI64], cos.MLCG32)
-	if h != hh {
-		return cos.NewErrMetaCksum(h, hh, utag)
+
+	l := int(binary.BigEndian.Uint16(data[offset:]))
+	offset += cos.SizeofI16
+	if len(data) < offset+l {
+		return "", offset, errors.New(tooShort)
 	}
+	str := string(data[offset : offset+l])
+	offset += l
+	return str, offset, nil
+}
 
-	return nil
+//
+// additional lom
+//
+
+func (lom *LOM) getXchunk(buf []byte) ([]byte, error) {
+	return fs.GetXattrBuf(lom.FQN, xattrChunk, buf)
+}
+
+func (lom *LOM) setXchunk(data []byte) error {
+	return fs.SetXattr(lom.FQN, xattrChunk, data)
 }
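
Stripped of the lz4 layer and the trailing checksum, the packed manifest payload above is a simple big-endian layout: one meta-version byte, a uint16 chunk count, a length-prefixed checksum-type string, and then, per chunk, a uint64 size plus length-prefixed path and checksum strings. The sketch below mirrors that layout with local stand-in names (chunk, pack, putStr); the example paths and checksum values are made up:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Illustrative stand-ins; the real umetaver, Uchunk, and Ufest live in package core.
const metaver = 1

type chunk struct {
	size           int64
	path, cksumVal string
}

// putStr writes a uint16 length prefix followed by the string bytes.
func putStr(b *bytes.Buffer, s string) {
	var l [2]byte
	binary.BigEndian.PutUint16(l[:], uint16(len(s)))
	b.Write(l[:])
	b.WriteString(s)
}

// getStr reads a length-prefixed string and returns the new offset.
func getStr(data []byte, off int) (string, int) {
	l := int(binary.BigEndian.Uint16(data[off:]))
	off += 2
	return string(data[off : off+l]), off + l
}

// pack mirrors Ufest.pack: meta-version, chunk count, checksum type,
// then (size, path, checksum) per chunk, all big-endian.
func pack(cksumTyp string, chunks []chunk) []byte {
	var b bytes.Buffer
	b.WriteByte(metaver)
	var num [2]byte
	binary.BigEndian.PutUint16(num[:], uint16(len(chunks)))
	b.Write(num[:])
	putStr(&b, cksumTyp)
	for _, c := range chunks {
		var siz [8]byte
		binary.BigEndian.PutUint64(siz[:], uint64(c.size))
		b.Write(siz[:])
		putStr(&b, c.path)
		putStr(&b, c.cksumVal)
	}
	return b.Bytes()
}

func main() {
	data := pack("xxhash", []chunk{
		{size: 1 << 20, path: "chunk-0001", cksumVal: "a1b2c3d4"},
		{size: 512 << 10, path: "chunk-0002", cksumVal: "e5f6a7b8"},
	})
	// walk the header back out to show the layout
	num := binary.BigEndian.Uint16(data[1:])
	typ, off := getStr(data, 3)
	fmt.Println(num, typ, off, len(data))
}

Note that the uint16 length prefix bounds each path and checksum string at 64 KiB.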
