Skip to content

Commit 2ac099e

Browse files
committed
aisloader: add support for percentage-based archive workload (read, write)
* aisloader work-order now includes `archpath`
* PUT: create shards at specified percentage: `arch.pct=30` means 30% of all PUTs create shards, 70% - plain objects
  - new aisloader operation: `opPutShard`
* GET: read archived files from existing shards
* aisloader's list-objects:
  - use `apc.LsArchDir` flag
  - filter shard objects using 'entry-is-archive'
* with refactoring, minor fixes, and micro-optimizations
-------------
* target's list-objects:
  - contained files should not be listed with 'entry-is-archive' (fix)
-------------
* add misc helpers:
  * archive.SplitAtExtension() common helper
  * encoding/decoding with zero-byte separator
-------------
* TODO:
  - get-batch from archives
  - multipart upload shards

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent cee0906 commit 2ac099e

File tree

10 files changed

+201
-94
lines changed

10 files changed

+201
-94
lines changed

bench/tools/aisloader/client.go

Lines changed: 64 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -377,26 +377,28 @@ func newTraceCtx(proxyURL string) *traceCtx {
377377
return tctx
378378
}
379379

380-
func newGetRequest(proxyURL string, bck cmn.Bck, objName string, offset, length int64, latest bool) (*http.Request, error) {
381-
var (
382-
hdr http.Header
383-
query = url.Values{}
384-
)
385-
query = bck.AddToQuery(query)
380+
func newGetRequest(proxyURL string, wo *workOrder, p *params) (*http.Request, error) {
381+
query := p.bck.AddToQuery(nil)
386382
if etlName != "" {
387383
query.Add(apc.QparamETLName, etlName)
388384
}
389-
if latest {
385+
if p.latest {
390386
query.Add(apc.QparamLatestVer, "true")
391387
}
392-
if length > 0 {
393-
rng := cmn.MakeRangeHdr(offset, length)
388+
if wo.archpath != "" {
389+
query.Add(apc.QparamArchpath, wo.archpath)
390+
}
391+
392+
var hdr http.Header
393+
if p.readLen > 0 {
394+
rng := cmn.MakeRangeHdr(p.readOff, p.readLen)
394395
hdr = http.Header{cos.HdrRange: []string{rng}}
395396
}
397+
396398
reqArgs := cmn.HreqArgs{
397399
Method: http.MethodGet,
398400
Base: proxyURL,
399-
Path: apc.URLPathObjects.Join(bck.Name, objName),
401+
Path: apc.URLPathObjects.Join(p.bck.Name, wo.objName),
400402
Query: query,
401403
Header: hdr,
402404
}
@@ -431,8 +433,8 @@ func s3getDiscard(bck cmn.Bck, objName string) (int64, error) {
431433
}
432434

433435
// getDiscard sends a GET request and discards returned data.
434-
func getDiscard(proxyURL string, bck cmn.Bck, objName string, offset, length int64, validate, latest bool) (int64, error) {
435-
req, err := newGetRequest(proxyURL, bck, objName, offset, length, latest)
436+
func getDiscard(proxyURL string, wo *workOrder, p *params) (int64, error) {
437+
req, err := newGetRequest(proxyURL, wo, p)
436438
if err != nil {
437439
return 0, err
438440
}
@@ -446,30 +448,33 @@ func getDiscard(proxyURL string, bck cmn.Bck, objName string, offset, length int
446448
}
447449

448450
var hdrCksumValue, hdrCksumType string
449-
if validate {
451+
if p.verifyHash {
450452
hdrCksumValue = resp.Header.Get(apc.HdrObjCksumVal)
451453
hdrCksumType = resp.Header.Get(apc.HdrObjCksumType)
452454
}
453-
src := "GET " + bck.Cname(objName)
454-
n, cksumValue, err := readDiscard(resp, src, hdrCksumType)
455+
n, cksumValue, err := readDiscard(resp, hdrCksumType)
455456

456457
resp.Body.Close()
457458
if err != nil {
458-
return 0, err
459+
tag := "GET " + p.bck.Cname(wo.objName)
460+
if wo.archpath != "" {
461+
tag += "/" + wo.archpath
462+
}
463+
return 0, fmt.Errorf("%s: %v", tag, err)
459464
}
460-
if validate && hdrCksumValue != cksumValue {
465+
if p.verifyHash && hdrCksumValue != cksumValue {
461466
return 0, cmn.NewErrInvalidCksum(hdrCksumValue, cksumValue)
462467
}
463468
return n, err
464469
}
465470

466471
// Same as above, but with HTTP trace.
467-
func getTraceDiscard(proxyURL string, bck cmn.Bck, objName string, latencies *httpLatencies, offset, length int64, validate, latest bool) (int64, error) {
472+
func getTraceDiscard(proxyURL string, wo *workOrder, p *params) (int64, error) {
468473
var (
469474
hdrCksumValue string
470475
hdrCksumType string
471476
)
472-
req, err := newGetRequest(proxyURL, bck, objName, offset, length, latest)
477+
req, err := newGetRequest(proxyURL, wo, p)
473478
if err != nil {
474479
return 0, err
475480
}
@@ -486,21 +491,24 @@ func getTraceDiscard(proxyURL string, bck cmn.Bck, objName string, latencies *ht
486491
defer resp.Body.Close()
487492

488493
tctx.tr.tsHTTPEnd = time.Now()
489-
if validate {
494+
if p.verifyHash {
490495
hdrCksumValue = resp.Header.Get(apc.HdrObjCksumVal)
491496
hdrCksumType = resp.Header.Get(apc.HdrObjCksumType)
492497
}
493498

494-
src := "GET " + bck.Cname(objName)
495-
n, cksumValue, err := readDiscard(resp, src, hdrCksumType)
499+
n, cksumValue, err := readDiscard(resp, hdrCksumType)
496500
if err != nil {
497-
return 0, err
501+
tag := "GET " + p.bck.Cname(wo.objName)
502+
if wo.archpath != "" {
503+
tag += "/" + wo.archpath
504+
}
505+
return 0, fmt.Errorf("%s: %v", tag, err)
498506
}
499-
if validate && hdrCksumValue != cksumValue {
507+
if p.verifyHash && hdrCksumValue != cksumValue {
500508
err = cmn.NewErrInvalidCksum(hdrCksumValue, cksumValue)
501509
}
502510

503-
tctx.tr.set(latencies)
511+
tctx.tr.set(wo.latencies)
504512
return n, err
505513
}
506514

@@ -557,7 +565,7 @@ func listObjCallback(ctx *api.LsoCounter) {
557565
}
558566

559567
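// listObjectNames returns the names of all objects in a bucket that match the prefix,
// along with the number of archived files among them (`fcnt`); archived files are
// returned as encoded (shard name, archive path) pairs - see encodeArchName.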
560-
func listObjectNames(p *params) ([]string, error) {
568+
func listObjectNames(p *params) (names []string, fcnt int /*num archived files*/, _ error) {
561569
var (
562570
bp = p.bp
563571
bck = p.bck
@@ -572,16 +580,37 @@ func listObjectNames(p *params) ([]string, error) {
572580
msg.Flags |= apc.LsNoDirs // aisloader's default (to override, use --list-dirs)
573581
}
574582
args := api.ListArgs{Callback: listObjCallback, CallAfter: longListTime}
583+
if p.archParams.pct > 0 {
584+
msg.Flags |= apc.LsArchDir
585+
}
586+
575587
lst, err := api.ListObjects(bp, bck, msg, args)
576588
if err != nil {
577-
return nil, err
578-
}
589+
return nil, 0, err
590+
}
591+
592+
names = make([]string, 0, len(lst.Entries))
593+
for _, en := range lst.Entries {
594+
if en.Flags&apc.EntryInArch != 0 {
595+
debug.Assert(msg.Flags&apc.LsArchDir != 0)
596+
objName, archPath := archive.SplitAtExtension(en.Name)
597+
if archPath != "" {
598+
names = append(names, encodeArchName(objName, archPath)) // with `archSep` delimiter
599+
fcnt++
600+
continue
601+
}
602+
}
603+
// skip archive/shard objects themselves
604+
if en.Flags&apc.EntryIsArchive != 0 {
605+
debug.Assert(msg.Flags&apc.LsArchDir != 0)
606+
continue
607+
}
579608

580-
objs := make([]string, 0, len(lst.Entries))
581-
for _, obj := range lst.Entries {
582-
objs = append(objs, obj.Name)
609+
// plain object, or a shard not flagged 'entry-is-archive' (flagged shards are skipped above)
610+
names = append(names, en.Name)
583611
}
584-
return objs, nil
612+
613+
return names, fcnt, nil
585614
}
586615

587616
func initS3Svc() error {
@@ -668,13 +697,13 @@ func s3ListObjects() ([]string, error) {
668697
return names, nil
669698
}
670699

671-
func readDiscard(r *http.Response, tag, cksumType string) (int64, string, error) {
700+
func readDiscard(r *http.Response, cksumType string) (int64, string, error) {
672701
if r.StatusCode >= http.StatusBadRequest {
673702
bytes, err := cos.ReadAll(r.Body)
674703
if err == nil {
675-
return 0, "", fmt.Errorf("bad status %d from %s, response: %s", r.StatusCode, tag, string(bytes))
704+
return 0, "", fmt.Errorf("bad status %d, response: %s", r.StatusCode, string(bytes))
676705
}
677-
return 0, "", fmt.Errorf("bad status %d from %s: %v", r.StatusCode, tag, err)
706+
return 0, "", fmt.Errorf("bad status %d: %v", r.StatusCode, err)
678707
}
679708

680709
n, cksum, err := cos.CopyAndChecksum(io.Discard, r.Body, nil, cksumType)

bench/tools/aisloader/params.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ type (
9090
minSz int64
9191
maxSz int64
9292

93-
use bool // whether to enable sharding across all supported workloads (PUT, GET, etc.)
93+
pct int // broadly: percentage of archive workload (affects both reading and writing when positive)
9494
}
9595

9696
// Object naming strategy and distribution
@@ -236,7 +236,7 @@ func addCmdLine(f *flag.FlagSet, p *params) {
236236
f.IntVar(&p.archParams.numFiles, "arch.num-files", 0, "number of archived files per shard (PUT only; default gets computed from sizes)")
237237
f.StringVar(&p.archParams.minSzStr, "arch.minsize", "", "minimum file size (with or without multiplicative suffix K, MB, GiB, etc.)")
238238
f.StringVar(&p.archParams.maxSzStr, "arch.maxsize", "", "maximum file size (with or without multiplicative suffix K, MB, GiB, etc.)")
239-
f.BoolVar(&p.archParams.use, "arch.use", false, "enable sharding across all workloads: PUT, GET, chunks, batches, etc.")
239+
f.IntVar(&p.archParams.pct, "arch.pct", 0, "when writing: percentage of shards vs plain objects; when reading: include archived files in the GET pool (ratio is ultimately determined by dataset)")
240240

241241
// ============ Naming ============
242242
f.StringVar(&p.subDir, "subdir", "", "For GET requests, '-subdir' is a prefix that may or may not be an actual _virtual directory_;\n"+
@@ -349,12 +349,20 @@ func initParams(p *params) (err error) {
349349
if p.putSizeUpperBound, err = _parseSize(p.putSizeUpperBoundStr, "totalputsize", 0); err != nil {
350350
return err
351351
}
352+
352353
if p.minSize, err = _parseSize(p.minSizeStr, "minsize", cos.MiB); err != nil {
353354
return err
354355
}
355356
if p.maxSize, err = _parseSize(p.maxSizeStr, "maxsize", cos.GiB); err != nil {
356357
return err
357358
}
359+
if p.minSizeStr == "" && p.maxSizeStr != "" {
360+
p.minSize = min(p.minSize, p.maxSize)
361+
}
362+
if p.maxSizeStr == "" && p.minSizeStr != "" {
363+
p.maxSize = max(p.minSize, p.maxSize)
364+
}
365+
358366
if p.readOff, err = _parseSize(p.readOffStr, "readoff", 0); err != nil {
359367
return err
360368
}
@@ -659,7 +667,21 @@ func (p *params) validate() error {
659667
}
660668

661669
// validate archive
662-
if p.archParams.use {
670+
if p.archParams.pct != 0 {
671+
if p.archParams.pct < 0 || p.archParams.pct > 100 {
672+
return fmt.Errorf("invalid option: arch.pct %d (must be 0-100)", p.archParams.pct)
673+
}
674+
if s3Endpoint != "" {
675+
return errors.New("archive operations require AIStore cluster; not supported with '-s3endpoint' (direct S3 access)")
676+
}
677+
678+
// TODO:
679+
// - currently, we create each part independently with fresh readers
680+
// - multipart upload of shards requires a special streaming chunk writer
681+
if p.multipartPct > 0 {
682+
return errors.New("multipart uploads and archive operations are mutually exclusive (not supported yet)")
683+
}
684+
663685
mime := cos.NonZero(p.archParams.format, archive.ExtTar)
664686
arch := &readers.Arch{
665687
Mime: mime,

bench/tools/aisloader/print.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ type (
369369
Cleanup bool `json:"cleanup"`
370370
}
371371
ppArch struct {
372+
Pct int `json:"% workload"`
372373
Format string `json:"format"`
373374
Prefix string `json:"prefix,omitempty"`
374375
NumFiles int `json:"files per shard,omitempty"`
@@ -379,7 +380,7 @@ type (
379380

380381
func printRunParams(p *params) {
381382
var arch *ppArch
382-
if p.archParams.use {
383+
if p.archParams.pct != 0 {
383384
// temp readers.Arch to run Init() and get computed values
384385
// aisloader's default format: TAR
385386
mime := cos.NonZero(p.archParams.format, archive.ExtTar)
@@ -395,6 +396,7 @@ func printRunParams(p *params) {
395396

396397
numf := cos.Ternary(tmpArch.Num == readers.DynamicNumFiles, 0, p.archParams.numFiles)
397398
arch = &ppArch{
399+
Pct: p.archParams.pct,
398400
Format: tmpArch.Mime,
399401
Prefix: tmpArch.Prefix,
400402
NumFiles: numf,

bench/tools/aisloader/run.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ func Start(version, buildtime string) (err error) {
247247

248248
// note that "'-skiplist' option is ignored when '-filelist' is specified"
249249
case !runParams.skipList:
250-
names, err := listObjects()
250+
names, fcnt, err := listObjects()
251251
if err != nil {
252252
return err
253253
}
@@ -269,7 +269,15 @@ func Start(version, buildtime string) (err error) {
269269
// initialize global name-getter and announce
270270
objnameGetter = ng
271271
objnameGetter.Init(names, rnd)
272-
fmt.Printf("Found %s existing object%s\n\n", cos.FormatBigInt(l), cos.Plural(l))
272+
switch fcnt {
273+
case 0:
274+
fmt.Printf("Found %s existing object%s\n\n", cos.FormatBigInt(l), cos.Plural(l))
275+
case l:
276+
fmt.Printf("Found %s archived file%s\n\n", cos.FormatBigInt(l), cos.Plural(l))
277+
default:
278+
fmt.Printf("Found %s plain object%s and %s archived file%s\n\n",
279+
cos.FormatBigInt(l-fcnt), cos.Plural(l-fcnt), cos.FormatBigInt(fcnt), cos.Plural(fcnt))
280+
}
273281

274282
default:
275283
objnameGetter = &namegetter.Random{}
@@ -735,16 +743,16 @@ func objNamesFromFile() (names []string, err error) {
735743
return
736744
}
737745

738-
func listObjects() (names []string, err error) {
746+
func listObjects() (names []string, fcnt int, err error) {
739747
switch {
740748
case runParams.fileList != "":
741749
names, err = objNamesFromFile()
742750
case isDirectS3():
743751
names, err = s3ListObjects()
744752
default:
745-
names, err = listObjectNames(runParams)
753+
names, fcnt, err = listObjectNames(runParams)
746754
}
747-
return
755+
return names, fcnt, err
748756
}
749757

750758
func newNameGetter(names []string) (ng namegetter.Basic, isPermBased bool) {

0 commit comments

Comments
 (0)