Skip to content

Commit 68e291f

Browse files
committed
xs/blob: integrate load advisory; support single-threaded execution
* integrate load advisory system to dynamically adjust chunk size and workers; throttle downloads based on load conditions. * add single-threaded exection mode (`numWorker = -1`) with `runSerial()` method. Signed-off-by: Tony Chen <a122774007@gmail.com>
1 parent 90ab072 commit 68e291f

File tree

4 files changed

+320
-115
lines changed

4 files changed

+320
-115
lines changed

ais/test/blob_download_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,3 +463,130 @@ func TestBlobDownloadStreamGet(t *testing.T) {
463463
tassert.CheckFatal(t, err)
464464
tassert.Fatalf(t, tools.ReaderEqual(readerDup, result), "warm GET: data mismatch")
465465
}
466+
467+
// TestBlobDownloadSingleThreaded tests single-threaded blob download (numWorkers = -1).
468+
// It validates:
469+
// 1. Single-threaded blob download via API works correctly
470+
// 2. Single-threaded blob download via streaming GET works correctly
471+
// 3. Objects are downloaded and cached properly in single-threaded mode
472+
func TestBlobDownloadSingleThreaded(t *testing.T) {
473+
const (
474+
objSize = 32 * cos.MiB
475+
chunkSize = 8 * cos.MiB
476+
)
477+
var (
478+
proxyURL = tools.RandomProxyURL(t)
479+
baseParams = tools.BaseAPIParams(proxyURL)
480+
bck = cliBck
481+
)
482+
483+
tools.CheckSkip(t, &tools.SkipTestArgs{RemoteBck: true, Bck: bck})
484+
initMountpaths(t, proxyURL)
485+
486+
tests := []struct {
487+
name string
488+
streamGet bool
489+
}{
490+
{name: "blob-download", streamGet: false},
491+
{name: "streaming-get", streamGet: true},
492+
}
493+
494+
for _, test := range tests {
495+
t.Run(test.name, func(t *testing.T) {
496+
objName := "blob-single-threaded-" + test.name + "-" + trand.String(5)
497+
498+
// Provision object to remote bucket
499+
tlog.Logfln("Provisioning object %s (%s)", objName, cos.ToSizeIEC(objSize, 0))
500+
reader, err := readers.New(&readers.Arg{Type: readers.Rand, Size: objSize, CksumType: cos.ChecksumNone})
501+
tassert.CheckFatal(t, err)
502+
503+
_, err = api.PutObject(&api.PutArgs{
504+
BaseParams: baseParams,
505+
Bck: bck,
506+
ObjName: objName,
507+
Reader: reader,
508+
Size: uint64(objSize),
509+
})
510+
tassert.CheckFatal(t, err)
511+
defer api.DeleteObject(baseParams, bck, objName)
512+
513+
// Evict the object to force cold GET
514+
tlog.Logfln("Evicting object %s", objName)
515+
err = api.EvictObject(baseParams, bck, objName)
516+
tassert.CheckFatal(t, err)
517+
518+
if test.streamGet {
519+
// Test single-threaded blob download via streaming GET
520+
tlog.Logfln("Starting single-threaded blob download via streaming GET")
521+
coldGetBuf := &bytes.Buffer{}
522+
getArgs := &api.GetArgs{
523+
Writer: coldGetBuf,
524+
Header: http.Header{
525+
apc.HdrBlobDownload: []string{"true"},
526+
apc.HdrBlobChunk: []string{cos.ToSizeIEC(chunkSize, 0)},
527+
apc.HdrBlobWorkers: []string{"-1"}, // Single-threaded
528+
},
529+
}
530+
result, size, err := api.GetObjectReader(baseParams, bck, objName, getArgs)
531+
tassert.CheckFatal(t, err)
532+
tassert.Fatalf(t, size == objSize, "expected size %d, got %d", objSize, size)
533+
534+
// Verify content matches
535+
readerDup, err := reader.Open()
536+
tassert.CheckFatal(t, err)
537+
tassert.Fatalf(t, tools.ReaderEqual(readerDup, result), "cold GET: data mismatch")
538+
} else {
539+
// Test single-threaded blob download via API
540+
tlog.Logfln("Starting single-threaded blob download via API")
541+
blobMsg := &apc.BlobMsg{
542+
ChunkSize: chunkSize,
543+
FullSize: objSize,
544+
NumWorkers: -1, // Single-threaded
545+
LatestVer: false,
546+
}
547+
xid, err := api.BlobDownload(baseParams, bck, objName, blobMsg)
548+
tassert.CheckFatal(t, err)
549+
tlog.Logfln("Blob download started with xid=%s", xid)
550+
551+
// Wait for blob download to complete
552+
tlog.Logfln("Waiting for single-threaded blob download to complete")
553+
xactFinished := func(snaps xact.MultiSnap) (bool, bool) {
554+
tid, _, err := snaps.RunningTarget("")
555+
if err != nil {
556+
return false, false
557+
}
558+
finished := tid == "" // not running = finished
559+
return finished, false
560+
}
561+
args := xact.ArgsMsg{ID: xid, Kind: apc.ActBlobDl, Timeout: tools.EvictPrefetchTimeout}
562+
err = api.WaitForXactionNode(baseParams, &args, xactFinished)
563+
tassert.CheckFatal(t, err)
564+
565+
// Verify content via GET
566+
result, size, err := api.GetObjectReader(baseParams, bck, objName, nil)
567+
tassert.CheckFatal(t, err)
568+
tassert.Fatalf(t, size == objSize, "expected size %d, got %d", objSize, size)
569+
570+
readerDup, err := reader.Open()
571+
tassert.CheckFatal(t, err)
572+
tassert.Fatalf(t, tools.ReaderEqual(readerDup, result), "warm GET: data mismatch")
573+
}
574+
575+
// Verify object is cached
576+
tlog.Logfln("Verifying object is cached")
577+
cachedList, err := api.ListObjects(baseParams, bck, &apc.LsoMsg{Prefix: objName, Props: apc.GetPropsCached}, api.ListArgs{})
578+
tassert.CheckFatal(t, err)
579+
tassert.Fatalf(t, len(cachedList.Entries) == 1, "expected 1 cached object, got %d", len(cachedList.Entries))
580+
581+
// Verify object is chunked
582+
tlog.Logfln("Verifying object is chunked")
583+
m := &ioContext{t: t, bck: bck}
584+
chunks := m.findObjChunksOnDisk(bck, objName)
585+
expectedChunks := int((objSize + chunkSize - 1) / chunkSize)
586+
tassert.Fatalf(t, len(chunks)+1 == expectedChunks,
587+
"expected %d chunk files, found %d", expectedChunks, len(chunks)+1)
588+
589+
tlog.Logfln("Single-threaded blob download test (%s) completed successfully", test.name)
590+
})
591+
}
592+
}

api/apc/blob.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ func (msg *BlobMsg) FromHeader(hdr http.Header) error {
3636
if err != nil {
3737
return fmt.Errorf("%s: failed to parse %s=%s: %w", _bldl, HdrBlobWorkers, valWorkers[0], err)
3838
}
39-
if nw < 0 || nw > 128 {
40-
return fmt.Errorf("%s: invalid %s=%s: expecting (0..128) range", _bldl, HdrBlobWorkers, valWorkers[0])
39+
if nw < -1 || nw > 128 {
40+
return fmt.Errorf("%s: invalid %s=%s: expecting (-1..128) range", _bldl, HdrBlobWorkers, valWorkers[0])
4141
}
4242
msg.NumWorkers = int(nw)
4343
}

0 commit comments

Comments
 (0)