Skip to content

Commit

Permalink
fix: curio taskstorage: Don't try to free reservations by nulled Task…
Browse files Browse the repository at this point in the history
…ID (#12018)
  • Loading branch information
magik6k authored May 20, 2024
1 parent 1afe58d commit db105f1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 21 deletions.
23 changes: 10 additions & 13 deletions curiosrc/ffi/task_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (t *TaskStorage) HasCapacity() bool {
return false // no path found
}

func (t *TaskStorage) Claim(taskID int) error {
func (t *TaskStorage) Claim(taskID int) (func() error, error) {
// TaskStorage Claim Attempts to reserve storage for the task
// A: Create a reservation for files to be allocated
// B: Create a reservation for existing files to be fetched into local storage
Expand All @@ -125,7 +125,7 @@ func (t *TaskStorage) Claim(taskID int) error {

sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
if err != nil {
return xerrors.Errorf("getting sector ref: %w", err)
return nil, xerrors.Errorf("getting sector ref: %w", err)
}

// storage writelock sector
Expand All @@ -147,12 +147,12 @@ func (t *TaskStorage) Claim(taskID int) error {

if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, requestedTypes); err != nil {
// timer will expire
return xerrors.Errorf("claim StorageLock: %w", err)
return nil, xerrors.Errorf("claim StorageLock: %w", err)
}

if !lockAcquireTimer.Stop() {
// timer expired, so lkctx is done, and that means the lock was acquired and dropped..
return xerrors.Errorf("failed to acquire lock")
return nil, xerrors.Errorf("failed to acquire lock")
}
defer func() {
// make sure we release the sector lock
Expand All @@ -166,13 +166,13 @@ func (t *TaskStorage) Claim(taskID int) error {
// paths to be used.
pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, requestedTypes, t.pathType, storiface.AcquireMove)
if err != nil {
return err
return nil, err
}

// reserve the space
release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal, t.MinFreeStoragePercentage)
if err != nil {
return err
return nil, err
}

var releaseOnce sync.Once
Expand All @@ -197,19 +197,16 @@ func (t *TaskStorage) Claim(taskID int) error {
// note: we drop the sector writelock on return; THAT IS INTENTIONAL, this code runs in CanAccept, which doesn't
// guarantee that the work for this sector will happen on this node; SDR CanAccept just ensures that the node can
// run the job, harmonytask is what ensures that only one SDR runs at a time
return nil
return func() error {
return t.markComplete(taskID, sectorRef)
}, nil
}

func (t *TaskStorage) MarkComplete(taskID int) error {
func (t *TaskStorage) markComplete(taskID int, sectorRef SectorRef) error {
// MarkComplete is ALWAYS called after the task is done or not scheduled
// If Claim is called and returns without errors, MarkComplete with the same
// taskID is guaranteed to eventually be called

sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
if err != nil {
return xerrors.Errorf("getting sector ref: %w", err)
}

sres, ok := t.sc.sectors.storageReservations.Load(harmonytask.TaskID(taskID))
if !ok {
return xerrors.Errorf("no reservation found for task %d", taskID)
Expand Down
5 changes: 3 additions & 2 deletions lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ canAcceptAgain:
releaseStorage := func() {
}
if h.TaskTypeDetails.Cost.Storage != nil {
if err = h.TaskTypeDetails.Cost.Storage.Claim(int(*tID)); err != nil {
markComplete, err := h.TaskTypeDetails.Cost.Storage.Claim(int(*tID))
if err != nil {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err)

if len(ids) > 1 {
Expand All @@ -122,7 +123,7 @@ canAcceptAgain:
return false
}
releaseStorage = func() {
if err := h.TaskTypeDetails.Cost.Storage.MarkComplete(int(*tID)); err != nil {
if err := markComplete(); err != nil {
log.Errorw("Could not release storage", "error", err)
}
}
Expand Down
8 changes: 2 additions & 6 deletions lib/harmony/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ type Resources struct {
type Storage interface {
HasCapacity() bool

// This allows some other system to claim space for this task.
Claim(taskID int) error

// This allows some other system to consider the task done.
// It's up to the caller to remove the data, if that applies.
MarkComplete(taskID int) error
// This allows some other system to claim space for this task. Returns a cleanup function
Claim(taskID int) (func() error, error)
}
type Reg struct {
Resources
Expand Down

0 comments on commit db105f1

Please sign in to comment.