Skip to content

Commit

Permalink
feat: sched: Improve worker assigning logic
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Apr 6, 2022
1 parent e0c4f06 commit b623f28
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 14 deletions.
59 changes: 46 additions & 13 deletions extern/sector-storage/sched.go
Expand Up @@ -2,6 +2,7 @@ package sectorstorage

import (
"context"
"math"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -365,7 +366,7 @@ func (sh *scheduler) trySched() {
}

windows := make([]schedWindow, windowsLen)
acceptableWindows := make([][]int, queueLen)
acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex

// Step 1
throttle := make(chan struct{}, windowsLen)
Expand Down Expand Up @@ -459,41 +460,73 @@ func (sh *scheduler) trySched() {
// Step 2
scheduled := 0
rmQueue := make([]int, 0, queueLen)
workerUtil := map[storiface.WorkerID]float64{}

for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.schedQueue)[sqi]

selectedWindow := -1
for _, wnd := range acceptableWindows[task.indexHeap] {
var needRes storiface.Resources
var info storiface.WorkerInfo
var bestWid storiface.WorkerID
bestUtilization := math.MaxFloat64 // smaller = better

for i, wnd := range acceptableWindows[task.indexHeap] {
wid := sh.openWindows[wnd].worker
info := sh.workers[wid].info
w := sh.workers[wid]

log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd)
res := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)

needRes := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.sector.ID.Number, wnd, i)

// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) {
continue
}

log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.ID.Number, task.taskType, wnd)

windows[wnd].allocated.add(info.Resources, needRes)
// TODO: We probably want to re-sort acceptableWindows here based on new
// workerHandle.utilization + windows[wnd].allocated.utilization (workerHandle.utilization is used in all
// task selectors, but not in the same way, so need to figure out how to do that in a non-O(n^2) way, and
// without additional network roundtrips (O(n^2) could be avoided by turning acceptableWindows.[] into heaps))
wu, found := workerUtil[wid]
if !found {
wu = w.utilization()
workerUtil[wid] = wu
}
if wu >= bestUtilization {
// acceptable worker list is initially sorted by utilization, and the initially-best workers
// will be assigned tasks first. This means that if we find a worker which isn't better, it
// probably means that the other workers aren't better either.
//
// utilization
// ^
// | /
// | \ /
// | \ /
// | *
// #--------> acceptableWindow index
//
// * -> we're here
break
}

info = w.info
needRes = res
bestWid = wid
selectedWindow = wnd
break
bestUtilization = wu
}

if selectedWindow < 0 {
// all windows full
continue
}

log.Debugw("SCHED ASSIGNED",
"sqi", sqi,
"sector", task.sector.ID.Number,
"task", task.taskType,
"window", selectedWindow,
"worker", bestWid,
"utilization", bestUtilization)

workerUtil[bestWid] += windows[selectedWindow].allocated.add(info.Resources, needRes)
windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)

rmQueue = append(rmQueue, sqi)
Expand Down
15 changes: 14 additions & 1 deletion extern/sector-storage/sched_resources.go
Expand Up @@ -33,13 +33,18 @@ func (a *activeResources) hasWorkWaiting() bool {
return a.waiting > 0
}

func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) {
// add task resources to activeResources and return utilization difference
func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) float64 {
startUtil := a.utilization(wr)

if r.GPUUtilization > 0 {
a.gpuUsed += r.GPUUtilization
}
a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory

return a.utilization(wr) - startUtil
}

func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resources) {
Expand Down Expand Up @@ -104,6 +109,7 @@ func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid stor
return true
}

// utilization returns a number in 0..1 range indicating fraction of used resources
func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
var max float64

Expand All @@ -129,6 +135,13 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
max = memMax
}

if len(wr.GPUs) > 0 {
gpuMax := a.gpuUsed / float64(len(wr.GPUs))
if gpuMax > max {
max = gpuMax
}
}

return max
}

Expand Down

0 comments on commit b623f28

Please sign in to comment.