Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sealing: Remove sector copies from workers after snapdeals #8329

Merged
merged 5 commits into from Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/version.go
Expand Up @@ -57,8 +57,8 @@ var (
FullAPIVersion0 = newVer(1, 5, 0)
FullAPIVersion1 = newVer(2, 2, 0)

MinerAPIVersion0 = newVer(1, 4, 0)
WorkerAPIVersion0 = newVer(1, 5, 0)
MinerAPIVersion0 = newVer(1, 5, 0)
WorkerAPIVersion0 = newVer(1, 6, 0)
)

//nolint:varcheck,deadcode
Expand Down
43 changes: 26 additions & 17 deletions extern/sector-storage/manager.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
"go.uber.org/multierr"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"
Expand Down Expand Up @@ -589,15 +590,15 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
return xerrors.Errorf("acquiring sector lock: %w", err)
}

fts := storiface.FTUnsealed
moveUnsealed := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
if err != nil {
return xerrors.Errorf("finding unsealed sector: %w", err)
}

if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
fts = storiface.FTNone
moveUnsealed = storiface.FTNone
}
}

Expand All @@ -616,10 +617,10 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
}

selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, false)
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false)

err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|fts, pathType, storiface.AcquireMove),
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed))
return err
Expand All @@ -628,22 +629,30 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
return err
}

fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathStorage)
moveUnsealed := fts
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone
move := func(types storiface.SectorFileType) error {
fetchSel := newAllocSelector(m.index, types, storiface.PathStorage)
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone
}
}

err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
m.schedFetch(sector, types, storiface.PathStorage, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, types))
return err
})
if err != nil {
return xerrors.Errorf("moving sector to storage: %w", err)
}
return nil
}

err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed))
return err
})
if err != nil {
return xerrors.Errorf("moving sector to storage: %w", err)
err = multierr.Append(move(storiface.FTUpdate|storiface.FTUpdateCache), move(storiface.FTCache))
err = multierr.Append(err, move(storiface.FTSealed)) // Sealed separate from cache just in case ReleaseSectorKey was already called
if moveUnsealed != storiface.FTNone {
err = multierr.Append(err, move(moveUnsealed))
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/stores/http_handler.go
Expand Up @@ -172,7 +172,7 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
return
}

if err := handler.Local.Remove(r.Context(), id, ft, false, []ID{ID(r.FormValue("keep"))}); err != nil {
if err := handler.Local.Remove(r.Context(), id, ft, false, ParseIDList(r.FormValue("keep"))); err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
return
Expand Down
22 changes: 22 additions & 0 deletions extern/sector-storage/stores/index.go
Expand Up @@ -7,6 +7,7 @@ import (
"net/url"
gopath "path"
"sort"
"strings"
"sync"
"time"

Expand All @@ -29,6 +30,27 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5
// filesystem, local or networked / shared by multiple machines
type ID string

const IDSep = "."

type IDList []ID

func (il IDList) String() string {
l := make([]string, len(il))
for i, id := range il {
l[i] = string(id)
}
return strings.Join(l, IDSep)
}

func ParseIDList(s string) IDList {
strs := strings.Split(s, IDSep)
out := make([]ID, len(strs))
for i, str := range strs {
out[i] = ID(str)
}
return out
}

type Group = string

type StorageInfo struct {
Expand Down
42 changes: 33 additions & 9 deletions extern/sector-storage/stores/remote.go
Expand Up @@ -44,12 +44,36 @@ type Remote struct {
pfHandler PartialFileHandler
}

func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error {
// TODO: do this on remotes too
// (not that we really need to do that since it's always called by the
// worker which pulled the copy)
func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, typ storiface.SectorFileType) error {
if bits.OnesCount(uint(typ)) != 1 {
return xerrors.New("RemoveCopies expects one file type")
}

if err := r.local.RemoveCopies(ctx, s, typ); err != nil {
return xerrors.Errorf("removing local copies: %w", err)
}

si, err := r.index.StorageFindSector(ctx, s, typ, 0, false)
if err != nil {
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", s, typ, err)
}

var hasPrimary bool
var keep []ID
for _, info := range si {
if info.Primary {
hasPrimary = true
keep = append(keep, info.ID)
break
}
}

if !hasPrimary {
log.Warnf("remote RemoveCopies: no primary copies of sector %v (%s), not removing anything", s, typ)
return nil
}

return r.local.RemoveCopies(ctx, s, types)
return r.Remove(ctx, s, typ, true, keep)
}

func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler PartialFileHandler) *Remote {
Expand Down Expand Up @@ -156,7 +180,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin

if op == storiface.AcquireMove {
id := ID(storageID)
if err := r.deleteFromRemote(ctx, url, &id); err != nil {
if err := r.deleteFromRemote(ctx, url, []ID{id}); err != nil {
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
}
}
Expand Down Expand Up @@ -355,7 +379,7 @@ storeLoop:
}
}
for _, url := range info.URLs {
if err := r.deleteFromRemote(ctx, url, nil); err != nil {
if err := r.deleteFromRemote(ctx, url, keepIn); err != nil {
log.Warnf("remove %s: %+v", url, err)
continue
}
Expand All @@ -366,9 +390,9 @@ storeLoop:
return nil
}

func (r *Remote) deleteFromRemote(ctx context.Context, url string, keepIn *ID) error {
func (r *Remote) deleteFromRemote(ctx context.Context, url string, keepIn IDList) error {
if keepIn != nil {
url = url + "?keep=" + string(*keepIn)
url = url + "?keep=" + keepIn.String()
}

log.Infof("Delete %s", url)
Expand Down
15 changes: 14 additions & 1 deletion extern/sector-storage/worker_local.go
Expand Up @@ -516,7 +516,20 @@ func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {

func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
return l.asyncCall(ctx, sector, MoveStorage, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
return nil, l.storage.MoveStorage(ctx, sector, types)
if err := l.storage.MoveStorage(ctx, sector, types); err != nil {
return nil, xerrors.Errorf("move to storage: %w", err)
}

for _, fileType := range storiface.PathTypes {
if fileType&types == 0 {
continue
}

if err := l.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
return nil, xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
}
}
return nil, nil
})
}

Expand Down