From cb16ec1703b3cef6b56e80196873aa02fd5839bb Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 3 Nov 2023 14:36:20 +0100 Subject: [PATCH] worker: overpay (migration) downloads on low health slabs --- api/autopilot.go | 4 ++ api/settings.go | 6 ++ api/worker.go | 4 -- autopilot/alerts.go | 7 +- worker/download.go | 155 +++++++++++++++++++++++++++++--------------- worker/gouging.go | 15 +++-- worker/rhpv2.go | 10 +++ worker/rhpv3.go | 23 +++++-- worker/worker.go | 4 +- 9 files changed, 154 insertions(+), 74 deletions(-) diff --git a/api/autopilot.go b/api/autopilot.go index 3c4ceb688..784ba536b 100644 --- a/api/autopilot.go +++ b/api/autopilot.go @@ -139,6 +139,10 @@ func (sb HostScoreBreakdown) String() string { return fmt.Sprintf("Age: %v, Col: %v, Int: %v, SR: %v, UT: %v, V: %v, Pr: %v", sb.Age, sb.Collateral, sb.Interactions, sb.StorageRemaining, sb.Uptime, sb.Version, sb.Prices) } +func (hgb HostGougingBreakdown) DownloadGouging() bool { + return hgb.V3.DownloadErr != "" +} + func (hgb HostGougingBreakdown) Gouging() bool { return hgb.V2.Gouging() || hgb.V3.Gouging() } diff --git a/api/settings.go b/api/settings.go index 16b2ec938..50230d7d8 100644 --- a/api/settings.go +++ b/api/settings.go @@ -64,6 +64,12 @@ type ( // MinMaxEphemeralAccountBalance is the minimum accepted value for // `MaxEphemeralAccountBalance` in the host's price settings. MinMaxEphemeralAccountBalance types.Currency `json:"minMaxEphemeralAccountBalance"` + + // MigrationSurchargeMultiplier is the multiplier applied to the + // 'MaxDownloadPrice' when checking whether a host is too expensive, + // this multiplier is only applied for when trying to migrate critically + // low-health slabs. + MigrationSurchargeMultiplier uint64 `json:"migrationSurchargeMultiplier"` } // RedundancySettings contain settings that dictate an object's redundancy. diff --git a/api/worker.go b/api/worker.go index ab8cbe843..11c9ca59d 100644 --- a/api/worker.go +++ b/api/worker.go @@ -21,10 +21,6 @@ var ( // ErrContractSetNotSpecified is returned by the worker API by endpoints that // need a contract set to be able to upload data. ErrContractSetNotSpecified = errors.New("contract set is not specified") - - // ErrGougingPreventedDownload is returned by the worker API when a download - // failed because a critical number of hosts were price gouging. - ErrGougingPreventedDownload = errors.New("gouging settings prevented download from succeeding") ) type ( diff --git a/autopilot/alerts.go b/autopilot/alerts.go index cfba3dee7..1bfb97ed7 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -141,11 +141,6 @@ func newSlabMigrationFailedAlert(slabKey object.EncryptionKey, health float64, e severity = alerts.SeverityCritical } - hint := "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously." - if isErr(err, api.ErrGougingPreventedDownload) { - hint += " In this particular case, one or more hosts were considered to be price gouging. It might be necessary to adjust your price gouging settings." - } - return alerts.Alert{ ID: alertIDForSlab(alertMigrationID, slabKey), Severity: severity, @@ -154,7 +149,7 @@ func newSlabMigrationFailedAlert(slabKey object.EncryptionKey, health float64, e "error": err, "health": health, "slabKey": slabKey.String(), - "hint": hint, + "hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.", }, Timestamp: time.Now(), } diff --git a/worker/download.go b/worker/download.go index 3ac4669e6..9fea6be89 100644 --- a/worker/download.go +++ b/worker/download.go @@ -23,9 +23,10 @@ import ( ) const ( - downloadOverheadB = 284 - maxConcurrentSectorsPerHost = 3 - maxConcurrentSlabsPerDownload = 3 + downloadOverheadB = 284 + downloadOverpayHealthThreshold = 0.25 + maxConcurrentSectorsPerHost = 3 + maxConcurrentSlabsPerDownload = 3 ) type ( @@ -87,6 +88,7 @@ type ( minShards int length uint32 offset uint32 + overpay bool mu sync.Mutex lastOverdrive time.Time @@ -94,6 +96,8 @@ type ( numInflight uint64 numLaunched uint64 numOverdriving uint64 + numOverpaid uint64 + numRelaunched uint64 curr types.PublicKey hostToSectors map[types.PublicKey][]sectorInfo @@ -104,9 +108,10 @@ type ( } slabDownloadResponse struct { - shards [][]byte - index int - err error + overpaid bool + shards [][]byte + index int + err error } sectorDownloadReq struct { @@ -117,18 +122,16 @@ type ( root types.Hash256 hk types.PublicKey + overpay bool overdrive bool sectorIndex int resps *sectorResponses } sectorDownloadResp struct { - overdrive bool - hk types.PublicKey - root types.Hash256 - sectorIndex int - sector []byte - err error + req *sectorDownloadReq + sector []byte + err error } sectorResponses struct { @@ -306,7 +309,7 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o // launch the download wg.Add(1) go func(index int) { - mgr.downloadSlab(ctx, id, next.SlabSlice, index, responseChan, nextSlabChan) + mgr.downloadSlab(ctx, id, next.SlabSlice, index, false, responseChan, nextSlabChan) wg.Done() }(slabIndex) atomic.AddUint64(&concurrentSlabs, 1) @@ -422,7 +425,7 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, Length: uint32(slab.MinShards) * rhpv2.SectorSize, } go func() { - mgr.downloadSlab(ctx, id, slice, 0, responseChan, nextSlabChan) + mgr.downloadSlab(ctx, id, slice, 0, true, responseChan, nextSlabChan) // NOTE: when downloading 1 slab we can simply close both channels close(responseChan) close(nextSlabChan) @@ -530,7 +533,7 @@ func (mgr *downloadManager) refreshDownloaders(contracts []api.ContractMetadata) } } -func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int) *slabDownload { +func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int, migration bool) *slabDownload { // create slab id var sID slabID frand.Read(sID[:]) @@ -556,6 +559,7 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o minShards: int(slice.MinShards), offset: offset, length: length, + overpay: migration && slice.Health <= downloadOverpayHealthThreshold, hostToSectors: hostToSectors, used: make(map[types.PublicKey]struct{}), @@ -564,17 +568,17 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o } } -func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, responseChan chan *slabDownloadResponse, nextSlabChan chan struct{}) { +func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, migration bool, responseChan chan *slabDownloadResponse, nextSlabChan chan struct{}) { // add tracing ctx, span := tracing.Tracer.Start(ctx, "downloadSlab") defer span.End() // prepare the slab download - slab := mgr.newSlabDownload(ctx, dID, slice, index) + slab := mgr.newSlabDownload(ctx, dID, slice, index, migration) // download shards resp := &slabDownloadResponse{index: index} - resp.shards, resp.err = slab.downloadShards(ctx, nextSlabChan) + resp.shards, resp.overpaid, resp.err = slab.downloadShards(ctx, nextSlabChan) // send the response select { @@ -816,7 +820,7 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) { // download the sector buf := bytes.NewBuffer(make([]byte, 0, req.length)) - err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length) + err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length, req.overpay) if err != nil { req.fail(err) return err @@ -832,20 +836,15 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) { func (req *sectorDownloadReq) succeed(sector []byte) { req.resps.Add(§orDownloadResp{ - hk: req.hk, - root: req.root, - overdrive: req.overdrive, - sectorIndex: req.sectorIndex, - sector: sector, + req: req, + sector: sector, }) } func (req *sectorDownloadReq) fail(err error) { req.resps.Add(§orDownloadResp{ - err: err, - hk: req.hk, - root: req.root, - overdrive: req.overdrive, + req: req, + err: err, }) } @@ -970,13 +969,19 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, root: sector.Root, hk: sector.Host, + // overpay is set to 'true' when a request is retried after the slab + // download failed, and we realise that it might have succeeded if we + // allowed overpaying for this download, we only do this for migrations + // and when the slab is below a certain health threshold + overpay: false, + overdrive: overdrive, sectorIndex: sector.index, resps: resps, } } -func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan struct{}) ([][]byte, error) { +func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan struct{}) ([][]byte, bool, error) { // cancel any sector downloads once the download is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -998,22 +1003,28 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str for i := 0; i < int(s.minShards); { req := s.nextRequest(ctx, resps, false) if req == nil { - return nil, fmt.Errorf("no hosts available") + return nil, false, fmt.Errorf("no hosts available") } else if err := s.launch(req); err == nil { i++ } } + // collect requests that failed due to gouging + var gouging []*sectorDownloadReq + // collect responses var done bool var next bool + var lastResort bool var triggered bool + +loop: for s.inflight() > 0 && !done { select { case <-s.mgr.stopChan: - return nil, errors.New("download stopped") + return nil, false, errors.New("download stopped") case <-ctx.Done(): - return nil, ctx.Err() + return nil, false, ctx.Err() case <-resps.c: resetOverdrive() } @@ -1024,15 +1035,14 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str break } - if isSectorNotFound(resp.err) { - if err := s.slm.DeleteHostSector(ctx, resp.hk, resp.root); err != nil { - s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.hk, "root", resp.root, "err", err) - } else { - s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.hk, "root", resp.root) - } + done, next = s.receive(*resp) + + // only receive responses in last-resort mode + if lastResort { + continue } - done, next = s.receive(*resp) + // launch overdrive requests on failure if !done && resp.err != nil { for { if req := s.nextRequest(ctx, resps, true); req != nil { @@ -1043,6 +1053,8 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str break } } + + // trigger next slab download if next && !triggered { select { case <-nextSlabChan: @@ -1050,7 +1062,29 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str default: } } + + // handle lost sectors + if isSectorNotFound(resp.err) { + if err := s.slm.DeleteHostSector(ctx, resp.req.hk, resp.req.root); err != nil { + s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.req.hk, "root", resp.req.root, "err", err) + } else { + s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.hk, "root", resp.req.root) + } + } else if isPriceTableGouging(resp.err) && s.overpay { + resp.req.overpay = true + gouging = append(gouging, resp.req) + } + } + } + + if !done && !lastResort && len(gouging) >= s.missing() { + for _, req := range gouging { + if err := s.launch(req); err == nil { + s.errs.Remove(req.hk) + } } + lastResort = true + goto loop } // track stats @@ -1063,7 +1097,7 @@ func (s *slabDownload) overdrivePct() float64 { s.mu.Lock() defer s.mu.Unlock() - numOverdrive := int(s.numLaunched) - s.minShards + numOverdrive := (int(s.numLaunched) + int(s.numRelaunched)) - s.minShards if numOverdrive < 0 { numOverdrive = 0 } @@ -1083,7 +1117,7 @@ func (s *slabDownload) downloadSpeed() int64 { return int64(bytes) / ms } -func (s *slabDownload) finish() ([][]byte, error) { +func (s *slabDownload) finish() ([][]byte, bool, error) { s.mu.Lock() defer s.mu.Unlock() if s.numCompleted < s.minShards { @@ -1094,13 +1128,18 @@ func (s *slabDownload) finish() ([][]byte, error) { } } - err := fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.mgr.numDownloaders(), unused, len(s.errs), s.errs) - if s.numCompleted+s.errs.NumGouging() >= s.minShards { - err = fmt.Errorf("%w; %v", api.ErrGougingPreventedDownload, err) - } - return nil, err + return nil, s.numOverpaid > 0, fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d, relaunched=%d, overpaid=%d, downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), unused, len(s.errs), s.errs) + } + return s.sectors, s.numOverpaid > 0, nil +} + +func (s *slabDownload) missing() int { + s.mu.Lock() + defer s.mu.Unlock() + if s.numCompleted < s.minShards { + return s.minShards - s.numCompleted } - return s.sectors, nil + return 0 } func (s *slabDownload) inflight() uint64 { @@ -1118,6 +1157,11 @@ func (s *slabDownload) launch(req *sectorDownloadReq) error { return errors.New("no request given") } + // check for completed sector + if len(s.sectors[req.sectorIndex]) > 0 { + return errors.New("sector already downloaded") + } + // launch the req err := s.mgr.launch(req) if err != nil { @@ -1129,10 +1173,14 @@ func (s *slabDownload) launch(req *sectorDownloadReq) error { // update the state s.numInflight++ - s.numLaunched++ if req.overdrive { s.numOverdriving++ } + if req.overpay { + s.numRelaunched++ + } else { + s.numLaunched++ + } return nil } @@ -1141,19 +1189,24 @@ func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool, next boo defer s.mu.Unlock() // update num overdriving - if resp.overdrive { + if resp.req.overdrive { s.numOverdriving-- } // failed reqs can't complete the upload s.numInflight-- if resp.err != nil { - s.errs = append(s.errs, &HostError{resp.hk, resp.err}) + s.errs = append(s.errs, &HostError{resp.req.hk, resp.err}) return false, false } + // update num overpaid + if resp.req.overpay { + s.numOverpaid++ + } + // store the sector - s.sectors[resp.sectorIndex] = resp.sector + s.sectors[resp.req.sectorIndex] = resp.sector s.numCompleted++ return s.numCompleted >= s.minShards, s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards diff --git a/worker/gouging.go b/worker/gouging.go index 92c8d7329..d73fb71a8 100644 --- a/worker/gouging.go +++ b/worker/gouging.go @@ -33,7 +33,7 @@ const ( type ( GougingChecker interface { - Check(*rhpv2.HostSettings, *rhpv3.HostPriceTable) api.HostGougingBreakdown + Check(_ *rhpv2.HostSettings, _ *rhpv3.HostPriceTable) api.HostGougingBreakdown } gougingChecker struct { @@ -50,20 +50,25 @@ type ( var _ GougingChecker = gougingChecker{} -func GougingCheckerFromContext(ctx context.Context) (GougingChecker, error) { - gc, ok := ctx.Value(keyGougingChecker).(func() (GougingChecker, error)) +func GougingCheckerFromContext(ctx context.Context, applyMigrationSurchargeMultiplier bool) (GougingChecker, error) { + gc, ok := ctx.Value(keyGougingChecker).(func(applyMigrationSurchargeMultiplier bool) (GougingChecker, error)) if !ok { panic("no gouging checker attached to the context") // developer error } - return gc() + return gc(applyMigrationSurchargeMultiplier) } func WithGougingChecker(ctx context.Context, cs consensusState, gp api.GougingParams) context.Context { - return context.WithValue(ctx, keyGougingChecker, func() (GougingChecker, error) { + return context.WithValue(ctx, keyGougingChecker, func(applyMigrationSurchargeMultiplier bool) (GougingChecker, error) { consensusState, err := cs.ConsensusState(ctx) if err != nil { return gougingChecker{}, fmt.Errorf("failed to get consensus state: %w", err) } + + if applyMigrationSurchargeMultiplier && gp.GougingSettings.MigrationSurchargeMultiplier > 0 { + gp.GougingSettings.MaxDownloadPrice = gp.GougingSettings.MaxDownloadPrice.Mul64(gp.GougingSettings.MigrationSurchargeMultiplier) + } + return gougingChecker{ consensusState: consensusState, settings: gp.GougingSettings, diff --git a/worker/rhpv2.go b/worker/rhpv2.go index b4ef0414d..05b368323 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -79,6 +79,16 @@ func (hes HostErrorSet) NumGouging() (n int) { return } +// Remove removes all errors for the given host. +func (hes HostErrorSet) Remove(hk types.PublicKey) { + for i := 0; i < len(hes); i++ { + if hes[i].HostKey == hk { + hes = append(hes[:i], hes[i+1:]...) + i-- + } + } +} + // Error implements error. func (hes HostErrorSet) Error() string { strs := make([]string, len(hes)) diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 3b04386e5..4f7d2ff08 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -89,6 +89,7 @@ func isClosedStream(err error) bool { } func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) } func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) } +func isPriceTableGouging(err error) bool { return isError(err, errPriceTableGouging) } func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) } func isSectorNotFound(err error) bool { return isError(err, errSectorNotFound) || isError(err, errSectorNotFoundOld) @@ -590,7 +591,7 @@ func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision) if err != nil { return rhpv3.HostPriceTable{}, err } - gc, err := GougingCheckerFromContext(ctx) + gc, err := GougingCheckerFromContext(ctx, false) if err != nil { return rhpv3.HostPriceTable{}, err } @@ -600,11 +601,21 @@ func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision) return pt.HostPriceTable, nil } -func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32) (err error) { - pt, err := h.priceTable(ctx, nil) +func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) (err error) { + pt, err := h.priceTables.fetch(ctx, h.HostKey(), nil) + if err != nil { + return err + } + hpt := pt.HostPriceTable + + // check for download gouging specifically + gc, err := GougingCheckerFromContext(ctx, overpay) if err != nil { return err } + if breakdown := gc.Check(nil, &hpt); breakdown.DownloadGouging() { + return fmt.Errorf("%w: %v", errPriceTableGouging, breakdown) + } // return errBalanceInsufficient if balance insufficient defer func() { @@ -615,14 +626,14 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 return h.acc.WithWithdrawal(ctx, func() (amount types.Currency, err error) { err = h.transportPool.withTransportV3(ctx, h.HostKey(), h.siamuxAddr, func(ctx context.Context, t *transportV3) error { - cost, err := readSectorCost(pt, uint64(length)) + cost, err := readSectorCost(hpt, uint64(length)) if err != nil { return err } var refund types.Currency payment := rhpv3.PayByEphemeralAccount(h.acc.id, cost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, h.accountKey) - cost, refund, err = RPCReadSector(ctx, t, w, pt, &payment, offset, length, root) + cost, refund, err = RPCReadSector(ctx, t, w, hpt, &payment, offset, length, root) amount = cost.Sub(refund) return err }) @@ -1368,7 +1379,7 @@ func RPCRenew(ctx context.Context, rrr api.RHPRenewRequest, bus Bus, t *transpor } // Perform gouging checks. - gc, err := GougingCheckerFromContext(ctx) + gc, err := GougingCheckerFromContext(ctx, false) if err != nil { return rhpv2.ContractRevision{}, nil, fmt.Errorf("failed to get gouging checker: %w", err) } diff --git a/worker/worker.go b/worker/worker.go index d73c6f1d3..9df665329 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -215,7 +215,7 @@ type hostV2 interface { type hostV3 interface { hostV2 - DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32) error + DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error) FetchRevision(ctx context.Context, fetchTimeout time.Duration, blockHeight uint64) (types.FileContractRevision, error) FundAccount(ctx context.Context, balance types.Currency, rev *types.FileContractRevision) error @@ -523,7 +523,7 @@ func (w *worker) rhpFormHandler(jc jape.Context) { // just used it to dial the host we know it's valid hostSettings.NetAddress = hostIP - gc, err := GougingCheckerFromContext(ctx) + gc, err := GougingCheckerFromContext(ctx, false) if err != nil { return err }