diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 757e89299f0..bb5fd39c976 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -2,6 +2,7 @@ package sectorstorage import ( "context" + "math" "math/rand" "sort" "sync" @@ -365,7 +366,7 @@ func (sh *scheduler) trySched() { } windows := make([]schedWindow, windowsLen) - acceptableWindows := make([][]int, queueLen) + acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex // Step 1 throttle := make(chan struct{}, windowsLen) @@ -459,34 +460,57 @@ func (sh *scheduler) trySched() { // Step 2 scheduled := 0 rmQueue := make([]int, 0, queueLen) + workerUtil := map[storiface.WorkerID]float64{} for sqi := 0; sqi < queueLen; sqi++ { task := (*sh.schedQueue)[sqi] selectedWindow := -1 - for _, wnd := range acceptableWindows[task.indexHeap] { + var needRes storiface.Resources + var info storiface.WorkerInfo + var bestWid storiface.WorkerID + bestUtilization := math.MaxFloat64 // smaller = better + + for i, wnd := range acceptableWindows[task.indexHeap] { wid := sh.openWindows[wnd].worker - info := sh.workers[wid].info + w := sh.workers[wid] - log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd) + res := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType) - needRes := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType) + log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.sector.ID.Number, wnd, i) // TODO: allow bigger windows if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) { continue } - log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.ID.Number, task.taskType, wnd) - - windows[wnd].allocated.add(info.Resources, needRes) - // TODO: We probably want to re-sort acceptableWindows here based on new - // workerHandle.utilization + windows[wnd].allocated.utilization (workerHandle.utilization is used in all - // task selectors, but not in the same way, so need to figure out how to do that in a non-O(n^2 way), and - // without additional network roundtrips (O(n^2) could be avoided by turning acceptableWindows.[] into heaps)) + wu, found := workerUtil[wid] + if !found { + wu = w.utilization() + workerUtil[wid] = wu + } + if wu >= bestUtilization { + // acceptable worker list is initally sorted by utilization, and the initially-best workers + // will be assigned tasks first. This means that if we find a worker which isn't better, it + // probably means that the other workers aren't better either. + // + // utilization + // ^ + // | / + // | \ / + // | \ / + // | * + // #--------> acceptableWindow index + // + // * -> we're here + break + } + info = w.info + needRes = res + bestWid = wid selectedWindow = wnd - break + bestUtilization = wu } if selectedWindow < 0 { @@ -494,6 +518,15 @@ func (sh *scheduler) trySched() { continue } + log.Debugw("SCHED ASSIGNED", + "sqi", sqi, + "sector", task.sector.ID.Number, + "task", task.taskType, + "window", selectedWindow, + "worker", bestWid, + "utilization", bestUtilization) + + workerUtil[bestWid] += windows[selectedWindow].allocated.add(info.Resources, needRes) windows[selectedWindow].todo = append(windows[selectedWindow].todo, task) rmQueue = append(rmQueue, sqi) diff --git a/extern/sector-storage/sched_resources.go b/extern/sector-storage/sched_resources.go index 6034dc2e916..58a181d6bd0 100644 --- a/extern/sector-storage/sched_resources.go +++ b/extern/sector-storage/sched_resources.go @@ -33,13 +33,18 @@ func (a *activeResources) hasWorkWaiting() bool { return a.waiting > 0 } -func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) { +// add task resources to activeResources and return utilization difference +func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) float64 { + startUtil := a.utilization(wr) + if r.GPUUtilization > 0 { a.gpuUsed += r.GPUUtilization } a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs)) a.memUsedMin += r.MinMemory a.memUsedMax += r.MaxMemory + + return a.utilization(wr) - startUtil } func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resources) { @@ -104,6 +109,7 @@ func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid stor return true } +// utilization returns a number in 0..1 range indicating fraction of used resources func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { var max float64 @@ -129,6 +135,13 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { max = memMax } + if len(wr.GPUs) > 0 { + gpuMax := a.gpuUsed / float64(len(wr.GPUs)) + if gpuMax > max { + max = gpuMax + } + } + return max }