Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: lotus-provider: Storage reservations in SDR #11643

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions itests/harmonytask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (t *task1) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, e
t.WorkCompleted = append(t.WorkCompleted, fmt.Sprintf("taskResult%d", t.myPersonalTable[tID]))
return true, nil
}
func (t *task1) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return &list[0], nil
func (t *task1) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, harmonytask.AcceptData, error) {
return &list[0], nil, nil
}
func (t *task1) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Expand Down Expand Up @@ -107,7 +107,7 @@ type passthru struct {
func (t *passthru) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
return t.do(tID, stillOwned)
}
func (t *passthru) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
func (t *passthru) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, harmonytask.AcceptData, error) {
return t.canAccept(list, e)
}
func (t *passthru) TypeDetails() harmonytask.TaskTypeDetails {
Expand Down
11 changes: 9 additions & 2 deletions lib/harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type TaskTypeDetails struct {
Follows map[string]func(TaskID, AddTaskFunc) (bool, error)
}

// AcceptData is opaque per-acceptance state returned by TaskInterface.CanAccept
// and later passed to Do on the node that ends up claiming the task. It lets an
// implementation hand resources acquired during acceptance (e.g. a storage
// reservation) to the execution step.
type AcceptData interface {
	// NotClaimed is called by harmonytask when the task is not claimed by this node,
	// giving the implementation a chance to release anything acquired in CanAccept.
	NotClaimed() error
}

// TaskInterface must be implemented in order to have a task used by harmonytask.
type TaskInterface interface {
// Do the task assigned. Call stillOwned before making single-writer-only
Expand All @@ -49,13 +54,15 @@ type TaskInterface interface {
// ONLY be called by harmonytask.
// Indicate if the task no-longer needs scheduling with done=true including
// cases where it's past the deadline.
Do(taskID TaskID, stillOwned func() bool) (done bool, err error)
Do(taskID TaskID, acceptData AcceptData, stillOwned func() bool) (done bool, err error)

// CanAccept should return if the task can run on this machine. It should
// return null if the task type is not allowed on this machine.
// It should select the task it most wants to accomplish.
// It is also responsible for determining & reserving disk space (including scratch).
CanAccept([]TaskID, *TaskEngine) (*TaskID, error)
// AcceptData is an optional parameter which can be used to pass data to the Do function.
// If the task is not claimed on this node, harmonytask will call NotClaimed on the AcceptData.
CanAccept([]TaskID, *TaskEngine) (*TaskID, AcceptData, error)

// TypeDetails() returns static details about how this task behaves and
// how this machine will run it. Read once at the beginning.
Expand Down
18 changes: 16 additions & 2 deletions lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,21 @@ top:
}

// 3. What does the impl say?
tID, err := h.CanAccept(ids, h.TaskEngine)
tID, acceptData, err := h.CanAccept(ids, h.TaskEngine)
if err != nil {
log.Error(err)
return false
}

notClaimedAcceptData := acceptData
defer func() {
if notClaimedAcceptData != nil {
if err := notClaimedAcceptData.NotClaimed(); err != nil {
log.Errorw("NotClaimed() error", "error", err, "type", h.Name, "id", ids[0])
}
}
}()

if tID == nil {
log.Infow("did not accept task", "task_id", ids[0], "reason", "CanAccept() refused", "name", h.Name)
return false
Expand Down Expand Up @@ -144,7 +154,11 @@ top:
}
}()

done, doErr = h.Do(*tID, func() bool {
// set notClaimedAcceptData to nil so it doesn't get called in the defer
notClaimedAcceptData = nil

// Do the work
done, doErr = h.Do(*tID, acceptData, func() bool {
var owner int
// Background here because we don't want GracefulRestart to block this save.
err := h.TaskEngine.db.QueryRow(context.Background(),
Expand Down
149 changes: 139 additions & 10 deletions provider/lpffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/filecoin-project/lotus/lib/must"
"io"
"os"
"path/filepath"
"time"

"github.com/KarpelesLab/reflink"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -56,15 +58,45 @@ type storageProvider struct {
sindex paths.SectorIndex
}

func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
type ReleaseStorageFunc func() // free storage reservation

// StorageReservation bundles a local storage space reservation with the paths
// it was made in, so a later pipeline step can run against the reserved space.
type StorageReservation struct {
	// Release frees the reserved storage space.
	Release ReleaseStorageFunc
	// DeclareAllocated declares the allocated sector files in the sector index.
	DeclareAllocated func()
	// Paths holds the acquired filesystem paths for the sector files.
	Paths storiface.SectorPaths
	// PathIDs holds the storage IDs corresponding to Paths.
	PathIDs storiface.SectorPaths
}

// AcquireSettings collects optional parameters for storageProvider.AcquireSector,
// populated by AcquireOption functions.
type AcquireSettings struct {
	// release, when non-nil, is an existing storage reservation's release
	// function; AcquireSector will use it instead of making a new reservation.
	release ReleaseStorageFunc
}

type AcquireOption func(*AcquireSettings)

// WithReservation makes AcquireSector use a pre-existing storage reservation:
// the supplied release function is adopted instead of reserving fresh space
// inside the call.
func WithReservation(release ReleaseStorageFunc) AcquireOption {
	return func(s *AcquireSettings) {
		s.release = release
	}
}

func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, opts ...AcquireOption) (storiface.SectorPaths, func(), error) {
settings := &AcquireSettings{}
for _, opt := range opts {
opt(settings)
}

paths, storageIDs, err := l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
if err != nil {
return storiface.SectorPaths{}, nil, err
}

releaseStorage, err := l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
if err != nil {
return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
var releaseStorage func() = settings.release

if releaseStorage == nil {
releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
if err != nil {
return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
}
}

log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
Expand All @@ -85,13 +117,94 @@ func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.Se
}, nil
}

func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error {
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing)
// ReserveSDRStorage reserves local cache space for an SDR run on the given
// sector. It briefly takes the sector storage writelock (released on return —
// see the note at the bottom), verifies that any existing cache files for the
// sector live in local storage, acquires a local sealing path, and reserves
// space in it. The returned StorageReservation carries the release callback,
// the acquired paths, and a helper to declare the allocated files in the
// sector index.
func (sb *SealCalls) ReserveSDRStorage(ctx context.Context, sector storiface.SectorRef) (*StorageReservation, error) {
	// storage writelock sector
	lkctx, cancel := context.WithCancel(ctx)

	allocate := storiface.FTCache

	// Bound how long we wait for the storage lock. The timer doubles as the
	// release trigger for lkctx — see the deferred Reset below.
	lockAcquireTimeout := time.Second * 10
	lockAcquireTimer := time.NewTimer(lockAcquireTimeout)

	go func() {
		defer cancel()

		select {
		case <-lockAcquireTimer.C:
		case <-ctx.Done():
		}
	}()

	if err := sb.sectors.sindex.StorageLock(lkctx, sector.ID, storiface.FTNone, allocate); err != nil {
		// timer will expire and cancel lkctx
		return nil, err
	}

	if !lockAcquireTimer.Stop() {
		// timer expired, so lkctx is done, and that means the lock was acquired and dropped..
		return nil, xerrors.Errorf("failed to acquire lock")
	}
	defer func() {
		// fire the timer immediately so the goroutine above cancels lkctx,
		// releasing the sector storage lock
		lockAcquireTimer.Reset(0)
	}()

	// find existing cache files for this sector anywhere in storage
	s, err := sb.sectors.sindex.StorageFindSector(ctx, sector.ID, allocate, must.One(sector.ProofType.SectorSize()), false)
	if err != nil {
		return nil, err
	}

	lp, err := sb.sectors.localStore.Local(ctx)
	if err != nil {
		return nil, err
	}

	// refuse to reserve if any existing sector file lives outside local storage.
	// (fixed: the check is a membership test — the earlier form returned an
	// error as soon as any single local path didn't match, which mis-fired on
	// nodes with more than one local storage path)
	for _, info := range s {
		var inLocal bool
		for _, l := range lp {
			if l.ID == info.ID {
				inLocal = true
				break
			}
		}
		if !inLocal {
			return nil, xerrors.Errorf("sector cache already exists and isn't in local storage, can't reserve")
		}
	}

	// acquire a path to make a reservation in
	pathsFs, pathIDs, err := sb.sectors.localStore.AcquireSector(ctx, sector, storiface.FTNone, allocate, storiface.PathSealing, storiface.AcquireMove)
	if err != nil {
		return nil, err
	}

	// reserve the space
	release, err := sb.sectors.localStore.Reserve(ctx, sector, allocate, pathIDs, storiface.FSOverheadSeal)
	if err != nil {
		return nil, err
	}

	// declareAllocated marks the allocated file types as present in the sector
	// index; it is meant to be called once the files have actually been created
	declareAllocated := func() {
		for _, fileType := range allocate.AllSet() {
			sid := storiface.PathByType(pathIDs, fileType)
			if err := sb.sectors.sindex.StorageDeclareSector(ctx, storiface.ID(sid), sector.ID, fileType, true); err != nil {
				log.Errorf("declare sector error: %+v", err)
			}
		}
	}

	// note: we drop the sector writelock on return; THAT IS INTENTIONAL, this code runs in CanAccept, which doesn't
	// guarantee that the work for this sector will happen on this node; SDR CanAccept just ensures that the node can
	// run the job, harmonytask is what ensures that only one SDR runs at a time
	return &StorageReservation{
		Release:          release,
		DeclareAllocated: declareAllocated,
		Paths:            pathsFs,
		PathIDs:          pathIDs,
	}, nil
}

func (sb *SealCalls) GenerateSDR(ctx context.Context, reservation *StorageReservation, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error {
// prepare SDR params
commp, err := commcid.CIDToDataCommitmentV1(commKcid)
if err != nil {
Expand All @@ -103,14 +216,30 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef
return xerrors.Errorf("computing replica id: %w", err)
}

// acquire sector paths
var sectorPaths storiface.SectorPaths
if reservation != nil {
sectorPaths = reservation.Paths
// note: GenerateSDR manages the reservation, so we only need to declare at the end
defer reservation.DeclareAllocated()
} else {
var releaseSector func()
var err error
sectorPaths, releaseSector, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
defer releaseSector()
}

// generate new sector key
err = ffi.GenerateSDR(
sector.ProofType,
paths.Cache,
sectorPaths.Cache,
replicaID,
)
if err != nil {
return xerrors.Errorf("generating SDR %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
return xerrors.Errorf("generating SDR %d (%s): %w", sector.ID.Number, sectorPaths.Unsealed, err)
}

return nil
Expand Down
10 changes: 5 additions & 5 deletions provider/lpmessage/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type SendTask struct {
db *harmonydb.DB
}

func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
func (s *SendTask) Do(taskID harmonytask.TaskID, data harmonytask.AcceptData, stillOwned func() bool) (done bool, err error) {
ctx := context.TODO()

// get message from db
Expand Down Expand Up @@ -219,18 +219,18 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
return true, nil
}

func (s *SendTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
func (s *SendTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, harmonytask.AcceptData, error) {
if len(ids) == 0 {
// probably can't happen, but panicking is bad
return nil, nil
return nil, nil, nil
}

if s.signer == nil {
// can't sign messages here
return nil, nil
return nil, nil, nil
}

return &ids[0], nil
return &ids[0], nil, nil
}

func (s *SendTask) TypeDetails() harmonytask.TaskTypeDetails {
Expand Down
12 changes: 6 additions & 6 deletions provider/lpseal/task_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewFinalizeTask(max int, sp *SealPoller, sc *lpffi.SealCalls, db *harmonydb
}
}

func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
func (f *FinalizeTask) Do(taskID harmonytask.TaskID, data harmonytask.AcceptData, stillOwned func() bool) (done bool, err error) {
var tasks []struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
Expand Down Expand Up @@ -78,7 +78,7 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
return true, nil
}

func (f *FinalizeTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
func (f *FinalizeTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, harmonytask.AcceptData, error) {
var tasks []struct {
TaskID harmonytask.TaskID `db:"task_id_finalize"`
SpID int64 `db:"sp_id"`
Expand All @@ -103,12 +103,12 @@ func (f *FinalizeTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T
WHERE task_id_finalize = ANY ($1) AND l.sector_filetype = 4
`, indIDs)
if err != nil {
return nil, xerrors.Errorf("getting tasks: %w", err)
return nil, nil, xerrors.Errorf("getting tasks: %w", err)
}

ls, err := f.sc.LocalStorage(ctx)
if err != nil {
return nil, xerrors.Errorf("getting local storage: %w", err)
return nil, nil, xerrors.Errorf("getting local storage: %w", err)
}

acceptables := map[harmonytask.TaskID]bool{}
Expand All @@ -124,12 +124,12 @@ func (f *FinalizeTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T

for _, l := range ls {
if string(l.ID) == t.StorageID {
return &t.TaskID, nil
return &t.TaskID, nil, nil
}
}
}

return nil, nil
return nil, nil, nil
}

func (f *FinalizeTask) TypeDetails() harmonytask.TaskTypeDetails {
Expand Down
Loading