From fb9a849762715fb6353af447e04784b29c40b01c Mon Sep 17 00:00:00 2001 From: PJ Date: Thu, 2 Nov 2023 21:50:50 +0100 Subject: [PATCH 01/12] autopilot: adjust hint if gouging prevented download --- api/worker.go | 4 ++ autopilot/alerts.go | 17 ++++++--- autopilot/migrator.go | 88 +++++++++++++++++++++++-------------------- worker/download.go | 7 +++- worker/rhpv2.go | 10 +++++ worker/rhpv3.go | 7 +++- 6 files changed, 83 insertions(+), 50 deletions(-) diff --git a/api/worker.go b/api/worker.go index 11c9ca59d..ab8cbe843 100644 --- a/api/worker.go +++ b/api/worker.go @@ -21,6 +21,10 @@ var ( // ErrContractSetNotSpecified is returned by the worker API by endpoints that // need a contract set to be able to upload data. ErrContractSetNotSpecified = errors.New("contract set is not specified") + + // ErrGougingPreventedDownload is returned by the worker API when a download + // failed because a critical number of hosts were price gouging. + ErrGougingPreventedDownload = errors.New("gouging settings prevented download from succeeding") ) type ( diff --git a/autopilot/alerts.go b/autopilot/alerts.go index 8561b7b30..cfba3dee7 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -28,8 +28,8 @@ func alertIDForContract(alertID [32]byte, contract api.ContractMetadata) types.H return types.HashBytes(append(alertID[:], contract.ID[:]...)) } -func alertIDForSlab(alertID [32]byte, slab object.Slab) types.Hash256 { - return types.HashBytes(append(alertID[:], []byte(slab.Key.String())...)) +func alertIDForSlab(alertID [32]byte, slabKey object.EncryptionKey) types.Hash256 { + return types.HashBytes(append(alertID[:], []byte(slabKey.String())...)) } func randomAlertID() types.Hash256 { @@ -135,21 +135,26 @@ func newOngoingMigrationsAlert(n int) alerts.Alert { } } -func newSlabMigrationFailedAlert(slab object.Slab, health float64, err error) alerts.Alert { +func newSlabMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert { severity := alerts.SeverityWarning if health < 0.5 { severity = alerts.SeverityCritical } + hint := "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously." + if isErr(err, api.ErrGougingPreventedDownload) { + hint += " In this particular case, one or more hosts were considered to be price gouging. It might be necessary to adjust your price gouging settings." + } + return alerts.Alert{ - ID: alertIDForSlab(alertMigrationID, slab), + ID: alertIDForSlab(alertMigrationID, slabKey), Severity: severity, Message: "Slab migration failed", Data: map[string]interface{}{ "error": err, "health": health, - "slabKey": slab.Key.String(), - "hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.", + "slabKey": slabKey.String(), + "hint": hint, }, Timestamp: time.Now(), } diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 31b5c4135..a99dd27aa 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -2,6 +2,7 @@ package autopilot import ( "context" + "fmt" "math" "sort" "sync" @@ -17,16 +18,41 @@ const ( migratorBatchSize = math.MaxInt // TODO: change once we have a fix for the infinite loop ) -type migrator struct { - ap *Autopilot - logger *zap.SugaredLogger - healthCutoff float64 - parallelSlabsPerWorker uint64 - signalMaintenanceFinished chan struct{} +type ( + migrator struct { + ap *Autopilot + logger *zap.SugaredLogger + healthCutoff float64 + parallelSlabsPerWorker uint64 + signalMaintenanceFinished chan struct{} + + mu sync.Mutex + migrating bool + migratingLastStart time.Time + } + + job struct { + api.UnhealthySlab + slabIdx int + batchSize int + set string + + b Bus + } +) + +func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, err error) { + slab, err := j.b.Slab(ctx, j.Key) + if err != nil { + return api.MigrateSlabResponse{}, fmt.Errorf("failed to fetch slab; %w", err) + } + + res, err := w.MigrateSlab(ctx, slab, j.set) + if err != nil { + return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err) + } - mu sync.Mutex - migrating bool - migratingLastStart time.Time + return res, nil } func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uint64) *migrator { @@ -75,16 +101,11 @@ func (m *migrator) tryPerformMigrations(ctx context.Context, wp *workerPool) { func (m *migrator) performMigrations(p *workerPool) { m.logger.Info("performing migrations") b := m.ap.bus + ctx, span := tracing.Tracer.Start(context.Background(), "migrator.performMigrations") defer span.End() // prepare a channel to push work to the workers - type job struct { - api.UnhealthySlab - slabIdx int - batchSize int - set string - } jobs := make(chan job) var wg sync.WaitGroup defer func() { @@ -100,32 +121,23 @@ func (m *migrator) performMigrations(p *workerPool) { go func(w Worker) { defer wg.Done() + // fetch worker id once id, err := w.ID(ctx) if err != nil { m.logger.Errorf("failed to fetch worker id: %v", err) return } + // process jobs for j := range jobs { - slab, err := b.Slab(ctx, j.Key) + res, err := j.execute(ctx, w) if err != nil { - m.logger.Errorf("%v: failed to fetch slab for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err) - continue - } - ap, err := b.Autopilot(ctx, m.ap.id) - if err != nil { - m.logger.Errorf("%v: failed to fetch autopilot settings for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err) - continue - } - res, err := w.MigrateSlab(ctx, slab, ap.Config.Contracts.Set) - if err != nil { - m.ap.RegisterAlert(ctx, newSlabMigrationFailedAlert(slab, j.Health, err)) - m.logger.Errorf("%v: failed to migrate slab %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err) - continue + m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, err: failed to fetch autopilot settings; %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, err) + m.ap.RegisterAlert(ctx, newSlabMigrationFailedAlert(j.Key, j.Health, err)) } else { - m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, slab)) + m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.NumShardsMigrated) + m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key)) } - m.logger.Debugf("%v: successfully migrated slab (health: %v migrated shards: %d) %d/%d", id, j.Health, res.NumShardsMigrated, j.slabIdx+1, j.batchSize) } }(w) } @@ -151,10 +163,7 @@ OUTER: // recompute health. start := time.Now() if err := b.RefreshHealth(ctx); err != nil { - rerr := m.ap.alerts.RegisterAlert(ctx, newRefreshHealthFailedAlert(err)) - if rerr != nil { - m.logger.Errorf("failed to register alert: err %v", rerr) - } + m.ap.RegisterAlert(ctx, newRefreshHealthFailedAlert(err)) m.logger.Errorf("failed to recompute cached health before migration", err) return } @@ -193,7 +202,7 @@ OUTER: toMigrate = append(toMigrate, *slab) } - // sort the newsly added slabs by health + // sort the newly added slabs by health newSlabs := toMigrate[len(toMigrate)-len(migrateNewMap):] sort.Slice(newSlabs, func(i, j int) bool { return newSlabs[i].Health < newSlabs[j].Health @@ -205,10 +214,7 @@ OUTER: // register an alert to notify users about ongoing migrations. if len(toMigrate) > 0 { - err = m.ap.alerts.RegisterAlert(ctx, newOngoingMigrationsAlert(len(toMigrate))) - if err != nil { - m.logger.Errorf("failed to register alert: err %v", err) - } + m.ap.RegisterAlert(ctx, newOngoingMigrationsAlert(len(toMigrate))) } // return if there are no slabs to migrate @@ -223,7 +229,7 @@ OUTER: case <-m.signalMaintenanceFinished: m.logger.Info("migrations interrupted - updating slabs for migration") continue OUTER - case jobs <- job{slab, i, len(toMigrate), set}: + case jobs <- job{slab, i, len(toMigrate), set, b}: } } } diff --git a/worker/download.go b/worker/download.go index d24b03a07..3ac4669e6 100644 --- a/worker/download.go +++ b/worker/download.go @@ -1093,7 +1093,12 @@ func (s *slabDownload) finish() ([][]byte, error) { unused++ } } - return nil, fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d downloaders=%d unused=%d errors=%d %w", s.numCompleted, s.numInflight, s.numLaunched, s.mgr.numDownloaders(), unused, len(s.errs), s.errs) + + err := fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.mgr.numDownloaders(), unused, len(s.errs), s.errs) + if s.numCompleted+s.errs.NumGouging() >= s.minShards { + err = fmt.Errorf("%w; %v", api.ErrGougingPreventedDownload, err) + } + return nil, err } return s.sectors, nil } diff --git a/worker/rhpv2.go b/worker/rhpv2.go index 65356ca58..b4ef0414d 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -69,6 +69,16 @@ func (he HostError) Unwrap() error { // A HostErrorSet is a collection of errors from various hosts. type HostErrorSet []*HostError +// NumGouging returns numbers of host that errored out due to price gouging. +func (hes HostErrorSet) NumGouging() (n int) { + for _, he := range hes { + if errors.Is(he.Err, errPriceTableGouging) { + n++ + } + } + return +} + // Error implements error. func (hes HostErrorSet) Error() string { strs := make([]string, len(hes)) diff --git a/worker/rhpv3.go b/worker/rhpv3.go index f20c40d19..3b04386e5 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -65,6 +65,9 @@ var ( // valid. errPriceTableExpired = errors.New("price table requested is expired") + // errPriceTableGouging is returned when the host is price gouging. + errPriceTableGouging = errors.New("host price table gouging") + // errPriceTableNotFound is returned by the host when it can not find a // price table that corresponds with the id we sent it. errPriceTableNotFound = errors.New("price table not found") @@ -85,7 +88,6 @@ func isClosedStream(err error) bool { return isError(err, mux.ErrClosedStream) || isError(err, net.ErrClosed) } func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) } -func isMaxRevisionReached(err error) bool { return isError(err, errMaxRevisionReached) } func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) } func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) } func isSectorNotFound(err error) bool { @@ -593,7 +595,7 @@ func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision) return rhpv3.HostPriceTable{}, err } if breakdown := gc.Check(nil, &pt.HostPriceTable); breakdown.Gouging() { - return rhpv3.HostPriceTable{}, fmt.Errorf("host price table gouging: %v", breakdown) + return rhpv3.HostPriceTable{}, fmt.Errorf("%w: %v", errPriceTableGouging, breakdown) } return pt.HostPriceTable, nil } @@ -603,6 +605,7 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 if err != nil { return err } + // return errBalanceInsufficient if balance insufficient defer func() { if isBalanceInsufficient(err) { From cb16ec1703b3cef6b56e80196873aa02fd5839bb Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 3 Nov 2023 14:36:20 +0100 Subject: [PATCH 02/12] worker: overpay (migration) downloads on low health slabs --- api/autopilot.go | 4 ++ api/settings.go | 6 ++ api/worker.go | 4 -- autopilot/alerts.go | 7 +- worker/download.go | 155 +++++++++++++++++++++++++++++--------------- worker/gouging.go | 15 +++-- worker/rhpv2.go | 10 +++ worker/rhpv3.go | 23 +++++-- worker/worker.go | 4 +- 9 files changed, 154 insertions(+), 74 deletions(-) diff --git a/api/autopilot.go b/api/autopilot.go index 3c4ceb688..784ba536b 100644 --- a/api/autopilot.go +++ b/api/autopilot.go @@ -139,6 +139,10 @@ func (sb HostScoreBreakdown) String() string { return fmt.Sprintf("Age: %v, Col: %v, Int: %v, SR: %v, UT: %v, V: %v, Pr: %v", sb.Age, sb.Collateral, sb.Interactions, sb.StorageRemaining, sb.Uptime, sb.Version, sb.Prices) } +func (hgb HostGougingBreakdown) DownloadGouging() bool { + return hgb.V3.DownloadErr != "" +} + func (hgb HostGougingBreakdown) Gouging() bool { return hgb.V2.Gouging() || hgb.V3.Gouging() } diff --git a/api/settings.go b/api/settings.go index 16b2ec938..50230d7d8 100644 --- a/api/settings.go +++ b/api/settings.go @@ -64,6 +64,12 @@ type ( // MinMaxEphemeralAccountBalance is the minimum accepted value for // `MaxEphemeralAccountBalance` in the host's price settings. MinMaxEphemeralAccountBalance types.Currency `json:"minMaxEphemeralAccountBalance"` + + // MigrationSurchargeMultiplier is the multiplier applied to the + // 'MaxDownloadPrice' when checking whether a host is too expensive, + // this multiplier is only applied for when trying to migrate critically + // low-health slabs. + MigrationSurchargeMultiplier uint64 `json:"migrationSurchargeMultiplier"` } // RedundancySettings contain settings that dictate an object's redundancy. diff --git a/api/worker.go b/api/worker.go index ab8cbe843..11c9ca59d 100644 --- a/api/worker.go +++ b/api/worker.go @@ -21,10 +21,6 @@ var ( // ErrContractSetNotSpecified is returned by the worker API by endpoints that // need a contract set to be able to upload data. ErrContractSetNotSpecified = errors.New("contract set is not specified") - - // ErrGougingPreventedDownload is returned by the worker API when a download - // failed because a critical number of hosts were price gouging. - ErrGougingPreventedDownload = errors.New("gouging settings prevented download from succeeding") ) type ( diff --git a/autopilot/alerts.go b/autopilot/alerts.go index cfba3dee7..1bfb97ed7 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -141,11 +141,6 @@ func newSlabMigrationFailedAlert(slabKey object.EncryptionKey, health float64, e severity = alerts.SeverityCritical } - hint := "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously." - if isErr(err, api.ErrGougingPreventedDownload) { - hint += " In this particular case, one or more hosts were considered to be price gouging. It might be necessary to adjust your price gouging settings." - } - return alerts.Alert{ ID: alertIDForSlab(alertMigrationID, slabKey), Severity: severity, @@ -154,7 +149,7 @@ func newSlabMigrationFailedAlert(slabKey object.EncryptionKey, health float64, e "error": err, "health": health, "slabKey": slabKey.String(), - "hint": hint, + "hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.", }, Timestamp: time.Now(), } diff --git a/worker/download.go b/worker/download.go index 3ac4669e6..9fea6be89 100644 --- a/worker/download.go +++ b/worker/download.go @@ -23,9 +23,10 @@ import ( ) const ( - downloadOverheadB = 284 - maxConcurrentSectorsPerHost = 3 - maxConcurrentSlabsPerDownload = 3 + downloadOverheadB = 284 + downloadOverpayHealthThreshold = 0.25 + maxConcurrentSectorsPerHost = 3 + maxConcurrentSlabsPerDownload = 3 ) type ( @@ -87,6 +88,7 @@ type ( minShards int length uint32 offset uint32 + overpay bool mu sync.Mutex lastOverdrive time.Time @@ -94,6 +96,8 @@ type ( numInflight uint64 numLaunched uint64 numOverdriving uint64 + numOverpaid uint64 + numRelaunched uint64 curr types.PublicKey hostToSectors map[types.PublicKey][]sectorInfo @@ -104,9 +108,10 @@ type ( } slabDownloadResponse struct { - shards [][]byte - index int - err error + overpaid bool + shards [][]byte + index int + err error } sectorDownloadReq struct { @@ -117,18 +122,16 @@ type ( root types.Hash256 hk types.PublicKey + overpay bool overdrive bool sectorIndex int resps *sectorResponses } sectorDownloadResp struct { - overdrive bool - hk types.PublicKey - root types.Hash256 - sectorIndex int - sector []byte - err error + req *sectorDownloadReq + sector []byte + err error } sectorResponses struct { @@ -306,7 +309,7 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o // launch the download wg.Add(1) go func(index int) { - mgr.downloadSlab(ctx, id, next.SlabSlice, index, responseChan, nextSlabChan) + mgr.downloadSlab(ctx, id, next.SlabSlice, index, false, responseChan, nextSlabChan) wg.Done() }(slabIndex) atomic.AddUint64(&concurrentSlabs, 1) @@ -422,7 +425,7 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, Length: uint32(slab.MinShards) * rhpv2.SectorSize, } go func() { - mgr.downloadSlab(ctx, id, slice, 0, responseChan, nextSlabChan) + mgr.downloadSlab(ctx, id, slice, 0, true, responseChan, nextSlabChan) // NOTE: when downloading 1 slab we can simply close both channels close(responseChan) close(nextSlabChan) @@ -530,7 +533,7 @@ func (mgr *downloadManager) refreshDownloaders(contracts []api.ContractMetadata) } } -func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int) *slabDownload { +func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int, migration bool) *slabDownload { // create slab id var sID slabID frand.Read(sID[:]) @@ -556,6 +559,7 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o minShards: int(slice.MinShards), offset: offset, length: length, + overpay: migration && slice.Health <= downloadOverpayHealthThreshold, hostToSectors: hostToSectors, used: make(map[types.PublicKey]struct{}), @@ -564,17 +568,17 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o } } -func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, responseChan chan *slabDownloadResponse, nextSlabChan chan struct{}) { +func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, migration bool, responseChan chan *slabDownloadResponse, nextSlabChan chan struct{}) { // add tracing ctx, span := tracing.Tracer.Start(ctx, "downloadSlab") defer span.End() // prepare the slab download - slab := mgr.newSlabDownload(ctx, dID, slice, index) + slab := mgr.newSlabDownload(ctx, dID, slice, index, migration) // download shards resp := &slabDownloadResponse{index: index} - resp.shards, resp.err = slab.downloadShards(ctx, nextSlabChan) + resp.shards, resp.overpaid, resp.err = slab.downloadShards(ctx, nextSlabChan) // send the response select { @@ -816,7 +820,7 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) { // download the sector buf := bytes.NewBuffer(make([]byte, 0, req.length)) - err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length) + err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length, req.overpay) if err != nil { req.fail(err) return err @@ -832,20 +836,15 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) { func (req *sectorDownloadReq) succeed(sector []byte) { req.resps.Add(§orDownloadResp{ - hk: req.hk, - root: req.root, - overdrive: req.overdrive, - sectorIndex: req.sectorIndex, - sector: sector, + req: req, + sector: sector, }) } func (req *sectorDownloadReq) fail(err error) { req.resps.Add(§orDownloadResp{ - err: err, - hk: req.hk, - root: req.root, - overdrive: req.overdrive, + req: req, + err: err, }) } @@ -970,13 +969,19 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, root: sector.Root, hk: sector.Host, + // overpay is set to 'true' when a request is retried after the slab + // download failed, and we realise that it might have succeeded if we + // allowed overpaying for this download, we only do this for migrations + // and when the slab is below a certain health threshold + overpay: false, + overdrive: overdrive, sectorIndex: sector.index, resps: resps, } } -func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan struct{}) ([][]byte, error) { +func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan struct{}) ([][]byte, bool, error) { // cancel any sector downloads once the download is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -998,22 +1003,28 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str for i := 0; i < int(s.minShards); { req := s.nextRequest(ctx, resps, false) if req == nil { - return nil, fmt.Errorf("no hosts available") + return nil, false, fmt.Errorf("no hosts available") } else if err := s.launch(req); err == nil { i++ } } + // collect requests that failed due to gouging + var gouging []*sectorDownloadReq + // collect responses var done bool var next bool + var lastResort bool var triggered bool + +loop: for s.inflight() > 0 && !done { select { case <-s.mgr.stopChan: - return nil, errors.New("download stopped") + return nil, false, errors.New("download stopped") case <-ctx.Done(): - return nil, ctx.Err() + return nil, false, ctx.Err() case <-resps.c: resetOverdrive() } @@ -1024,15 +1035,14 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str break } - if isSectorNotFound(resp.err) { - if err := s.slm.DeleteHostSector(ctx, resp.hk, resp.root); err != nil { - s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.hk, "root", resp.root, "err", err) - } else { - s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.hk, "root", resp.root) - } + done, next = s.receive(*resp) + + // only receive responses in last-resort mode + if lastResort { + continue } - done, next = s.receive(*resp) + // launch overdrive requests on failure if !done && resp.err != nil { for { if req := s.nextRequest(ctx, resps, true); req != nil { @@ -1043,6 +1053,8 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str break } } + + // trigger next slab download if next && !triggered { select { case <-nextSlabChan: @@ -1050,7 +1062,29 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str default: } } + + // handle lost sectors + if isSectorNotFound(resp.err) { + if err := s.slm.DeleteHostSector(ctx, resp.req.hk, resp.req.root); err != nil { + s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.req.hk, "root", resp.req.root, "err", err) + } else { + s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.hk, "root", resp.req.root) + } + } else if isPriceTableGouging(resp.err) && s.overpay { + resp.req.overpay = true + gouging = append(gouging, resp.req) + } + } + } + + if !done && !lastResort && len(gouging) >= s.missing() { + for _, req := range gouging { + if err := s.launch(req); err == nil { + s.errs.Remove(req.hk) + } } + lastResort = true + goto loop } // track stats @@ -1063,7 +1097,7 @@ func (s *slabDownload) overdrivePct() float64 { s.mu.Lock() defer s.mu.Unlock() - numOverdrive := int(s.numLaunched) - s.minShards + numOverdrive := (int(s.numLaunched) + int(s.numRelaunched)) - s.minShards if numOverdrive < 0 { numOverdrive = 0 } @@ -1083,7 +1117,7 @@ func (s *slabDownload) downloadSpeed() int64 { return int64(bytes) / ms } -func (s *slabDownload) finish() ([][]byte, error) { +func (s *slabDownload) finish() ([][]byte, bool, error) { s.mu.Lock() defer s.mu.Unlock() if s.numCompleted < s.minShards { @@ -1094,13 +1128,18 @@ func (s *slabDownload) finish() ([][]byte, error) { } } - err := fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.mgr.numDownloaders(), unused, len(s.errs), s.errs) - if s.numCompleted+s.errs.NumGouging() >= s.minShards { - err = fmt.Errorf("%w; %v", api.ErrGougingPreventedDownload, err) - } - return nil, err + return nil, s.numOverpaid > 0, fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d, relaunched=%d, overpaid=%d, downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), unused, len(s.errs), s.errs) + } + return s.sectors, s.numOverpaid > 0, nil +} + +func (s *slabDownload) missing() int { + s.mu.Lock() + defer s.mu.Unlock() + if s.numCompleted < s.minShards { + return s.minShards - s.numCompleted } - return s.sectors, nil + return 0 } func (s *slabDownload) inflight() uint64 { @@ -1118,6 +1157,11 @@ func (s *slabDownload) launch(req *sectorDownloadReq) error { return errors.New("no request given") } + // check for completed sector + if len(s.sectors[req.sectorIndex]) > 0 { + return errors.New("sector already downloaded") + } + // launch the req err := s.mgr.launch(req) if err != nil { @@ -1129,10 +1173,14 @@ func (s *slabDownload) launch(req *sectorDownloadReq) error { // update the state s.numInflight++ - s.numLaunched++ if req.overdrive { s.numOverdriving++ } + if req.overpay { + s.numRelaunched++ + } else { + s.numLaunched++ + } return nil } @@ -1141,19 +1189,24 @@ func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool, next boo defer s.mu.Unlock() // update num overdriving - if resp.overdrive { + if resp.req.overdrive { s.numOverdriving-- } // failed reqs can't complete the upload s.numInflight-- if resp.err != nil { - s.errs = append(s.errs, &HostError{resp.hk, resp.err}) + s.errs = append(s.errs, &HostError{resp.req.hk, resp.err}) return false, false } + // update num overpaid + if resp.req.overpay { + s.numOverpaid++ + } + // store the sector - s.sectors[resp.sectorIndex] = resp.sector + s.sectors[resp.req.sectorIndex] = resp.sector s.numCompleted++ return s.numCompleted >= s.minShards, s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards diff --git a/worker/gouging.go b/worker/gouging.go index 92c8d7329..d73fb71a8 100644 --- a/worker/gouging.go +++ b/worker/gouging.go @@ -33,7 +33,7 @@ const ( type ( GougingChecker interface { - Check(*rhpv2.HostSettings, *rhpv3.HostPriceTable) api.HostGougingBreakdown + Check(_ *rhpv2.HostSettings, _ *rhpv3.HostPriceTable) api.HostGougingBreakdown } gougingChecker struct { @@ -50,20 +50,25 @@ type ( var _ GougingChecker = gougingChecker{} -func GougingCheckerFromContext(ctx context.Context) (GougingChecker, error) { - gc, ok := ctx.Value(keyGougingChecker).(func() (GougingChecker, error)) +func GougingCheckerFromContext(ctx context.Context, applyMigrationSurchargeMultiplier bool) (GougingChecker, error) { + gc, ok := ctx.Value(keyGougingChecker).(func(applyMigrationSurchargeMultiplier bool) (GougingChecker, error)) if !ok { panic("no gouging checker attached to the context") // developer error } - return gc() + return gc(applyMigrationSurchargeMultiplier) } func WithGougingChecker(ctx context.Context, cs consensusState, gp api.GougingParams) context.Context { - return context.WithValue(ctx, keyGougingChecker, func() (GougingChecker, error) { + return context.WithValue(ctx, keyGougingChecker, func(applyMigrationSurchargeMultiplier bool) (GougingChecker, error) { consensusState, err := cs.ConsensusState(ctx) if err != nil { return gougingChecker{}, fmt.Errorf("failed to get consensus state: %w", err) } + + if applyMigrationSurchargeMultiplier && gp.GougingSettings.MigrationSurchargeMultiplier > 0 { + gp.GougingSettings.MaxDownloadPrice = gp.GougingSettings.MaxDownloadPrice.Mul64(gp.GougingSettings.MigrationSurchargeMultiplier) + } + return gougingChecker{ consensusState: consensusState, settings: gp.GougingSettings, diff --git a/worker/rhpv2.go b/worker/rhpv2.go index b4ef0414d..05b368323 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -79,6 +79,16 @@ func (hes HostErrorSet) NumGouging() (n int) { return } +// Remove removes all errors for the given host. +func (hes HostErrorSet) Remove(hk types.PublicKey) { + for i := 0; i < len(hes); i++ { + if hes[i].HostKey == hk { + hes = append(hes[:i], hes[i+1:]...) + i-- + } + } +} + // Error implements error. func (hes HostErrorSet) Error() string { strs := make([]string, len(hes)) diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 3b04386e5..4f7d2ff08 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -89,6 +89,7 @@ func isClosedStream(err error) bool { } func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) } func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) } +func isPriceTableGouging(err error) bool { return isError(err, errPriceTableGouging) } func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) } func isSectorNotFound(err error) bool { return isError(err, errSectorNotFound) || isError(err, errSectorNotFoundOld) @@ -590,7 +591,7 @@ func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision) if err != nil { return rhpv3.HostPriceTable{}, err } - gc, err := GougingCheckerFromContext(ctx) + gc, err := GougingCheckerFromContext(ctx, false) if err != nil { return rhpv3.HostPriceTable{}, err } @@ -600,11 +601,21 @@ func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision) return pt.HostPriceTable, nil } -func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32) (err error) { - pt, err := h.priceTable(ctx, nil) +func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) (err error) { + pt, err := h.priceTables.fetch(ctx, h.HostKey(), nil) + if err != nil { + return err + } + hpt := pt.HostPriceTable + + // check for download gouging specifically + gc, err := GougingCheckerFromContext(ctx, overpay) if err != nil { return err } + if breakdown := gc.Check(nil, &hpt); breakdown.DownloadGouging() { + return fmt.Errorf("%w: %v", errPriceTableGouging, breakdown) + } // return errBalanceInsufficient if balance insufficient defer func() { @@ -615,14 +626,14 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 return h.acc.WithWithdrawal(ctx, func() (amount types.Currency, err error) { err = h.transportPool.withTransportV3(ctx, h.HostKey(), h.siamuxAddr, func(ctx context.Context, t *transportV3) error { - cost, err := readSectorCost(pt, uint64(length)) + cost, err := readSectorCost(hpt, uint64(length)) if err != nil { return err } var refund types.Currency payment := rhpv3.PayByEphemeralAccount(h.acc.id, cost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, h.accountKey) - cost, refund, err = RPCReadSector(ctx, t, w, pt, &payment, offset, length, root) + cost, refund, err = RPCReadSector(ctx, t, w, hpt, &payment, offset, length, root) amount = cost.Sub(refund) return err }) @@ -1368,7 +1379,7 @@ func RPCRenew(ctx context.Context, rrr api.RHPRenewRequest, bus Bus, t *transpor } // Perform gouging checks. - gc, err := GougingCheckerFromContext(ctx) + gc, err := GougingCheckerFromContext(ctx, false) if err != nil { return rhpv2.ContractRevision{}, nil, fmt.Errorf("failed to get gouging checker: %w", err) } diff --git a/worker/worker.go b/worker/worker.go index d73c6f1d3..9df665329 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -215,7 +215,7 @@ type hostV2 interface { type hostV3 interface { hostV2 - DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32) error + DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error) FetchRevision(ctx context.Context, fetchTimeout time.Duration, blockHeight uint64) (types.FileContractRevision, error) FundAccount(ctx context.Context, balance types.Currency, rev *types.FileContractRevision) error @@ -523,7 +523,7 @@ func (w *worker) rhpFormHandler(jc jape.Context) { // just used it to dial the host we know it's valid hostSettings.NetAddress = hostIP - gc, err := GougingCheckerFromContext(ctx) + gc, err := GougingCheckerFromContext(ctx, false) if err != nil { return err } From 9227fce942497314c9e419082abba6c830581e50 Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 7 Nov 2023 22:07:15 +0100 Subject: [PATCH 03/12] worker: refactor download --- stores/metadata.go | 3 +- worker/download.go | 172 +++++++++++++++++++++++---------------------- 2 files changed, 91 insertions(+), 84 deletions(-) diff --git a/stores/metadata.go b/stores/metadata.go index 5e6a6f63a..85834fbab 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1,6 +1,7 @@ package stores import ( + "bytes" "context" "errors" "fmt" @@ -1580,7 +1581,7 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s // make sure the roots stay the same. for i, shard := range s.Shards { - if shard.Root != types.Hash256(slab.Shards[i].Root) { + if bytes.Equal(shard.Root[:], slab.Shards[i].Root) { return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root) } } diff --git a/worker/download.go b/worker/download.go index 9fea6be89..8c9b6ac8b 100644 --- a/worker/download.go +++ b/worker/download.go @@ -81,14 +81,16 @@ type ( mgr *downloadManager slm sectorLostMarker - dID id - sID slabID - created time.Time index int minShards int - length uint32 offset uint32 - overpay bool + length uint32 + + nextSlabChan chan struct{} + nextSlabTriggered bool + + created time.Time + overpay bool mu sync.Mutex lastOverdrive time.Time @@ -533,11 +535,7 @@ func (mgr *downloadManager) refreshDownloaders(contracts []api.ContractMetadata) } } -func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int, migration bool) *slabDownload { - // create slab id - var sID slabID - frand.Read(sID[:]) - +func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int, migration bool, nextSlabChan chan struct{}) *slabDownload { // calculate the offset and length offset, length := slice.SectorRegion() @@ -552,14 +550,15 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o mgr: mgr, slm: mgr.slm, - dID: dID, - sID: sID, - created: time.Now(), + nextSlabChan: nextSlabChan, + index: slabIndex, minShards: int(slice.MinShards), offset: offset, length: length, - overpay: migration && slice.Health <= downloadOverpayHealthThreshold, + + created: time.Now(), + overpay: migration && slice.Health <= downloadOverpayHealthThreshold, hostToSectors: hostToSectors, used: make(map[types.PublicKey]struct{}), @@ -573,17 +572,22 @@ func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice obje ctx, span := tracing.Tracer.Start(ctx, "downloadSlab") defer span.End() - // prepare the slab download - slab := mgr.newSlabDownload(ctx, dID, slice, index, migration) - - // download shards - resp := &slabDownloadResponse{index: index} - resp.shards, resp.overpaid, resp.err = slab.downloadShards(ctx, nextSlabChan) + // prepare new download + slab := mgr.newSlabDownload(ctx, dID, slice, index, migration, nextSlabChan) - // send the response - select { - case <-ctx.Done(): - case responseChan <- resp: + // start downloading + shards, overpaid, err := slab.download(ctx) + if err != nil { + responseChan <- &slabDownloadResponse{ + index: index, + err: err, + } + } else { + responseChan <- &slabDownloadResponse{ + index: index, + shards: shards, + overpaid: overpaid, + } } } @@ -981,13 +985,13 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, } } -func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan struct{}) ([][]byte, bool, error) { +func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) { // cancel any sector downloads once the download is done ctx, cancel := context.WithCancel(ctx) defer cancel() // add tracing - ctx, span := tracing.Tracer.Start(ctx, "downloadShards") + ctx, span := tracing.Tracer.Start(ctx, "download") defer span.End() // create the responses queue @@ -1013,13 +1017,10 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str var gouging []*sectorDownloadReq // collect responses - var done bool - var next bool var lastResort bool - var triggered bool loop: - for s.inflight() > 0 && !done { + for s.ongoing() { select { case <-s.mgr.stopChan: return nil, false, errors.New("download stopped") @@ -1029,61 +1030,46 @@ loop: resetOverdrive() } - for { - resp := resps.Next() - if resp == nil { - break - } - - done, next = s.receive(*resp) - - // only receive responses in last-resort mode - if lastResort { - continue + resps.Foreach(func(resp *sectorDownloadResp) { + if done := s.receive(*resp); done { + return } - // launch overdrive requests on failure - if !done && resp.err != nil { - for { - if req := s.nextRequest(ctx, resps, true); req != nil { - if err := s.launch(req); err != nil { - continue // try the next request if this fails to launch + if resp.err != nil { + // launch overdrive requests + if !lastResort { + for { + if req := s.nextRequest(ctx, resps, true); req != nil { + if err := s.launch(req); err != nil { + continue // try the next request if this fails to launch + } } + break } - break } - } - - // trigger next slab download - if next && !triggered { - select { - case <-nextSlabChan: - triggered = true - default: - } - } - // handle lost sectors - if isSectorNotFound(resp.err) { - if err := s.slm.DeleteHostSector(ctx, resp.req.hk, resp.req.root); err != nil { - s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.req.hk, "root", resp.req.root, "err", err) - } else { - s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.hk, "root", resp.req.root) + // handle lost sectors + if isSectorNotFound(resp.err) { + if err := s.slm.DeleteHostSector(ctx, resp.req.hk, resp.req.root); err != nil { + s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.req.hk, "root", resp.req.root, "err", err) + } else { + s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.hk, "root", resp.req.root) + } + } else if isPriceTableGouging(resp.err) && s.overpay { + resp.req.overpay = true + gouging = append(gouging, resp.req) } - } else if isPriceTableGouging(resp.err) && s.overpay { - resp.req.overpay = true - gouging = append(gouging, resp.req) } - } + }) } - if !done && !lastResort && len(gouging) >= s.missing() { + if !lastResort && !s.completed() && len(gouging) >= s.missing() { + lastResort = true for _, req := range gouging { if err := s.launch(req); err == nil { s.errs.Remove(req.hk) } } - lastResort = true goto loop } @@ -1142,10 +1128,16 @@ func (s *slabDownload) missing() int { return 0 } -func (s *slabDownload) inflight() uint64 { +func (s *slabDownload) completed() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.numCompleted >= s.minShards +} + +func (s *slabDownload) ongoing() bool { s.mu.Lock() defer s.mu.Unlock() - return s.numInflight + return s.numCompleted < s.minShards || s.numInflight > 0 } func (s *slabDownload) launch(req *sectorDownloadReq) error { @@ -1184,7 +1176,7 @@ func (s *slabDownload) launch(req *sectorDownloadReq) error { return nil } -func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool, next bool) { +func (s *slabDownload) receive(resp sectorDownloadResp) bool { s.mu.Lock() defer s.mu.Unlock() @@ -1197,7 +1189,7 @@ func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool, next boo s.numInflight-- if resp.err != nil { s.errs = append(s.errs, &HostError{resp.req.hk, resp.err}) - return false, false + return false } // update num overpaid @@ -1209,7 +1201,16 @@ func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool, next boo s.sectors[resp.req.sectorIndex] = resp.sector s.numCompleted++ - return s.numCompleted >= s.minShards, s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards + // try trigger next slab + if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards { + select { + case <-s.nextSlabChan: + s.nextSlabTriggered = true + default: + } + } + + return s.numCompleted >= s.minShards } func (mgr *downloadManager) fastest(hosts []types.PublicKey) (fastest types.PublicKey) { @@ -1319,13 +1320,18 @@ func (sr *sectorResponses) Close() error { return nil } -func (sr *sectorResponses) Next() *sectorDownloadResp { - sr.mu.Lock() - defer sr.mu.Unlock() - if len(sr.responses) == 0 { - return nil +func (sr *sectorResponses) Foreach(fn func(res *sectorDownloadResp)) { + for { + sr.mu.Lock() + if len(sr.responses) == 0 { + sr.mu.Unlock() + return + } + + resp := sr.responses[0] + sr.responses = sr.responses[1:] + sr.mu.Unlock() + + fn(resp) } - resp := sr.responses[0] - sr.responses = sr.responses[1:] - return resp } From fdd14137a49832ac9cc654fca3bba9376879b7cb Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 7 Nov 2023 22:20:53 +0100 Subject: [PATCH 04/12] stores: default MigrationSurchargeMultiplier --- stores/migrations.go | 54 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/stores/migrations.go b/stores/migrations.go index 82fdaa449..2d04365aa 100644 --- a/stores/migrations.go +++ b/stores/migrations.go @@ -2,6 +2,8 @@ package stores import ( "context" + "encoding/json" + "errors" "fmt" "reflect" "strings" @@ -10,6 +12,7 @@ import ( "go.sia.tech/renterd/api" "go.uber.org/zap" "gorm.io/gorm" + "gorm.io/gorm/clause" ) var ( @@ -273,6 +276,12 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { return performMigration00022_extendObjectID(tx, logger) }, }, + { + ID: "00023_defaultMigrationSurchargeMultiplier", + Migrate: func(tx *gorm.DB) error { + return performMigration00023_defaultMigrationSurchargeMultiplier(tx, logger) + }, + }, } // Create migrator. m := gormigrate.New(db, gormigrate.DefaultOptions, migrations) @@ -995,3 +1004,48 @@ func performMigration00022_extendObjectID(txn *gorm.DB, logger *zap.SugaredLogge logger.Info("migration 00022_extendObjectID complete") return nil } + +func performMigration00023_defaultMigrationSurchargeMultiplier(txn *gorm.DB, logger *zap.SugaredLogger) error { + logger.Info("performing migration 00023_defaultMigrationSurchargeMultiplier") + + // fetch setting + var entry dbSetting + if err := txn. + Where(&dbSetting{Key: api.SettingGouging}). + Take(&entry). + Error; errors.Is(err, gorm.ErrRecordNotFound) { + logger.Debugf("no gouging settings found, skipping migration") + return nil + } else if err != nil { + return fmt.Errorf("failed to fetch gouging settings: %w", err) + } + + // unmarshal setting into gouging settings + var gs api.GougingSettings + if err := json.Unmarshal([]byte(entry.Value), &gs); err != nil { + return err + } + + // set default value + if gs.MigrationSurchargeMultiplier == 0 { + gs.MigrationSurchargeMultiplier = 10 + } + + // update setting + if err := gs.Validate(); err != nil { + return err + } else if bytes, err := json.Marshal(gs); err != nil { + return err + } else if err := txn.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "key"}}, + DoUpdates: clause.AssignmentColumns([]string{"value"}), + }).Create(&dbSetting{ + Key: api.SettingGouging, + Value: string(bytes), + }).Error; err != nil { + return err + } + + logger.Info("migration 00023_defaultMigrationSurchargeMultiplier complete") + return nil +} From d77650a10fe2892a73d8e73493d6ebdd49f946ac Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 8 Nov 2023 11:58:19 +0100 Subject: [PATCH 05/12] worker: expose overpaid --- api/worker.go | 4 +++- autopilot/alerts.go | 17 ++++++++++++++++- autopilot/migrator.go | 11 +++++++++-- worker/download.go | 14 ++++++++------ worker/gouging.go | 18 ++++++++++++------ worker/migrations.go | 18 +++++++++--------- worker/worker.go | 21 +++++++++++++++++---- 7 files changed, 74 insertions(+), 29 deletions(-) diff --git a/api/worker.go b/api/worker.go index 11c9ca59d..3659eb3a4 100644 --- a/api/worker.go +++ b/api/worker.go @@ -53,7 +53,9 @@ type ( // MigrateSlabResponse is the response type for the /slab/migrate endpoint. MigrateSlabResponse struct { - NumShardsMigrated int `json:"numShardsMigrated"` + NumShardsMigrated int `json:"numShardsMigrated"` + Overpaid bool `json:"overpaid"` + Error string `json:"error,omitempty"` } // RHPFormRequest is the request type for the /rhp/form endpoint. diff --git a/autopilot/alerts.go b/autopilot/alerts.go index ab7ac67f2..16a981247 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -140,7 +140,22 @@ func newOngoingMigrationsAlert(n int) alerts.Alert { } } -func newSlabMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert { +func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert { + return alerts.Alert{ + ID: alertIDForSlab(alertMigrationID, slabKey), + Severity: alerts.SeverityCritical, + Message: "Critical slab migration failed", + Data: map[string]interface{}{ + "error": err.Error(), + "health": health, + "slabKey": slabKey.String(), + "hint": "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.", + }, + Timestamp: time.Now(), + } +} + +func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert { severity := alerts.SeverityError if health < 0.25 { severity = alerts.SeverityCritical diff --git a/autopilot/migrator.go b/autopilot/migrator.go index a99dd27aa..341b518e7 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -2,6 +2,7 @@ package autopilot import ( "context" + "errors" "fmt" "math" "sort" @@ -50,6 +51,8 @@ func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, res, err := w.MigrateSlab(ctx, slab, j.set) if err != nil { return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err) + } else if res.Error != "" { + return res, fmt.Errorf("failed to migrate slab; %w", errors.New(res.Error)) } return res, nil @@ -132,8 +135,12 @@ func (m *migrator) performMigrations(p *workerPool) { for j := range jobs { res, err := j.execute(ctx, w) if err != nil { - m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, err: failed to fetch autopilot settings; %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, err) - m.ap.RegisterAlert(ctx, newSlabMigrationFailedAlert(j.Key, j.Health, err)) + m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.Overpaid, err) + if res.Overpaid { + m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, err)) + } else { + m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, err)) + } } else { m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.NumShardsMigrated) m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key)) diff --git a/worker/download.go b/worker/download.go index 8c9b6ac8b..050b092eb 100644 --- a/worker/download.go +++ b/worker/download.go @@ -344,6 +344,8 @@ outer: if resp.err != nil { mgr.logger.Errorf("download slab %v failed: %v", resp.index, resp.err) return resp.err + } else if resp.overpaid { + mgr.logger.Debugf("download slab %v succeeded by overpaying", resp.index) } responses[resp.index] = resp @@ -392,7 +394,7 @@ outer: return nil } -func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, error) { +func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, bool, error) { // refresh the downloaders mgr.refreshDownloaders(contracts) @@ -412,7 +414,7 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, // check if we have enough shards if availableShards < slab.MinShards { - return nil, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards) + return nil, false, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards) } // create identifier @@ -437,10 +439,10 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, var resp *slabDownloadResponse select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, false, ctx.Err() case resp = <-responseChan: if resp.err != nil { - return nil, resp.err + return nil, false, resp.err } } @@ -448,10 +450,10 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, slice.Decrypt(resp.shards) err := slice.Reconstruct(resp.shards) if err != nil { - return nil, err + return nil, false, err } - return resp.shards, err + return resp.shards, resp.overpaid, err } func (mgr *downloadManager) Stats() downloadManagerStats { diff --git a/worker/gouging.go b/worker/gouging.go index d73fb71a8..ef26fd1d7 100644 --- a/worker/gouging.go +++ b/worker/gouging.go @@ -50,23 +50,29 @@ type ( var _ GougingChecker = gougingChecker{} -func GougingCheckerFromContext(ctx context.Context, applyMigrationSurchargeMultiplier bool) (GougingChecker, error) { - gc, ok := ctx.Value(keyGougingChecker).(func(applyMigrationSurchargeMultiplier bool) (GougingChecker, error)) +func GougingCheckerFromContext(ctx context.Context, criticalMigration bool) (GougingChecker, error) { + gc, ok := ctx.Value(keyGougingChecker).(func(bool) (GougingChecker, error)) if !ok { panic("no gouging checker attached to the context") // developer error } - return gc(applyMigrationSurchargeMultiplier) + return gc(criticalMigration) } func WithGougingChecker(ctx context.Context, cs consensusState, gp api.GougingParams) context.Context { - return context.WithValue(ctx, keyGougingChecker, func(applyMigrationSurchargeMultiplier bool) (GougingChecker, error) { + return context.WithValue(ctx, keyGougingChecker, func(criticalMigration bool) (GougingChecker, error) { consensusState, err := cs.ConsensusState(ctx) if err != nil { return gougingChecker{}, fmt.Errorf("failed to get consensus state: %w", err) } - if applyMigrationSurchargeMultiplier && gp.GougingSettings.MigrationSurchargeMultiplier > 0 { - gp.GougingSettings.MaxDownloadPrice = gp.GougingSettings.MaxDownloadPrice.Mul64(gp.GougingSettings.MigrationSurchargeMultiplier) + // adjust the max download price if we are dealing with a critical + // migration that might be failing due to gouging checks + if criticalMigration && gp.GougingSettings.MigrationSurchargeMultiplier > 0 { + if adjustedMaxDownloadPrice, overflow := gp.GougingSettings.MaxDownloadPrice.Mul64WithOverflow(gp.GougingSettings.MigrationSurchargeMultiplier); overflow { + return gougingChecker{}, errors.New("failed to apply the 'MigrationSurchargeMultiplier', overflow detected") + } else { + gp.GougingSettings.MaxDownloadPrice = adjustedMaxDownloadPrice + } } return gougingChecker{ diff --git a/worker/migrations.go b/worker/migrations.go index 1870013ce..212b6eb62 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -11,7 +11,7 @@ import ( "go.uber.org/zap" ) -func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *object.Slab, dlContracts, ulContracts []api.ContractMetadata, bh uint64, logger *zap.SugaredLogger) (map[types.PublicKey]types.FileContractID, int, error) { +func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *object.Slab, dlContracts, ulContracts []api.ContractMetadata, bh uint64, logger *zap.SugaredLogger) (map[types.PublicKey]types.FileContractID, int, bool, error) { ctx, span := tracing.Tracer.Start(ctx, "migrateSlab") defer span.End() @@ -52,7 +52,7 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o for _, shard := range s.Shards { used[shard.Host] = h2c[shard.Host] } - return used, 0, nil + return used, 0, false, nil } // calculate the number of missing shards and take into account hosts for @@ -67,16 +67,16 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o // perform some sanity checks if len(ulContracts) < int(s.MinShards) { - return nil, 0, fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards)) + return nil, 0, false, fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards)) } if len(s.Shards)-missingShards < int(s.MinShards) { - return nil, 0, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-len(shardIndices), int(s.MinShards)) + return nil, 0, false, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-len(shardIndices), int(s.MinShards)) } // download the slab - shards, err := d.DownloadSlab(ctx, *s, dlContracts) + shards, overpaid, err := d.DownloadSlab(ctx, *s, dlContracts) if err != nil { - return nil, 0, fmt.Errorf("failed to download slab for migration: %w", err) + return nil, 0, false, fmt.Errorf("failed to download slab for migration: %w", err) } s.Encrypt(shards) @@ -97,7 +97,7 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o // migrate the shards uploaded, used, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload) if err != nil { - return nil, 0, fmt.Errorf("failed to upload slab for migration: %w", err) + return nil, 0, overpaid, fmt.Errorf("failed to upload slab for migration: %w", err) } // overwrite the unhealthy shards with the newly migrated ones @@ -111,12 +111,12 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o _, exists := used[sector.Host] if !exists { if fcid, exists := h2c[sector.Host]; !exists { - return nil, 0, fmt.Errorf("couldn't find contract for host %v", sector.Host) + return nil, 0, overpaid, fmt.Errorf("couldn't find contract for host %v", sector.Host) } else { used[sector.Host] = fcid } } } - return used, len(shards), nil + return used, len(shards), overpaid, nil } diff --git a/worker/worker.go b/worker/worker.go index 9df665329..bb0de45f0 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -906,17 +906,30 @@ func (w *worker) slabMigrateHandler(jc jape.Context) { } // migrate the slab - used, numShardsMigrated, err := migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger) - if jc.Check("couldn't migrate slabs", err) != nil { + used, numShardsMigrated, overpaid, err := migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger) + if err != nil { + jc.Encode(api.MigrateSlabResponse{ + NumShardsMigrated: numShardsMigrated, + Overpaid: overpaid, + Error: err.Error(), + }) return } // update the slab - if jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, used)) != nil { + if err := w.bus.UpdateSlab(ctx, slab, up.ContractSet, used); err != nil { + jc.Encode(api.MigrateSlabResponse{ + NumShardsMigrated: numShardsMigrated, + Overpaid: overpaid, + Error: err.Error(), + }) return } - jc.Encode(api.MigrateSlabResponse{NumShardsMigrated: numShardsMigrated}) + jc.Encode(api.MigrateSlabResponse{ + NumShardsMigrated: numShardsMigrated, + Overpaid: overpaid, + }) } func (w *worker) downloadsStatsHandlerGET(jc jape.Context) { From 6b75878f7fa9837399766333295dc62cf383feb9 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 8 Nov 2023 12:09:03 +0100 Subject: [PATCH 06/12] worker: revert download refactor --- autopilot/alerts.go | 15 +++- autopilot/migrator.go | 15 +++- internal/testing/gouging_test.go | 6 +- stores/metadata.go | 4 +- worker/download.go | 126 ++++++++++++++----------------- worker/rhpv2.go | 41 +++------- worker/upload.go | 2 +- worker/worker.go | 4 +- 8 files changed, 104 insertions(+), 109 deletions(-) diff --git a/autopilot/alerts.go b/autopilot/alerts.go index 16a981247..8e4e6881c 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -140,11 +140,24 @@ func newOngoingMigrationsAlert(n int) alerts.Alert { } } +func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert { + return alerts.Alert{ + ID: alertIDForSlab(alertMigrationID, slabKey), + Severity: alerts.SeverityInfo, + Message: "Critical migration succeeded", + Data: map[string]interface{}{ + "slabKey": slabKey.String(), + "hint": "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads", + }, + Timestamp: time.Now(), + } +} + func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert { return alerts.Alert{ ID: alertIDForSlab(alertMigrationID, slabKey), Severity: alerts.SeverityCritical, - Message: "Critical slab migration failed", + Message: "Critical migration failed", Data: map[string]interface{}{ "error": err.Error(), "health": health, diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 341b518e7..054cf8ca8 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -141,9 +141,16 @@ func (m *migrator) performMigrations(p *workerPool) { } else { m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, err)) } - } else { - m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.NumShardsMigrated) - m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key)) + continue + } + + m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.Overpaid, res.NumShardsMigrated) + m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key)) + if res.Overpaid { + // this alert confirms the user his gouging settings + // are working, it will be dismissed automatically + // the next time this slab is successfully migrated + m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.Key)) } } }(w) @@ -239,5 +246,7 @@ OUTER: case jobs <- job{slab, i, len(toMigrate), set, b}: } } + + return } } diff --git a/internal/testing/gouging_test.go b/internal/testing/gouging_test.go index ef390fb1b..199959cb1 100644 --- a/internal/testing/gouging_test.go +++ b/internal/testing/gouging_test.go @@ -22,7 +22,7 @@ func TestGouging(t *testing.T) { // create a new test cluster cluster := newTestCluster(t, testClusterOptions{ hosts: int(testAutopilotConfig.Contracts.Amount), - logger: newTestLoggerCustom(zapcore.DebugLevel), + logger: newTestLoggerCustom(zapcore.ErrorLevel), }) defer cluster.Shutdown() @@ -87,7 +87,9 @@ func TestGouging(t *testing.T) { // download the data - should fail buffer.Reset() - if err := w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + if err := w.DownloadObject(ctx, &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil { t.Fatal("expected download to fail") } } diff --git a/stores/metadata.go b/stores/metadata.go index 85834fbab..1e186a57b 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1581,8 +1581,8 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s // make sure the roots stay the same. for i, shard := range s.Shards { - if bytes.Equal(shard.Root[:], slab.Shards[i].Root) { - return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root) + if !bytes.Equal(shard.Root[:], slab.Shards[i].Root) { + return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root[:]) } } diff --git a/worker/download.go b/worker/download.go index 050b092eb..3d4171f0d 100644 --- a/worker/download.go +++ b/worker/download.go @@ -566,6 +566,7 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o used: make(map[types.PublicKey]struct{}), sectors: make([][]byte, len(slice.Shards)), + errs: make(HostErrorSet), } } @@ -577,19 +578,14 @@ func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice obje // prepare new download slab := mgr.newSlabDownload(ctx, dID, slice, index, migration, nextSlabChan) - // start downloading - shards, overpaid, err := slab.download(ctx) - if err != nil { - responseChan <- &slabDownloadResponse{ - index: index, - err: err, - } - } else { - responseChan <- &slabDownloadResponse{ - index: index, - shards: shards, - overpaid: overpaid, - } + // execute download + resp := &slabDownloadResponse{index: index} + resp.shards, resp.overpaid, resp.err = slab.download(ctx) + + // send the response + select { + case <-ctx.Done(): + case responseChan <- resp: } } @@ -976,9 +972,10 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, hk: sector.Host, // overpay is set to 'true' when a request is retried after the slab - // download failed, and we realise that it might have succeeded if we - // allowed overpaying for this download, we only do this for migrations - // and when the slab is below a certain health threshold + // download failed and we realise that it might have succeeded if we + // allowed overpaying for certain sectors, we only do this when trying + // to migrate a critically low-health slab that might otherwise be + // unrecoverable overpay: false, overdrive: overdrive, @@ -1019,10 +1016,10 @@ func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) { var gouging []*sectorDownloadReq // collect responses - var lastResort bool + var done bool loop: - for s.ongoing() { + for s.inflight() > 0 && !done { select { case <-s.mgr.stopChan: return nil, false, errors.New("download stopped") @@ -1032,22 +1029,28 @@ loop: resetOverdrive() } - resps.Foreach(func(resp *sectorDownloadResp) { - if done := s.receive(*resp); done { - return + for { + resp := resps.Next() + if resp == nil { + break + } + + // receive the response + done = s.receive(*resp) + if done { + break } + // handle errors if resp.err != nil { // launch overdrive requests - if !lastResort { - for { - if req := s.nextRequest(ctx, resps, true); req != nil { - if err := s.launch(req); err != nil { - continue // try the next request if this fails to launch - } + for { + if req := s.nextRequest(ctx, resps, true); req != nil { + if err := s.launch(req); err != nil { + continue } - break } + break } // handle lost sectors @@ -1057,21 +1060,19 @@ loop: } else { s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.hk, "root", resp.req.root) } - } else if isPriceTableGouging(resp.err) && s.overpay { - resp.req.overpay = true + } else if isPriceTableGouging(resp.err) && s.overpay && !resp.req.overpay { + resp.req.overpay = true // ensures we don't retry the same request over and over again gouging = append(gouging, resp.req) } } - }) + } } - if !lastResort && !s.completed() && len(gouging) >= s.missing() { - lastResort = true + if !done && len(gouging) >= s.missing() { for _, req := range gouging { - if err := s.launch(req); err == nil { - s.errs.Remove(req.hk) - } + _ = s.launch(req) // ignore error } + gouging = nil goto loop } @@ -1130,16 +1131,10 @@ func (s *slabDownload) missing() int { return 0 } -func (s *slabDownload) completed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.numCompleted >= s.minShards -} - -func (s *slabDownload) ongoing() bool { +func (s *slabDownload) inflight() uint64 { s.mu.Lock() defer s.mu.Unlock() - return s.numCompleted < s.minShards || s.numInflight > 0 + return s.numInflight } func (s *slabDownload) launch(req *sectorDownloadReq) error { @@ -1178,7 +1173,7 @@ func (s *slabDownload) launch(req *sectorDownloadReq) error { return nil } -func (s *slabDownload) receive(resp sectorDownloadResp) bool { +func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool) { s.mu.Lock() defer s.mu.Unlock() @@ -1190,10 +1185,19 @@ func (s *slabDownload) receive(resp sectorDownloadResp) bool { // failed reqs can't complete the upload s.numInflight-- if resp.err != nil { - s.errs = append(s.errs, &HostError{resp.req.hk, resp.err}) + s.errs[resp.req.hk] = resp.err return false } + // try trigger next slab read + if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards { + select { + case <-s.nextSlabChan: + s.nextSlabTriggered = true + default: + } + } + // update num overpaid if resp.req.overpay { s.numOverpaid++ @@ -1203,15 +1207,6 @@ func (s *slabDownload) receive(resp sectorDownloadResp) bool { s.sectors[resp.req.sectorIndex] = resp.sector s.numCompleted++ - // try trigger next slab - if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards { - select { - case <-s.nextSlabChan: - s.nextSlabTriggered = true - default: - } - } - return s.numCompleted >= s.minShards } @@ -1322,18 +1317,13 @@ func (sr *sectorResponses) Close() error { return nil } -func (sr *sectorResponses) Foreach(fn func(res *sectorDownloadResp)) { - for { - sr.mu.Lock() - if len(sr.responses) == 0 { - sr.mu.Unlock() - return - } - - resp := sr.responses[0] - sr.responses = sr.responses[1:] - sr.mu.Unlock() - - fn(resp) +func (sr *sectorResponses) Next() *sectorDownloadResp { + sr.mu.Lock() + defer sr.mu.Unlock() + if len(sr.responses) == 0 { + return nil } + resp := sr.responses[0] + sr.responses = sr.responses[1:] + return resp } diff --git a/worker/rhpv2.go b/worker/rhpv2.go index 05b368323..3537858ce 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -50,51 +50,30 @@ var ( ErrContractFinalized = errors.New("contract cannot be revised further") ) -// A HostError associates an error with a given host. -type HostError struct { - HostKey types.PublicKey - Err error -} - -// Error implements error. -func (he HostError) Error() string { - return fmt.Sprintf("%x: %v", he.HostKey[:4], he.Err.Error()) -} - -// Unwrap returns the underlying error. -func (he HostError) Unwrap() error { - return he.Err -} - // A HostErrorSet is a collection of errors from various hosts. -type HostErrorSet []*HostError +type HostErrorSet map[types.PublicKey]error // NumGouging returns numbers of host that errored out due to price gouging. func (hes HostErrorSet) NumGouging() (n int) { for _, he := range hes { - if errors.Is(he.Err, errPriceTableGouging) { + if errors.Is(he, errPriceTableGouging) { n++ } } return } -// Remove removes all errors for the given host. -func (hes HostErrorSet) Remove(hk types.PublicKey) { - for i := 0; i < len(hes); i++ { - if hes[i].HostKey == hk { - hes = append(hes[:i], hes[i+1:]...) - i-- - } - } -} - // Error implements error. func (hes HostErrorSet) Error() string { - strs := make([]string, len(hes)) - for i := range strs { - strs[i] = hes[i].Error() + if len(hes) == 0 { + return "" } + + var strs []string + for hk, he := range hes { + strs = append(strs, fmt.Sprintf("%x: %v", hk[:4], he.Error())) + } + // include a leading newline so that the first error isn't printed on the // same line as the error context return "\n" + strings.Join(strs, "\n") diff --git a/worker/upload.go b/worker/upload.go index 37e217e41..fc56adc0c 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -1561,7 +1561,7 @@ func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) { // failed reqs can't complete the upload s.numInflight-- if resp.err != nil { - s.errs = append(s.errs, &HostError{resp.req.hk, resp.err}) + s.errs[resp.req.hk] = resp.err return false, false } diff --git a/worker/worker.go b/worker/worker.go index bb0de45f0..3ffe16a04 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -396,6 +396,8 @@ func (w *worker) rhpScanHandler(jc jape.Context) { } func (w *worker) fetchContracts(ctx context.Context, metadatas []api.ContractMetadata, timeout time.Duration, blockHeight uint64) (contracts []api.Contract, errs HostErrorSet) { + errs = make(HostErrorSet) + // create requests channel reqs := make(chan api.ContractMetadata) @@ -410,7 +412,7 @@ func (w *worker) fetchContracts(ctx context.Context, metadatas []api.ContractMet }) mu.Lock() if err != nil { - errs = append(errs, &HostError{HostKey: md.HostKey, Err: err}) + errs[md.HostKey] = err contracts = append(contracts, api.Contract{ ContractMetadata: md, }) From a7311a7e58730c2e86a8af5d323f94c47441b926 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 8 Nov 2023 14:54:34 +0100 Subject: [PATCH 07/12] testing: ignore context cancelled --- internal/testing/gouging_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/testing/gouging_test.go b/internal/testing/gouging_test.go index 199959cb1..a078497c8 100644 --- a/internal/testing/gouging_test.go +++ b/internal/testing/gouging_test.go @@ -3,6 +3,7 @@ package testing import ( "bytes" "context" + "errors" "fmt" "testing" "time" @@ -89,7 +90,7 @@ func TestGouging(t *testing.T) { buffer.Reset() ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - if err := w.DownloadObject(ctx, &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil { - t.Fatal("expected download to fail") + if err := w.DownloadObject(ctx, &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil || errors.Is(err, context.Canceled) { + t.Fatal("expected download to fail", err) } } From 6f4dbbe8f81b5dd6186d6e236fa30639e6932291 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 8 Nov 2023 15:07:56 +0100 Subject: [PATCH 08/12] worker: fix nil panic --- worker/upload.go | 1 + 1 file changed, 1 insertion(+) diff --git a/worker/upload.go b/worker/upload.go index fc56adc0c..464411322 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -961,6 +961,7 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte) (*slabUploa overdriving: make(map[int]int, len(shards)), remaining: make(map[int]sectorCtx, len(shards)), sectors: make([]object.Sector, len(shards)), + errs: make(HostErrorSet), } // prepare sector uploads From e9ab5fa97895856e60bee185a39f6fe59373b900 Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 10 Nov 2023 14:52:10 +0100 Subject: [PATCH 09/12] worker: implement MR remarks --- api/worker.go | 2 +- autopilot/migrator.go | 8 ++++---- internal/testing/gouging_test.go | 5 +---- stores/metadata.go | 3 +-- worker/download.go | 14 +++++++------- worker/migrations.go | 8 ++++---- worker/worker.go | 8 ++++---- 7 files changed, 22 insertions(+), 26 deletions(-) diff --git a/api/worker.go b/api/worker.go index 3659eb3a4..1a5e77558 100644 --- a/api/worker.go +++ b/api/worker.go @@ -54,7 +54,7 @@ type ( // MigrateSlabResponse is the response type for the /slab/migrate endpoint. MigrateSlabResponse struct { NumShardsMigrated int `json:"numShardsMigrated"` - Overpaid bool `json:"overpaid"` + SurchargeApplied bool `json:"surchargeApplied,omitempty"` Error string `json:"error,omitempty"` } diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 054cf8ca8..05cfbd7e0 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -135,8 +135,8 @@ func (m *migrator) performMigrations(p *workerPool) { for j := range jobs { res, err := j.execute(ctx, w) if err != nil { - m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.Overpaid, err) - if res.Overpaid { + m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err) + if res.SurchargeApplied { m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, err)) } else { m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, err)) @@ -144,9 +144,9 @@ func (m *migrator) performMigrations(p *workerPool) { continue } - m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.Overpaid, res.NumShardsMigrated) + m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, res.NumShardsMigrated) m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key)) - if res.Overpaid { + if res.SurchargeApplied { // this alert confirms the user his gouging settings // are working, it will be dismissed automatically // the next time this slab is successfully migrated diff --git a/internal/testing/gouging_test.go b/internal/testing/gouging_test.go index a078497c8..fe1f0dedb 100644 --- a/internal/testing/gouging_test.go +++ b/internal/testing/gouging_test.go @@ -3,7 +3,6 @@ package testing import ( "bytes" "context" - "errors" "fmt" "testing" "time" @@ -88,9 +87,7 @@ func TestGouging(t *testing.T) { // download the data - should fail buffer.Reset() - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - if err := w.DownloadObject(ctx, &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil || errors.Is(err, context.Canceled) { + if err := w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil { t.Fatal("expected download to fail", err) } } diff --git a/stores/metadata.go b/stores/metadata.go index 1e186a57b..75e996b1e 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1,7 +1,6 @@ package stores import ( - "bytes" "context" "errors" "fmt" @@ -1581,7 +1580,7 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s // make sure the roots stay the same. for i, shard := range s.Shards { - if !bytes.Equal(shard.Root[:], slab.Shards[i].Root) { + if shard.Root != types.Hash256(slab.Shards[i].Root) { return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root[:]) } } diff --git a/worker/download.go b/worker/download.go index 3d4171f0d..00f586543 100644 --- a/worker/download.go +++ b/worker/download.go @@ -110,10 +110,10 @@ type ( } slabDownloadResponse struct { - overpaid bool - shards [][]byte - index int - err error + surchargeApplied bool + shards [][]byte + index int + err error } sectorDownloadReq struct { @@ -344,7 +344,7 @@ outer: if resp.err != nil { mgr.logger.Errorf("download slab %v failed: %v", resp.index, resp.err) return resp.err - } else if resp.overpaid { + } else if resp.surchargeApplied { mgr.logger.Debugf("download slab %v succeeded by overpaying", resp.index) } @@ -453,7 +453,7 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, return nil, false, err } - return resp.shards, resp.overpaid, err + return resp.shards, resp.surchargeApplied, err } func (mgr *downloadManager) Stats() downloadManagerStats { @@ -580,7 +580,7 @@ func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice obje // execute download resp := &slabDownloadResponse{index: index} - resp.shards, resp.overpaid, resp.err = slab.download(ctx) + resp.shards, resp.surchargeApplied, resp.err = slab.download(ctx) // send the response select { diff --git a/worker/migrations.go b/worker/migrations.go index 212b6eb62..519d027e4 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -74,7 +74,7 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o } // download the slab - shards, overpaid, err := d.DownloadSlab(ctx, *s, dlContracts) + shards, surchargeApplied, err := d.DownloadSlab(ctx, *s, dlContracts) if err != nil { return nil, 0, false, fmt.Errorf("failed to download slab for migration: %w", err) } @@ -97,7 +97,7 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o // migrate the shards uploaded, used, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload) if err != nil { - return nil, 0, overpaid, fmt.Errorf("failed to upload slab for migration: %w", err) + return nil, 0, surchargeApplied, fmt.Errorf("failed to upload slab for migration: %w", err) } // overwrite the unhealthy shards with the newly migrated ones @@ -111,12 +111,12 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o _, exists := used[sector.Host] if !exists { if fcid, exists := h2c[sector.Host]; !exists { - return nil, 0, overpaid, fmt.Errorf("couldn't find contract for host %v", sector.Host) + return nil, 0, surchargeApplied, fmt.Errorf("couldn't find contract for host %v", sector.Host) } else { used[sector.Host] = fcid } } } - return used, len(shards), overpaid, nil + return used, len(shards), surchargeApplied, nil } diff --git a/worker/worker.go b/worker/worker.go index 3ffe16a04..657aa131c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -908,11 +908,11 @@ func (w *worker) slabMigrateHandler(jc jape.Context) { } // migrate the slab - used, numShardsMigrated, overpaid, err := migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger) + used, numShardsMigrated, surchargeApplied, err := migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger) if err != nil { jc.Encode(api.MigrateSlabResponse{ NumShardsMigrated: numShardsMigrated, - Overpaid: overpaid, + SurchargeApplied: surchargeApplied, Error: err.Error(), }) return @@ -922,7 +922,7 @@ func (w *worker) slabMigrateHandler(jc jape.Context) { if err := w.bus.UpdateSlab(ctx, slab, up.ContractSet, used); err != nil { jc.Encode(api.MigrateSlabResponse{ NumShardsMigrated: numShardsMigrated, - Overpaid: overpaid, + SurchargeApplied: surchargeApplied, Error: err.Error(), }) return @@ -930,7 +930,7 @@ func (w *worker) slabMigrateHandler(jc jape.Context) { jc.Encode(api.MigrateSlabResponse{ NumShardsMigrated: numShardsMigrated, - Overpaid: overpaid, + SurchargeApplied: surchargeApplied, }) } From 5d3fe740e081b7b50e8b3015475ee27012d98541 Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 10 Nov 2023 14:55:07 +0100 Subject: [PATCH 10/12] worker: update logs --- worker/download.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/download.go b/worker/download.go index 00f586543..29a1b6e2a 100644 --- a/worker/download.go +++ b/worker/download.go @@ -342,7 +342,7 @@ outer: atomic.AddUint64(&concurrentSlabs, ^uint64(0)) if resp.err != nil { - mgr.logger.Errorf("download slab %v failed: %v", resp.index, resp.err) + mgr.logger.Errorf("download slab %v failed, overpaid %v: %v", resp.index, resp.surchargeApplied, resp.err) return resp.err } else if resp.surchargeApplied { mgr.logger.Debugf("download slab %v succeeded by overpaying", resp.index) From 14480a1cbff874b2379be2923ad7fbdcffe5b899 Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 10 Nov 2023 14:55:38 +0100 Subject: [PATCH 11/12] worker: update logs --- worker/download.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/download.go b/worker/download.go index 29a1b6e2a..259c90644 100644 --- a/worker/download.go +++ b/worker/download.go @@ -345,7 +345,7 @@ outer: mgr.logger.Errorf("download slab %v failed, overpaid %v: %v", resp.index, resp.surchargeApplied, resp.err) return resp.err } else if resp.surchargeApplied { - mgr.logger.Debugf("download slab %v succeeded by overpaying", resp.index) + mgr.logger.Warnf("download for slab %v had to overpay to succeed", resp.index) } responses[resp.index] = resp From 82f6c7af5300e55d999e26a3f2e63589489fe731 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 15 Nov 2023 15:49:24 +0100 Subject: [PATCH 12/12] build: add default --- build/env_default.go | 1 + build/env_testnet.go | 1 + 2 files changed, 2 insertions(+) diff --git a/build/env_default.go b/build/env_default.go index 49e7fa099..8820ec6ae 100644 --- a/build/env_default.go +++ b/build/env_default.go @@ -32,6 +32,7 @@ var ( MinPriceTableValidity: 5 * time.Minute, // 5 minutes MinAccountExpiry: 24 * time.Hour, // 1 day MinMaxEphemeralAccountBalance: types.Siacoins(1), // 1 SC + MigrationSurchargeMultiplier: 10, // 10x } // DefaultUploadPackingSettings define the default upload packing settings diff --git a/build/env_testnet.go b/build/env_testnet.go index 8798d5127..1bc40d287 100644 --- a/build/env_testnet.go +++ b/build/env_testnet.go @@ -34,6 +34,7 @@ var ( MinPriceTableValidity: 5 * time.Minute, // 5 minutes MinAccountExpiry: 24 * time.Hour, // 1 day MinMaxEphemeralAccountBalance: types.Siacoins(1), // 1 SC + MigrationSurchargeMultiplier: 10, // 10x } // DefaultUploadPackingSettings define the default upload packing settings