Skip to content

Commit

Permalink
wip improve scheduling of ready work
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Sep 15, 2021
1 parent ef03314 commit 033a925
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 8 deletions.
3 changes: 2 additions & 1 deletion extern/sector-storage/sched.go
Expand Up @@ -117,7 +117,8 @@ type activeResources struct {
gpuUsed bool
cpuUse uint64

cond *sync.Cond
cond *sync.Cond
waiting int
}

type workerRequest struct {
Expand Down
14 changes: 11 additions & 3 deletions extern/sector-storage/sched_resources.go
Expand Up @@ -11,21 +11,25 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
a.waiting++
a.cond.Wait()
a.waiting--
}

a.add(wr.Resources, r)

err := cb()

a.free(wr.Resources, r)
if a.cond != nil {
a.cond.Broadcast()
}

return err
}

// must be called with the same lock as the one passed to withResources
func (a *activeResources) hasWorkWaiting() bool {
return a.waiting > 0
}

func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
if r.CanGPU {
a.gpuUsed = true
Expand All @@ -42,6 +46,10 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
a.cpuUse -= r.Threads(wr.CPUs)
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory

if a.cond != nil {
a.cond.Broadcast()
}
}

// canHandleRequest evaluates if the worker has enough available resources to
Expand Down
114 changes: 110 additions & 4 deletions extern/sector-storage/sched_worker.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/extern/sector-storage/stores"
Expand Down Expand Up @@ -338,6 +339,11 @@ func (sw *schedWorker) workerCompactWindows() {
}

func (sw *schedWorker) processAssignedWindows() {
sw.assignReadyWork()
sw.assignPreparingWork()
}

func (sw *schedWorker) assignPreparingWork() {
worker := sw.worker

assignLoop:
Expand Down Expand Up @@ -366,7 +372,67 @@ assignLoop:
todo := firstWindow.todo[tidx]

log.Debugf("assign worker sector %d", todo.sector.ID.Number)
err := sw.startProcessingTask(sw.taskDone, todo)
err := sw.startProcessingTask(todo)

if err != nil {
log.Errorf("startProcessingTask error: %+v", err)
go todo.respond(xerrors.Errorf("startProcessingTask error: %w", err))
}

// Note: we're not freeing window.allocated resources here very much on purpose
copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:])
firstWindow.todo[len(firstWindow.todo)-1] = nil
firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1]
}

copy(worker.activeWindows, worker.activeWindows[1:])
worker.activeWindows[len(worker.activeWindows)-1] = nil
worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1]

sw.windowsRequested--
}
}

func (sw *schedWorker) assignReadyWork() {
worker := sw.worker

worker.lk.Lock()
defer worker.lk.Unlock()

if worker.active.hasWorkWaiting() {
// prepared tasks have priority
return
}

assignLoop:
// process windows in order
for len(worker.activeWindows) > 0 {
firstWindow := worker.activeWindows[0]

// process tasks within a window, preferring tasks at lower indexes
for len(firstWindow.todo) > 0 {
tidx := -1

for t, todo := range firstWindow.todo {
if todo.taskType != sealtasks.TTCommit1 && todo.taskType != sealtasks.TTCommit2 { // todo put in task
continue
}

needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
if worker.active.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
tidx = t
break
}
}

if tidx == -1 {
break assignLoop
}

todo := firstWindow.todo[tidx]

log.Debugf("assign worker sector %d (ready)", todo.sector.ID.Number)
err := sw.startProcessingReadyTask(todo)

if err != nil {
log.Errorf("startProcessingTask error: %+v", err)
Expand All @@ -387,7 +453,7 @@ assignLoop:
}
}

func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error {
func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
w, sh := sw.worker, sw.sched

needRes := ResourceTable[req.taskType][req.sector.ProofType]
Expand All @@ -406,7 +472,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
w.lk.Unlock()

select {
case taskDone <- struct{}{}:
case sw.taskDone <- struct{}{}:
case <-sh.closing:
log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
}
Expand All @@ -428,7 +494,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
defer w.lk.Lock() // we MUST return locked from this function

select {
case taskDone <- struct{}{}:
case sw.taskDone <- struct{}{}:
case <-sh.closing:
}

Expand Down Expand Up @@ -457,6 +523,46 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
return nil
}

func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
w, sh := sw.worker, sw.sched

needRes := ResourceTable[req.taskType][req.sector.ProofType]

w.active.add(w.info.Resources, needRes)

go func() {
// Do the work!
err := req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))

select {
case req.ret <- workerResponse{err: err}:
case <-req.ctx.Done():
log.Warnf("request got cancelled before we could respond")
case <-sh.closing:
log.Warnf("scheduler closed while sending response")
}

w.lk.Lock()

w.active.free(w.info.Resources, needRes)

select {
case sw.taskDone <- struct{}{}:
case <-sh.closing:
log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
}

w.lk.Unlock()

// This error should always be nil, since nothing is setting it, but just to be safe:
if err != nil {
log.Errorf("error executing worker (ready): %+v", err)
}
}()

return nil
}

func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
select {
case <-w.closingMgr:
Expand Down

0 comments on commit 033a925

Please sign in to comment.