Skip to content

Commit 45a398f

Browse files
committed
core: rewrite ufest decompress and unpack logic; introduce MaxChunkCount
* introduced `MaxChunkCount` const to limit the number of chunks on both the storing and loading paths
* updated `packedChunkSize` to reflect the worst-case size per chunk
* refactored `unpack` to use an SGL buffer, allowing dynamic growth up to the defined size limit `MaxChunkCount * packedChunkSize`

Signed-off-by: Tony Chen <a122774007@gmail.com>
1 parent 56a8b02 commit 45a398f

File tree

3 files changed

+366
-118
lines changed

3 files changed

+366
-118
lines changed

ais/test/object_test.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/NVIDIA/aistore/core"
2929
"github.com/NVIDIA/aistore/core/meta"
3030
"github.com/NVIDIA/aistore/core/mock"
31+
"github.com/NVIDIA/aistore/sys"
3132
"github.com/NVIDIA/aistore/tools"
3233
"github.com/NVIDIA/aistore/tools/docker"
3334
"github.com/NVIDIA/aistore/tools/readers"
@@ -2173,6 +2174,125 @@ func TestMultipartUploadParallel(t *testing.T) {
21732174
tlog.Logfln("parallel multipart upload completed successfully with correct content ordering")
21742175
}
21752176

2177+
func TestMultipartMaxChunks(t *testing.T) {
2178+
tools.CheckSkip(t, &tools.SkipTestArgs{Long: true})
2179+
2180+
var (
2181+
proxyURL = tools.RandomProxyURL(t)
2182+
baseParams = tools.BaseAPIParams(proxyURL)
2183+
bck = cmn.Bck{
2184+
Name: trand.String(10),
2185+
Provider: apc.AIS,
2186+
}
2187+
miniPartData = []byte("x") // Minimal 1-byte data per part
2188+
)
2189+
2190+
tools.CreateBucket(t, proxyURL, bck, nil, true /*cleanup*/)
2191+
2192+
t.Run("exceed-limit", func(t *testing.T) {
2193+
var (
2194+
objName = "test-multipart-exceed-limit"
2195+
numParts = core.MaxChunkCount + 100 // Exceed limit by 100
2196+
)
2197+
2198+
uploadID, err := api.CreateMultipartUpload(baseParams, bck, objName)
2199+
tassert.CheckFatal(t, err)
2200+
2201+
err = uploadPartsInParallel(objName, uploadID, numParts, bck, miniPartData)
2202+
2203+
// We expect an error because we're exceeding MaxChunkCount
2204+
tassert.Fatalf(t, err != nil, "expected error when exceeding MaxChunkCount, but upload succeeded")
2205+
herr := cmn.UnwrapErrHTTP(err)
2206+
tassert.Fatalf(t, herr != nil, "expected ErrHTTP, got %v", err)
2207+
tassert.Fatalf(t, strings.Contains(herr.Message, "exceeds the maximum allowed"),
2208+
"expected error message to contain 'exceeds the maximum allowed', got: %v", err)
2209+
2210+
tlog.Logfln("multipart upload correctly rejected when exceeding MaxChunkCount (%d)", core.MaxChunkCount)
2211+
2212+
// Cleanup: abort the upload
2213+
_ = api.AbortMultipartUpload(baseParams, bck, objName, uploadID)
2214+
})
2215+
2216+
t.Run("equal-to-limit", func(t *testing.T) {
2217+
var (
2218+
objName = "test-multipart-at-limit"
2219+
numParts = core.MaxChunkCount // Exactly at the limit
2220+
)
2221+
2222+
uploadID, err := api.CreateMultipartUpload(baseParams, bck, objName)
2223+
tassert.CheckFatal(t, err)
2224+
tassert.Fatalf(t, uploadID != "", "upload ID should not be empty")
2225+
2226+
// Upload parts in parallel - all should succeed
2227+
err = uploadPartsInParallel(objName, uploadID, numParts, bck, miniPartData)
2228+
tassert.CheckFatal(t, err)
2229+
2230+
// Complete multipart upload
2231+
partNumbers := make([]int, numParts)
2232+
for i := range numParts {
2233+
partNumbers[i] = i + 1
2234+
}
2235+
err = api.CompleteMultipartUpload(baseParams, bck, objName, uploadID, partNumbers)
2236+
tassert.CheckFatal(t, err)
2237+
2238+
tlog.Logfln("multipart upload completed successfully with %d parts at MaxChunkCount", numParts)
2239+
2240+
// Verify the uploaded object
2241+
hargs := api.HeadArgs{FltPresence: apc.FltPresent}
2242+
objAttrs, err := api.HeadObject(baseParams, bck, objName, hargs)
2243+
tassert.CheckFatal(t, err)
2244+
2245+
expectedSize := int64(numParts * len(miniPartData))
2246+
tassert.Fatalf(t, objAttrs.Size == expectedSize, "object size mismatch: expected %d, got %d", expectedSize, objAttrs.Size)
2247+
2248+
// GET the object and validate content
2249+
writer := bytes.NewBuffer(nil)
2250+
getArgs := api.GetArgs{Writer: writer}
2251+
_, err = api.GetObject(baseParams, bck, objName, &getArgs)
2252+
tassert.CheckFatal(t, err)
2253+
2254+
downloadedContent := writer.Bytes()
2255+
tassert.Errorf(t, len(downloadedContent) == numParts,
2256+
"content length mismatch: expected %d bytes, got %d", numParts, len(downloadedContent))
2257+
2258+
// Validate all bytes are 'x'
2259+
for i, b := range downloadedContent {
2260+
if b != 'x' {
2261+
t.Fatalf("byte at position %d is %q, expected 'x'", i, b)
2262+
}
2263+
}
2264+
2265+
tlog.Logfln("object content validated successfully: %d bytes, all 'x'", len(downloadedContent))
2266+
})
2267+
}
2268+
2269+
func uploadPartsInParallel(objName, uploadID string, numParts int, bck cmn.Bck, data []byte) error {
2270+
g := errgroup.Group{}
2271+
g.SetLimit(sys.MaxParallelism())
2272+
2273+
for partNum := 1; partNum <= numParts; partNum++ {
2274+
pn := partNum
2275+
2276+
g.Go(func() error {
2277+
putPartArgs := &api.PutPartArgs{
2278+
PutArgs: api.PutArgs{
2279+
BaseParams: baseParams,
2280+
Bck: bck,
2281+
ObjName: objName,
2282+
Reader: readers.NewBytes(data),
2283+
Size: uint64(len(data)),
2284+
},
2285+
UploadID: uploadID,
2286+
PartNumber: pn,
2287+
}
2288+
2289+
return api.UploadPart(putPartArgs)
2290+
})
2291+
}
2292+
2293+
return g.Wait()
2294+
}
2295+
21762296
func TestMultipartUploadAbort(t *testing.T) {
21772297
var (
21782298
proxyURL = tools.RandomProxyURL(t)

core/lchunk_test.go

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,11 @@ var _ = Describe("Ufest Core Functionality", func() {
207207
// Test chunk number exceeding uint16 limit
208208
_, err := manifest.NewChunk(70000, lom) // > math.MaxUint16
209209
Expect(err).To(HaveOccurred())
210-
Expect(err.Error()).To(ContainSubstring("invalid chunk number"))
210+
Expect(err.Error()).To(ContainSubstring("exceeds the maximum allowed"))
211211

212212
_, err = manifest.NewChunk(-1, lom)
213213
Expect(err).To(HaveOccurred())
214+
Expect(err.Error()).To(ContainSubstring("invalid chunk number"))
214215
})
215216

216217
It("should handle concurrent chunk additions safely", func() {
@@ -534,6 +535,53 @@ var _ = Describe("Ufest Core Functionality", func() {
534535
"expected ErrLmetaCorrupted or ErrBadCksum")
535536
})
536537

538+
It("detects corrupted compressed data via checksum or decompression failure", func() {
539+
testObject := "mpu/checksum-corrupt.bin"
540+
localFQN := mix.MakePathFQN(&localBck, fs.ObjCT, testObject)
541+
createTestFile(localFQN, 0)
542+
lom := newBasicLom(localFQN)
543+
544+
u, err := core.NewUfest("cksum-test-"+cos.GenTie(), lom, false)
545+
Expect(err).NotTo(HaveOccurred())
546+
547+
// Add multiple chunks to ensure we have enough data
548+
for i := 1; i <= 3; i++ {
549+
c, err := u.NewChunk(i, lom)
550+
Expect(err).NotTo(HaveOccurred())
551+
createTestChunk(c.Path(), 8*cos.KiB, nil)
552+
Expect(u.Add(c, 8*cos.KiB, int64(i))).NotTo(HaveOccurred())
553+
}
554+
Expect(lom.CompleteUfest(u, false)).NotTo(HaveOccurred())
555+
556+
mfqn := lom.GenFQN(fs.ChunkMetaCT)
557+
buf, err := os.ReadFile(mfqn)
558+
Expect(err).NotTo(HaveOccurred())
559+
Expect(len(buf)).To(BeNumerically(">", 16)) // Need enough data
560+
561+
// Corrupt the compressed data (not the trailing checksum)
562+
// Trailing checksum is last 8 bytes, so corrupt byte before that
563+
corruptIdx := len(buf) - 16
564+
buf[corruptIdx] ^= 0xFF
565+
Expect(os.WriteFile(mfqn, buf, 0o644)).NotTo(HaveOccurred())
566+
567+
// Reload and expect either decompression failure or checksum mismatch
568+
loaded, err := core.NewUfest("", lom, true)
569+
Expect(err).NotTo(HaveOccurred())
570+
lom.Lock(false)
571+
err = loaded.LoadCompleted(lom)
572+
lom.Unlock(false)
573+
574+
// Corrupting compressed data can fail in two ways:
575+
// 1. LZ4 decompression fails (ErrLmetaCorrupted) - most common
576+
// 2. Checksum validation fails (ErrBadCksum) - if corruption doesn't break LZ4
577+
var (
578+
lmerr *cmn.ErrLmetaCorrupted
579+
bcerr *cos.ErrBadCksum
580+
)
581+
Expect(errors.As(err, &lmerr) || errors.As(err, &bcerr)).To(BeTrue(),
582+
"expected ErrLmetaCorrupted or ErrBadCksum for corrupted compressed data")
583+
})
584+
537585
})
538586

539587
Describe("Locking Behavior", func() {
@@ -593,6 +641,50 @@ var _ = Describe("Ufest Core Functionality", func() {
593641
// Test exceeding uint16 limit
594642
_, err := manifest.NewChunk(70000, manifest.Lom()) // > math.MaxUint16 (65535)
595643
Expect(err).To(HaveOccurred())
644+
Expect(err.Error()).To(ContainSubstring("exceeds the maximum allowed"))
645+
})
646+
})
647+
648+
Describe("MaxChunkCount Limit", func() {
649+
var (
650+
manifest *core.Ufest
651+
lom *core.LOM
652+
)
653+
654+
BeforeEach(func() {
655+
var err error
656+
testObjectName := "test-objects/maxchunks-test.bin"
657+
testFileSize := int64(1 * cos.TiB)
658+
localFQN := mix.MakePathFQN(&localBck, fs.ObjCT, testObjectName)
659+
lom = newBasicLom(localFQN, testFileSize)
660+
manifest, err = core.NewUfest("test-maxchunks-"+trand.String(8), lom, false)
661+
Expect(err).NotTo(HaveOccurred())
662+
})
663+
664+
It("should reject chunk number exceeding MaxChunkCount", func() {
665+
invalidChunkNum := core.MaxChunkCount + 1
666+
_, err := manifest.NewChunk(invalidChunkNum, lom)
667+
Expect(err).To(HaveOccurred())
668+
Expect(err.Error()).To(ContainSubstring("exceeds the maximum allowed"))
669+
})
670+
671+
It("should accept chunk number at MaxChunkCount", func() {
672+
validChunkNum := core.MaxChunkCount
673+
chunk, err := manifest.NewChunk(validChunkNum, lom)
674+
Expect(err).NotTo(HaveOccurred())
675+
Expect(chunk).NotTo(BeNil())
676+
Expect(chunk.Num()).To(Equal(uint16(validChunkNum)))
677+
})
678+
679+
It("should reject chunk number at zero", func() {
680+
_, err := manifest.NewChunk(0, lom)
681+
Expect(err).To(HaveOccurred())
682+
Expect(err.Error()).To(ContainSubstring("invalid chunk number"))
683+
})
684+
685+
It("should reject negative chunk number", func() {
686+
_, err := manifest.NewChunk(-1, lom)
687+
Expect(err).To(HaveOccurred())
596688
Expect(err.Error()).To(ContainSubstring("invalid chunk number"))
597689
})
598690
})

0 commit comments

Comments
 (0)