Commit 9dc1f19 (parent 8ef6819)

xs/blob: add periodic load throttling to workers

- introduce per-worker advice instance and counter for `adv.ShouldCheck`
- throttle when disk load is high, similar to the moss xaction pattern

Signed-off-by: Tony Chen <a122774007@gmail.com>


xact/xs/blob_download.go (1 file changed: 25 additions, 9 deletions)
@@ -55,6 +55,7 @@ type (
 		vlabs   map[string]string
 		xlabs   map[string]string
 		workers []*worker
+		config  *cmn.Config
 		pending map[int64]*chunkTask // map of pending tasks indexed by roff
 		xact.Base
 		wg sync.WaitGroup
@@ -74,7 +75,9 @@ type (
 // internal
 type (
 	worker struct {
-		parent *XactBlobDl
+		parent  *XactBlobDl
+		adv     load.Advice
+		nchunks int64 // per-worker sequential counter for ShouldCheck
 	}
 	chunkTask struct {
 		name string // for logging and debug assertions
@@ -252,13 +255,11 @@ func (p *blobFactory) Start() (err error) {
 	)

 	// Admission check: assess load to determine if we can start
-	config := cmn.GCO.Get()
+	r.config = cmn.GCO.Get()
 	r.adv.Init(load.FlMem|load.FlCla|load.FlDsk, &load.Extra{
-		Mi:  r.args.Lom.Mountpath(),
-		Cfg: &config.Disk,
+		Cfg: &r.config.Disk,
 		RW:  true, // data I/O operation
 	})
-	r.adv.Refresh()

 	if r.adv.MemLoad() == load.Critical {
 		err := fmt.Errorf("%s: rejected due to resource pressure (%s) - not starting", r.Name(), r.adv.String())
@@ -279,10 +280,9 @@ func (p *blobFactory) Start() (err error) {

 	if r.numWorkers != nwpNone {
 		r.workers = make([]*worker, r.numWorkers)
+		r.adv.Refresh() // refresh advice one more time before copying to workers
 		for i := range r.workers {
-			r.workers[i] = &worker{
-				parent: r,
-			}
+			r.workers[i] = r.newWorker()
 		}
 		// open channels
 		r.workCh = make(chan *chunkTask, r.numWorkers)
@@ -371,7 +371,7 @@ func (r *XactBlobDl) runSerial() error {
 	buf, slab := core.T.PageMM().AllocSize(r.chunkSize)
 	defer slab.Free(buf)

-	worker := &worker{parent: r}
+	worker := r.newWorker()
 	tsk := &chunkTask{name: r.Name() + "_chunk_task", roff: r.nextRoff}
 	debug.IncCounter(tsk.name)
 	defer tsk.cleanup()
@@ -556,6 +556,11 @@ func (r *XactBlobDl) Abort(err error) bool {
 	return true
 }

+func (r *XactBlobDl) newWorker() *worker {
+	w := &worker{parent: r, adv: r.adv} // note: advice is copied by value
+	return w
+}
+
 func (r *XactBlobDl) startWorkers() {
 	for i := range r.workers {
 		if r.nextRoff >= r.fullSize {
@@ -702,6 +707,17 @@ func (w *worker) do(tsk *chunkTask, buf []byte) (int, error) {

 	partNum := tsk.roff/chunkSize + 1

+	// Periodic load check using sequential counter
+	w.nchunks++
+	if w.adv.ShouldCheck(w.nchunks) {
+		w.adv.Refresh()
+		// Note: the disk load checks utilizations across all mountpaths
+		if w.adv.DskLoad() >= load.High {
+			debug.Assert(w.adv.Sleep > 0)
+			time.Sleep(w.adv.Sleep)
+		}
+	}
+
 	// Get object range reader
 	res := core.T.Backend(lom.Bck()).GetObjReader(context.Background(), lom, tsk.roff, chunkSize)
 	if res.Err != nil || res.ErrCode == http.StatusRequestedRangeNotSatisfiable || res.R == nil {
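
For context, below is a minimal, self-contained sketch of the counter-based throttling pattern this commit introduces: each worker keeps its own advice copy and a sequential chunk counter, re-checks disk load only every few chunks, and sleeps when the load is high. The loadAdvice type, its thresholds, the check interval, and getDiskUtil are hypothetical stand-ins for illustration, not AIStore's actual load.Advice API.

// A minimal, self-contained sketch of the counter-based throttling pattern
// introduced by this commit. The loadAdvice type, its thresholds, the check
// interval, and getDiskUtil are hypothetical stand-ins, not the real API.
package main

import (
	"fmt"
	"time"
)

const (
	loadLow = iota
	loadHigh
	loadCritical
)

// loadAdvice caches the last observed disk load and tells callers when to re-check it.
type loadAdvice struct {
	dskLoad  int
	interval int64         // re-check every `interval` chunks
	sleep    time.Duration // pause applied while disk load stays high
}

// shouldCheck reports whether the sequential counter has reached the next check point.
func (a *loadAdvice) shouldCheck(n int64) bool { return n%a.interval == 0 }

// refresh re-samples disk utilization (stubbed here) and updates the cached load level.
func (a *loadAdvice) refresh() {
	switch util := getDiskUtil(); {
	case util >= 90:
		a.dskLoad = loadCritical
	case util >= 65:
		a.dskLoad = loadHigh
	default:
		a.dskLoad = loadLow
	}
}

// getDiskUtil is a stub standing in for real utilization sampling across mountpaths.
func getDiskUtil() int { return 70 }

// worker mirrors the per-worker state from the commit: its own advice copy
// plus a sequential chunk counter.
type worker struct {
	adv     loadAdvice
	nchunks int64
}

// doChunk stands in for downloading one chunk (elided) and periodically
// throttles when disk load is high.
func (w *worker) doChunk() {
	w.nchunks++
	if w.adv.shouldCheck(w.nchunks) {
		w.adv.refresh()
		if w.adv.dskLoad >= loadHigh {
			time.Sleep(w.adv.sleep) // back off to let the disks drain
		}
	}
	// ... fetch and write the chunk here ...
}

func main() {
	w := &worker{adv: loadAdvice{interval: 4, sleep: 10 * time.Millisecond}}
	for i := 0; i < 8; i++ {
		w.doChunk()
	}
	fmt.Println("processed", w.nchunks, "chunks")
}

Copying the advice by value in newWorker (as the diff's comment notes) keeps each worker's cached load and counter independent, so the periodic refreshes need no shared state or locking.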
