diff --git a/api/autopilot.go b/api/autopilot.go index 7e3f9b7f7..c857b31e6 100644 --- a/api/autopilot.go +++ b/api/autopilot.go @@ -140,6 +140,10 @@ func (sb HostScoreBreakdown) String() string { return fmt.Sprintf("Age: %v, Col: %v, Int: %v, SR: %v, UT: %v, V: %v, Pr: %v", sb.Age, sb.Collateral, sb.Interactions, sb.StorageRemaining, sb.Uptime, sb.Version, sb.Prices) } +func (hgb HostGougingBreakdown) DownloadGouging() bool { + return hgb.V3.DownloadErr != "" +} + func (hgb HostGougingBreakdown) Gouging() bool { return hgb.V2.Gouging() || hgb.V3.Gouging() } diff --git a/api/settings.go b/api/settings.go index f435adf64..8d12f12d6 100644 --- a/api/settings.go +++ b/api/settings.go @@ -70,6 +70,12 @@ type ( // MinMaxEphemeralAccountBalance is the minimum accepted value for // `MaxEphemeralAccountBalance` in the host's price settings. MinMaxEphemeralAccountBalance types.Currency `json:"minMaxEphemeralAccountBalance"` + + // MigrationSurchargeMultiplier is the multiplier applied to the + // 'MaxDownloadPrice' when checking whether a host is too expensive, + // this multiplier is only applied for when trying to migrate critically + // low-health slabs. + MigrationSurchargeMultiplier uint64 `json:"migrationSurchargeMultiplier"` } // RedundancySettings contain settings that dictate an object's redundancy. diff --git a/api/worker.go b/api/worker.go index 11c9ca59d..1a5e77558 100644 --- a/api/worker.go +++ b/api/worker.go @@ -53,7 +53,9 @@ type ( // MigrateSlabResponse is the response type for the /slab/migrate endpoint. MigrateSlabResponse struct { - NumShardsMigrated int `json:"numShardsMigrated"` + NumShardsMigrated int `json:"numShardsMigrated"` + SurchargeApplied bool `json:"surchargeApplied,omitempty"` + Error string `json:"error,omitempty"` } // RHPFormRequest is the request type for the /rhp/form endpoint. diff --git a/autopilot/alerts.go b/autopilot/alerts.go index 91d33fda5..1f9d817f2 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -33,8 +33,8 @@ func alertIDForHost(alertID [32]byte, hk types.PublicKey) types.Hash256 { return types.HashBytes(append(alertID[:], hk[:]...)) } -func alertIDForSlab(alertID [32]byte, slab object.Slab) types.Hash256 { - return types.HashBytes(append(alertID[:], []byte(slab.Key.String())...)) +func alertIDForSlab(alertID [32]byte, slabKey object.EncryptionKey) types.Hash256 { + return types.HashBytes(append(alertID[:], []byte(slabKey.String())...)) } func randomAlertID() types.Hash256 { @@ -159,7 +159,35 @@ func newOngoingMigrationsAlert(n int) alerts.Alert { } } -func newSlabMigrationFailedAlert(slab object.Slab, health float64, err error) alerts.Alert { +func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert { + return alerts.Alert{ + ID: alertIDForSlab(alertMigrationID, slabKey), + Severity: alerts.SeverityInfo, + Message: "Critical migration succeeded", + Data: map[string]interface{}{ + "slabKey": slabKey.String(), + "hint": "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads", + }, + Timestamp: time.Now(), + } +} + +func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert { + return alerts.Alert{ + ID: alertIDForSlab(alertMigrationID, slabKey), + Severity: alerts.SeverityCritical, + Message: "Critical migration failed", + Data: map[string]interface{}{ + "error": err.Error(), + "health": health, + "slabKey": slabKey.String(), + "hint": "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.", + }, + Timestamp: time.Now(), + } +} + +func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert { severity := alerts.SeverityError if health < 0.25 { severity = alerts.SeverityCritical @@ -168,13 +196,13 @@ func newSlabMigrationFailedAlert(slab object.Slab, health float64, err error) al } return alerts.Alert{ - ID: alertIDForSlab(alertMigrationID, slab), + ID: alertIDForSlab(alertMigrationID, slabKey), Severity: severity, Message: "Slab migration failed", Data: map[string]interface{}{ "error": err.Error(), "health": health, - "slabKey": slab.Key.String(), + "slabKey": slabKey.String(), "hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.", }, Timestamp: time.Now(), diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 31b5c4135..05cfbd7e0 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -2,6 +2,8 @@ package autopilot import ( "context" + "errors" + "fmt" "math" "sort" "sync" @@ -17,16 +19,43 @@ const ( migratorBatchSize = math.MaxInt // TODO: change once we have a fix for the infinite loop ) -type migrator struct { - ap *Autopilot - logger *zap.SugaredLogger - healthCutoff float64 - parallelSlabsPerWorker uint64 - signalMaintenanceFinished chan struct{} +type ( + migrator struct { + ap *Autopilot + logger *zap.SugaredLogger + healthCutoff float64 + parallelSlabsPerWorker uint64 + signalMaintenanceFinished chan struct{} + + mu sync.Mutex + migrating bool + migratingLastStart time.Time + } + + job struct { + api.UnhealthySlab + slabIdx int + batchSize int + set string - mu sync.Mutex - migrating bool - migratingLastStart time.Time + b Bus + } +) + +func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, err error) { + slab, err := j.b.Slab(ctx, j.Key) + if err != nil { + return api.MigrateSlabResponse{}, fmt.Errorf("failed to fetch slab; %w", err) + } + + res, err := w.MigrateSlab(ctx, slab, j.set) + if err != nil { + return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err) + } else if res.Error != "" { + return res, fmt.Errorf("failed to migrate slab; %w", errors.New(res.Error)) + } + + return res, nil } func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uint64) *migrator { @@ -75,16 +104,11 @@ func (m *migrator) tryPerformMigrations(ctx context.Context, wp *workerPool) { func (m *migrator) performMigrations(p *workerPool) { m.logger.Info("performing migrations") b := m.ap.bus + ctx, span := tracing.Tracer.Start(context.Background(), "migrator.performMigrations") defer span.End() // prepare a channel to push work to the workers - type job struct { - api.UnhealthySlab - slabIdx int - batchSize int - set string - } jobs := make(chan job) var wg sync.WaitGroup defer func() { @@ -100,32 +124,34 @@ func (m *migrator) performMigrations(p *workerPool) { go func(w Worker) { defer wg.Done() + // fetch worker id once id, err := w.ID(ctx) if err != nil { m.logger.Errorf("failed to fetch worker id: %v", err) return } + // process jobs for j := range jobs { - slab, err := b.Slab(ctx, j.Key) - if err != nil { - m.logger.Errorf("%v: failed to fetch slab for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err) - continue - } - ap, err := b.Autopilot(ctx, m.ap.id) + res, err := j.execute(ctx, w) if err != nil { - m.logger.Errorf("%v: failed to fetch autopilot settings for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err) + m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err) + if res.SurchargeApplied { + m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, err)) + } else { + m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, err)) + } continue } - res, err := w.MigrateSlab(ctx, slab, ap.Config.Contracts.Set) - if err != nil { - m.ap.RegisterAlert(ctx, newSlabMigrationFailedAlert(slab, j.Health, err)) - m.logger.Errorf("%v: failed to migrate slab %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err) - continue - } else { - m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, slab)) + + m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, res.NumShardsMigrated) + m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key)) + if res.SurchargeApplied { + // this alert confirms the user his gouging settings + // are working, it will be dismissed automatically + // the next time this slab is successfully migrated + m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.Key)) } - m.logger.Debugf("%v: successfully migrated slab (health: %v migrated shards: %d) %d/%d", id, j.Health, res.NumShardsMigrated, j.slabIdx+1, j.batchSize) } }(w) } @@ -151,10 +177,7 @@ OUTER: // recompute health. start := time.Now() if err := b.RefreshHealth(ctx); err != nil { - rerr := m.ap.alerts.RegisterAlert(ctx, newRefreshHealthFailedAlert(err)) - if rerr != nil { - m.logger.Errorf("failed to register alert: err %v", rerr) - } + m.ap.RegisterAlert(ctx, newRefreshHealthFailedAlert(err)) m.logger.Errorf("failed to recompute cached health before migration", err) return } @@ -193,7 +216,7 @@ OUTER: toMigrate = append(toMigrate, *slab) } - // sort the newsly added slabs by health + // sort the newly added slabs by health newSlabs := toMigrate[len(toMigrate)-len(migrateNewMap):] sort.Slice(newSlabs, func(i, j int) bool { return newSlabs[i].Health < newSlabs[j].Health @@ -205,10 +228,7 @@ OUTER: // register an alert to notify users about ongoing migrations. if len(toMigrate) > 0 { - err = m.ap.alerts.RegisterAlert(ctx, newOngoingMigrationsAlert(len(toMigrate))) - if err != nil { - m.logger.Errorf("failed to register alert: err %v", err) - } + m.ap.RegisterAlert(ctx, newOngoingMigrationsAlert(len(toMigrate))) } // return if there are no slabs to migrate @@ -223,8 +243,10 @@ OUTER: case <-m.signalMaintenanceFinished: m.logger.Info("migrations interrupted - updating slabs for migration") continue OUTER - case jobs <- job{slab, i, len(toMigrate), set}: + case jobs <- job{slab, i, len(toMigrate), set, b}: } } + + return } } diff --git a/build/env_default.go b/build/env_default.go index 49e7fa099..8820ec6ae 100644 --- a/build/env_default.go +++ b/build/env_default.go @@ -32,6 +32,7 @@ var ( MinPriceTableValidity: 5 * time.Minute, // 5 minutes MinAccountExpiry: 24 * time.Hour, // 1 day MinMaxEphemeralAccountBalance: types.Siacoins(1), // 1 SC + MigrationSurchargeMultiplier: 10, // 10x } // DefaultUploadPackingSettings define the default upload packing settings diff --git a/build/env_testnet.go b/build/env_testnet.go index 8798d5127..1bc40d287 100644 --- a/build/env_testnet.go +++ b/build/env_testnet.go @@ -34,6 +34,7 @@ var ( MinPriceTableValidity: 5 * time.Minute, // 5 minutes MinAccountExpiry: 24 * time.Hour, // 1 day MinMaxEphemeralAccountBalance: types.Siacoins(1), // 1 SC + MigrationSurchargeMultiplier: 10, // 10x } // DefaultUploadPackingSettings define the default upload packing settings diff --git a/internal/testing/gouging_test.go b/internal/testing/gouging_test.go index ef390fb1b..fe1f0dedb 100644 --- a/internal/testing/gouging_test.go +++ b/internal/testing/gouging_test.go @@ -22,7 +22,7 @@ func TestGouging(t *testing.T) { // create a new test cluster cluster := newTestCluster(t, testClusterOptions{ hosts: int(testAutopilotConfig.Contracts.Amount), - logger: newTestLoggerCustom(zapcore.DebugLevel), + logger: newTestLoggerCustom(zapcore.ErrorLevel), }) defer cluster.Shutdown() @@ -88,6 +88,6 @@ func TestGouging(t *testing.T) { // download the data - should fail buffer.Reset() if err := w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil { - t.Fatal("expected download to fail") + t.Fatal("expected download to fail", err) } } diff --git a/stores/metadata.go b/stores/metadata.go index 61b28a278..5a1858bff 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1692,7 +1692,7 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s // make sure the roots stay the same. for i, shard := range s.Shards { if shard.Root != types.Hash256(slab.Shards[i].Root) { - return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root) + return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root[:]) } } diff --git a/stores/migrations.go b/stores/migrations.go index 9de6e84d0..7d0703a51 100644 --- a/stores/migrations.go +++ b/stores/migrations.go @@ -2,6 +2,8 @@ package stores import ( "context" + "encoding/json" + "errors" "fmt" "reflect" "strings" @@ -10,6 +12,7 @@ import ( "go.sia.tech/renterd/api" "go.uber.org/zap" "gorm.io/gorm" + "gorm.io/gorm/clause" ) var ( @@ -321,6 +324,12 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { return performMigration00029_contractPrice(tx, logger) }, }, + { + ID: "00030_defaultMigrationSurchargeMultiplier", + Migrate: func(tx *gorm.DB) error { + return performMigration00030_defaultMigrationSurchargeMultiplier(tx, logger) + }, + }, } // Create migrator. m := gormigrate.New(db, gormigrate.DefaultOptions, migrations) @@ -1306,3 +1315,48 @@ func performMigration00029_contractPrice(txn *gorm.DB, logger *zap.SugaredLogger logger.Info("migration 00029_contractPrice complete") return nil } + +func performMigration00030_defaultMigrationSurchargeMultiplier(txn *gorm.DB, logger *zap.SugaredLogger) error { + logger.Info("performing migration 00030_defaultMigrationSurchargeMultiplier") + + // fetch setting + var entry dbSetting + if err := txn. + Where(&dbSetting{Key: api.SettingGouging}). + Take(&entry). + Error; errors.Is(err, gorm.ErrRecordNotFound) { + logger.Debugf("no gouging settings found, skipping migration") + return nil + } else if err != nil { + return fmt.Errorf("failed to fetch gouging settings: %w", err) + } + + // unmarshal setting into gouging settings + var gs api.GougingSettings + if err := json.Unmarshal([]byte(entry.Value), &gs); err != nil { + return err + } + + // set default value + if gs.MigrationSurchargeMultiplier == 0 { + gs.MigrationSurchargeMultiplier = 10 + } + + // update setting + if err := gs.Validate(); err != nil { + return err + } else if bytes, err := json.Marshal(gs); err != nil { + return err + } else if err := txn.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "key"}}, + DoUpdates: clause.AssignmentColumns([]string{"value"}), + }).Create(&dbSetting{ + Key: api.SettingGouging, + Value: string(bytes), + }).Error; err != nil { + return err + } + + logger.Info("migration 00030_defaultMigrationSurchargeMultiplier complete") + return nil +} diff --git a/worker/download.go b/worker/download.go index d24b03a07..259c90644 100644 --- a/worker/download.go +++ b/worker/download.go @@ -23,9 +23,10 @@ import ( ) const ( - downloadOverheadB = 284 - maxConcurrentSectorsPerHost = 3 - maxConcurrentSlabsPerDownload = 3 + downloadOverheadB = 284 + downloadOverpayHealthThreshold = 0.25 + maxConcurrentSectorsPerHost = 3 + maxConcurrentSlabsPerDownload = 3 ) type ( @@ -80,13 +81,16 @@ type ( mgr *downloadManager slm sectorLostMarker - dID id - sID slabID - created time.Time index int minShards int - length uint32 offset uint32 + length uint32 + + nextSlabChan chan struct{} + nextSlabTriggered bool + + created time.Time + overpay bool mu sync.Mutex lastOverdrive time.Time @@ -94,6 +98,8 @@ type ( numInflight uint64 numLaunched uint64 numOverdriving uint64 + numOverpaid uint64 + numRelaunched uint64 curr types.PublicKey hostToSectors map[types.PublicKey][]sectorInfo @@ -104,9 +110,10 @@ type ( } slabDownloadResponse struct { - shards [][]byte - index int - err error + surchargeApplied bool + shards [][]byte + index int + err error } sectorDownloadReq struct { @@ -117,18 +124,16 @@ type ( root types.Hash256 hk types.PublicKey + overpay bool overdrive bool sectorIndex int resps *sectorResponses } sectorDownloadResp struct { - overdrive bool - hk types.PublicKey - root types.Hash256 - sectorIndex int - sector []byte - err error + req *sectorDownloadReq + sector []byte + err error } sectorResponses struct { @@ -306,7 +311,7 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o // launch the download wg.Add(1) go func(index int) { - mgr.downloadSlab(ctx, id, next.SlabSlice, index, responseChan, nextSlabChan) + mgr.downloadSlab(ctx, id, next.SlabSlice, index, false, responseChan, nextSlabChan) wg.Done() }(slabIndex) atomic.AddUint64(&concurrentSlabs, 1) @@ -337,8 +342,10 @@ outer: atomic.AddUint64(&concurrentSlabs, ^uint64(0)) if resp.err != nil { - mgr.logger.Errorf("download slab %v failed: %v", resp.index, resp.err) + mgr.logger.Errorf("download slab %v failed, overpaid %v: %v", resp.index, resp.surchargeApplied, resp.err) return resp.err + } else if resp.surchargeApplied { + mgr.logger.Warnf("download for slab %v had to overpay to succeed", resp.index) } responses[resp.index] = resp @@ -387,7 +394,7 @@ outer: return nil } -func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, error) { +func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, bool, error) { // refresh the downloaders mgr.refreshDownloaders(contracts) @@ -407,7 +414,7 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, // check if we have enough shards if availableShards < slab.MinShards { - return nil, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards) + return nil, false, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards) } // create identifier @@ -422,7 +429,7 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, Length: uint32(slab.MinShards) * rhpv2.SectorSize, } go func() { - mgr.downloadSlab(ctx, id, slice, 0, responseChan, nextSlabChan) + mgr.downloadSlab(ctx, id, slice, 0, true, responseChan, nextSlabChan) // NOTE: when downloading 1 slab we can simply close both channels close(responseChan) close(nextSlabChan) @@ -432,10 +439,10 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, var resp *slabDownloadResponse select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, false, ctx.Err() case resp = <-responseChan: if resp.err != nil { - return nil, resp.err + return nil, false, resp.err } } @@ -443,10 +450,10 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, slice.Decrypt(resp.shards) err := slice.Reconstruct(resp.shards) if err != nil { - return nil, err + return nil, false, err } - return resp.shards, err + return resp.shards, resp.surchargeApplied, err } func (mgr *downloadManager) Stats() downloadManagerStats { @@ -530,11 +537,7 @@ func (mgr *downloadManager) refreshDownloaders(contracts []api.ContractMetadata) } } -func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int) *slabDownload { - // create slab id - var sID slabID - frand.Read(sID[:]) - +func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice object.SlabSlice, slabIndex int, migration bool, nextSlabChan chan struct{}) *slabDownload { // calculate the offset and length offset, length := slice.SectorRegion() @@ -549,32 +552,35 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o mgr: mgr, slm: mgr.slm, - dID: dID, - sID: sID, - created: time.Now(), + nextSlabChan: nextSlabChan, + index: slabIndex, minShards: int(slice.MinShards), offset: offset, length: length, + created: time.Now(), + overpay: migration && slice.Health <= downloadOverpayHealthThreshold, + hostToSectors: hostToSectors, used: make(map[types.PublicKey]struct{}), sectors: make([][]byte, len(slice.Shards)), + errs: make(HostErrorSet), } } -func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, responseChan chan *slabDownloadResponse, nextSlabChan chan struct{}) { +func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice object.SlabSlice, index int, migration bool, responseChan chan *slabDownloadResponse, nextSlabChan chan struct{}) { // add tracing ctx, span := tracing.Tracer.Start(ctx, "downloadSlab") defer span.End() - // prepare the slab download - slab := mgr.newSlabDownload(ctx, dID, slice, index) + // prepare new download + slab := mgr.newSlabDownload(ctx, dID, slice, index, migration, nextSlabChan) - // download shards + // execute download resp := &slabDownloadResponse{index: index} - resp.shards, resp.err = slab.downloadShards(ctx, nextSlabChan) + resp.shards, resp.surchargeApplied, resp.err = slab.download(ctx) // send the response select { @@ -816,7 +822,7 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) { // download the sector buf := bytes.NewBuffer(make([]byte, 0, req.length)) - err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length) + err = d.host.DownloadSector(req.ctx, buf, req.root, req.offset, req.length, req.overpay) if err != nil { req.fail(err) return err @@ -832,20 +838,15 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) { func (req *sectorDownloadReq) succeed(sector []byte) { req.resps.Add(§orDownloadResp{ - hk: req.hk, - root: req.root, - overdrive: req.overdrive, - sectorIndex: req.sectorIndex, - sector: sector, + req: req, + sector: sector, }) } func (req *sectorDownloadReq) fail(err error) { req.resps.Add(§orDownloadResp{ - err: err, - hk: req.hk, - root: req.root, - overdrive: req.overdrive, + req: req, + err: err, }) } @@ -970,19 +971,26 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, root: sector.Root, hk: sector.Host, + // overpay is set to 'true' when a request is retried after the slab + // download failed and we realise that it might have succeeded if we + // allowed overpaying for certain sectors, we only do this when trying + // to migrate a critically low-health slab that might otherwise be + // unrecoverable + overpay: false, + overdrive: overdrive, sectorIndex: sector.index, resps: resps, } } -func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan struct{}) ([][]byte, error) { +func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) { // cancel any sector downloads once the download is done ctx, cancel := context.WithCancel(ctx) defer cancel() // add tracing - ctx, span := tracing.Tracer.Start(ctx, "downloadShards") + ctx, span := tracing.Tracer.Start(ctx, "download") defer span.End() // create the responses queue @@ -998,22 +1006,25 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str for i := 0; i < int(s.minShards); { req := s.nextRequest(ctx, resps, false) if req == nil { - return nil, fmt.Errorf("no hosts available") + return nil, false, fmt.Errorf("no hosts available") } else if err := s.launch(req); err == nil { i++ } } + // collect requests that failed due to gouging + var gouging []*sectorDownloadReq + // collect responses var done bool - var next bool - var triggered bool + +loop: for s.inflight() > 0 && !done { select { case <-s.mgr.stopChan: - return nil, errors.New("download stopped") + return nil, false, errors.New("download stopped") case <-ctx.Done(): - return nil, ctx.Err() + return nil, false, ctx.Err() case <-resps.c: resetOverdrive() } @@ -1024,35 +1035,47 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str break } - if isSectorNotFound(resp.err) { - if err := s.slm.DeleteHostSector(ctx, resp.hk, resp.root); err != nil { - s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.hk, "root", resp.root, "err", err) - } else { - s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.hk, "root", resp.root) - } + // receive the response + done = s.receive(*resp) + if done { + break } - done, next = s.receive(*resp) - if !done && resp.err != nil { + // handle errors + if resp.err != nil { + // launch overdrive requests for { if req := s.nextRequest(ctx, resps, true); req != nil { if err := s.launch(req); err != nil { - continue // try the next request if this fails to launch + continue } } break } - } - if next && !triggered { - select { - case <-nextSlabChan: - triggered = true - default: + + // handle lost sectors + if isSectorNotFound(resp.err) { + if err := s.slm.DeleteHostSector(ctx, resp.req.hk, resp.req.root); err != nil { + s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.req.hk, "root", resp.req.root, "err", err) + } else { + s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.hk, "root", resp.req.root) + } + } else if isPriceTableGouging(resp.err) && s.overpay && !resp.req.overpay { + resp.req.overpay = true // ensures we don't retry the same request over and over again + gouging = append(gouging, resp.req) } } } } + if !done && len(gouging) >= s.missing() { + for _, req := range gouging { + _ = s.launch(req) // ignore error + } + gouging = nil + goto loop + } + // track stats s.mgr.statsOverdrivePct.Track(s.overdrivePct()) s.mgr.statsSlabDownloadSpeedBytesPerMS.Track(float64(s.downloadSpeed())) @@ -1063,7 +1086,7 @@ func (s *slabDownload) overdrivePct() float64 { s.mu.Lock() defer s.mu.Unlock() - numOverdrive := int(s.numLaunched) - s.minShards + numOverdrive := (int(s.numLaunched) + int(s.numRelaunched)) - s.minShards if numOverdrive < 0 { numOverdrive = 0 } @@ -1083,7 +1106,7 @@ func (s *slabDownload) downloadSpeed() int64 { return int64(bytes) / ms } -func (s *slabDownload) finish() ([][]byte, error) { +func (s *slabDownload) finish() ([][]byte, bool, error) { s.mu.Lock() defer s.mu.Unlock() if s.numCompleted < s.minShards { @@ -1093,9 +1116,19 @@ func (s *slabDownload) finish() ([][]byte, error) { unused++ } } - return nil, fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d downloaders=%d unused=%d errors=%d %w", s.numCompleted, s.numInflight, s.numLaunched, s.mgr.numDownloaders(), unused, len(s.errs), s.errs) + + return nil, s.numOverpaid > 0, fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d, relaunched=%d, overpaid=%d, downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), unused, len(s.errs), s.errs) + } + return s.sectors, s.numOverpaid > 0, nil +} + +func (s *slabDownload) missing() int { + s.mu.Lock() + defer s.mu.Unlock() + if s.numCompleted < s.minShards { + return s.minShards - s.numCompleted } - return s.sectors, nil + return 0 } func (s *slabDownload) inflight() uint64 { @@ -1113,6 +1146,11 @@ func (s *slabDownload) launch(req *sectorDownloadReq) error { return errors.New("no request given") } + // check for completed sector + if len(s.sectors[req.sectorIndex]) > 0 { + return errors.New("sector already downloaded") + } + // launch the req err := s.mgr.launch(req) if err != nil { @@ -1124,34 +1162,52 @@ func (s *slabDownload) launch(req *sectorDownloadReq) error { // update the state s.numInflight++ - s.numLaunched++ if req.overdrive { s.numOverdriving++ } + if req.overpay { + s.numRelaunched++ + } else { + s.numLaunched++ + } return nil } -func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool, next bool) { +func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool) { s.mu.Lock() defer s.mu.Unlock() // update num overdriving - if resp.overdrive { + if resp.req.overdrive { s.numOverdriving-- } // failed reqs can't complete the upload s.numInflight-- if resp.err != nil { - s.errs = append(s.errs, &HostError{resp.hk, resp.err}) - return false, false + s.errs[resp.req.hk] = resp.err + return false + } + + // try trigger next slab read + if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards { + select { + case <-s.nextSlabChan: + s.nextSlabTriggered = true + default: + } + } + + // update num overpaid + if resp.req.overpay { + s.numOverpaid++ } // store the sector - s.sectors[resp.sectorIndex] = resp.sector + s.sectors[resp.req.sectorIndex] = resp.sector s.numCompleted++ - return s.numCompleted >= s.minShards, s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards + return s.numCompleted >= s.minShards } func (mgr *downloadManager) fastest(hosts []types.PublicKey) (fastest types.PublicKey) { diff --git a/worker/gouging.go b/worker/gouging.go index 92c8d7329..ef26fd1d7 100644 --- a/worker/gouging.go +++ b/worker/gouging.go @@ -33,7 +33,7 @@ const ( type ( GougingChecker interface { - Check(*rhpv2.HostSettings, *rhpv3.HostPriceTable) api.HostGougingBreakdown + Check(_ *rhpv2.HostSettings, _ *rhpv3.HostPriceTable) api.HostGougingBreakdown } gougingChecker struct { @@ -50,20 +50,31 @@ type ( var _ GougingChecker = gougingChecker{} -func GougingCheckerFromContext(ctx context.Context) (GougingChecker, error) { - gc, ok := ctx.Value(keyGougingChecker).(func() (GougingChecker, error)) +func GougingCheckerFromContext(ctx context.Context, criticalMigration bool) (GougingChecker, error) { + gc, ok := ctx.Value(keyGougingChecker).(func(bool) (GougingChecker, error)) if !ok { panic("no gouging checker attached to the context") // developer error } - return gc() + return gc(criticalMigration) } func WithGougingChecker(ctx context.Context, cs consensusState, gp api.GougingParams) context.Context { - return context.WithValue(ctx, keyGougingChecker, func() (GougingChecker, error) { + return context.WithValue(ctx, keyGougingChecker, func(criticalMigration bool) (GougingChecker, error) { consensusState, err := cs.ConsensusState(ctx) if err != nil { return gougingChecker{}, fmt.Errorf("failed to get consensus state: %w", err) } + + // adjust the max download price if we are dealing with a critical + // migration that might be failing due to gouging checks + if criticalMigration && gp.GougingSettings.MigrationSurchargeMultiplier > 0 { + if adjustedMaxDownloadPrice, overflow := gp.GougingSettings.MaxDownloadPrice.Mul64WithOverflow(gp.GougingSettings.MigrationSurchargeMultiplier); overflow { + return gougingChecker{}, errors.New("failed to apply the 'MigrationSurchargeMultiplier', overflow detected") + } else { + gp.GougingSettings.MaxDownloadPrice = adjustedMaxDownloadPrice + } + } + return gougingChecker{ consensusState: consensusState, settings: gp.GougingSettings, diff --git a/worker/migrations.go b/worker/migrations.go index 1870013ce..519d027e4 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -11,7 +11,7 @@ import ( "go.uber.org/zap" ) -func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *object.Slab, dlContracts, ulContracts []api.ContractMetadata, bh uint64, logger *zap.SugaredLogger) (map[types.PublicKey]types.FileContractID, int, error) { +func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *object.Slab, dlContracts, ulContracts []api.ContractMetadata, bh uint64, logger *zap.SugaredLogger) (map[types.PublicKey]types.FileContractID, int, bool, error) { ctx, span := tracing.Tracer.Start(ctx, "migrateSlab") defer span.End() @@ -52,7 +52,7 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o for _, shard := range s.Shards { used[shard.Host] = h2c[shard.Host] } - return used, 0, nil + return used, 0, false, nil } // calculate the number of missing shards and take into account hosts for @@ -67,16 +67,16 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o // perform some sanity checks if len(ulContracts) < int(s.MinShards) { - return nil, 0, fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards)) + return nil, 0, false, fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards)) } if len(s.Shards)-missingShards < int(s.MinShards) { - return nil, 0, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-len(shardIndices), int(s.MinShards)) + return nil, 0, false, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-len(shardIndices), int(s.MinShards)) } // download the slab - shards, err := d.DownloadSlab(ctx, *s, dlContracts) + shards, surchargeApplied, err := d.DownloadSlab(ctx, *s, dlContracts) if err != nil { - return nil, 0, fmt.Errorf("failed to download slab for migration: %w", err) + return nil, 0, false, fmt.Errorf("failed to download slab for migration: %w", err) } s.Encrypt(shards) @@ -97,7 +97,7 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o // migrate the shards uploaded, used, err := u.UploadShards(ctx, shards, allowed, bh, lockingPriorityUpload) if err != nil { - return nil, 0, fmt.Errorf("failed to upload slab for migration: %w", err) + return nil, 0, surchargeApplied, fmt.Errorf("failed to upload slab for migration: %w", err) } // overwrite the unhealthy shards with the newly migrated ones @@ -111,12 +111,12 @@ func migrateSlab(ctx context.Context, d *downloadManager, u *uploadManager, s *o _, exists := used[sector.Host] if !exists { if fcid, exists := h2c[sector.Host]; !exists { - return nil, 0, fmt.Errorf("couldn't find contract for host %v", sector.Host) + return nil, 0, surchargeApplied, fmt.Errorf("couldn't find contract for host %v", sector.Host) } else { used[sector.Host] = fcid } } } - return used, len(shards), nil + return used, len(shards), surchargeApplied, nil } diff --git a/worker/rhpv2.go b/worker/rhpv2.go index bdaa5fa9e..11cdf8295 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -50,31 +50,30 @@ var ( ErrContractFinalized = errors.New("contract cannot be revised further") ) -// A HostError associates an error with a given host. -type HostError struct { - HostKey types.PublicKey - Err error -} - -// Error implements error. -func (he HostError) Error() string { - return fmt.Sprintf("%x: %v", he.HostKey[:4], he.Err.Error()) -} +// A HostErrorSet is a collection of errors from various hosts. +type HostErrorSet map[types.PublicKey]error -// Unwrap returns the underlying error. -func (he HostError) Unwrap() error { - return he.Err +// NumGouging returns numbers of host that errored out due to price gouging. +func (hes HostErrorSet) NumGouging() (n int) { + for _, he := range hes { + if errors.Is(he, errPriceTableGouging) { + n++ + } + } + return } -// A HostErrorSet is a collection of errors from various hosts. -type HostErrorSet []*HostError - // Error implements error. func (hes HostErrorSet) Error() string { - strs := make([]string, len(hes)) - for i := range strs { - strs[i] = hes[i].Error() + if len(hes) == 0 { + return "" } + + var strs []string + for hk, he := range hes { + strs = append(strs, fmt.Sprintf("%x: %v", hk[:4], he.Error())) + } + // include a leading newline so that the first error isn't printed on the // same line as the error context return "\n" + strings.Join(strs, "\n") diff --git a/worker/rhpv3.go b/worker/rhpv3.go index cab1ae277..b285cca47 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -65,6 +65,9 @@ var ( // valid. errPriceTableExpired = errors.New("price table requested is expired") + // errPriceTableGouging is returned when the host is price gouging. + errPriceTableGouging = errors.New("host price table gouging") + // errPriceTableNotFound is returned by the host when it can not find a // price table that corresponds with the id we sent it. errPriceTableNotFound = errors.New("price table not found") @@ -85,8 +88,8 @@ func isClosedStream(err error) bool { return isError(err, mux.ErrClosedStream) || isError(err, net.ErrClosed) } func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) } -func isMaxRevisionReached(err error) bool { return isError(err, errMaxRevisionReached) } func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) } +func isPriceTableGouging(err error) bool { return isError(err, errPriceTableGouging) } func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) } func isSectorNotFound(err error) bool { return isError(err, errSectorNotFound) || isError(err, errSectorNotFoundOld) @@ -588,21 +591,32 @@ func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision) if err != nil { return rhpv3.HostPriceTable{}, err } - gc, err := GougingCheckerFromContext(ctx) + gc, err := GougingCheckerFromContext(ctx, false) if err != nil { return rhpv3.HostPriceTable{}, err } if breakdown := gc.Check(nil, &pt.HostPriceTable); breakdown.Gouging() { - return rhpv3.HostPriceTable{}, fmt.Errorf("host price table gouging: %v", breakdown) + return rhpv3.HostPriceTable{}, fmt.Errorf("%w: %v", errPriceTableGouging, breakdown) } return pt.HostPriceTable, nil } -func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32) (err error) { - pt, err := h.priceTable(ctx, nil) +func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) (err error) { + pt, err := h.priceTables.fetch(ctx, h.HostKey(), nil) + if err != nil { + return err + } + hpt := pt.HostPriceTable + + // check for download gouging specifically + gc, err := GougingCheckerFromContext(ctx, overpay) if err != nil { return err } + if breakdown := gc.Check(nil, &hpt); breakdown.DownloadGouging() { + return fmt.Errorf("%w: %v", errPriceTableGouging, breakdown) + } + // return errBalanceInsufficient if balance insufficient defer func() { if isBalanceInsufficient(err) { @@ -612,14 +626,14 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2 return h.acc.WithWithdrawal(ctx, func() (amount types.Currency, err error) { err = h.transportPool.withTransportV3(ctx, h.HostKey(), h.siamuxAddr, func(ctx context.Context, t *transportV3) error { - cost, err := readSectorCost(pt, uint64(length)) + cost, err := readSectorCost(hpt, uint64(length)) if err != nil { return err } var refund types.Currency payment := rhpv3.PayByEphemeralAccount(h.acc.id, cost, pt.HostBlockHeight+defaultWithdrawalExpiryBlocks, h.accountKey) - cost, refund, err = RPCReadSector(ctx, t, w, pt, &payment, offset, length, root) + cost, refund, err = RPCReadSector(ctx, t, w, hpt, &payment, offset, length, root) amount = cost.Sub(refund) return err }) @@ -1365,7 +1379,7 @@ func RPCRenew(ctx context.Context, rrr api.RHPRenewRequest, bus Bus, t *transpor } // Perform gouging checks. - gc, err := GougingCheckerFromContext(ctx) + gc, err := GougingCheckerFromContext(ctx, false) if err != nil { return rhpv2.ContractRevision{}, nil, fmt.Errorf("failed to get gouging checker: %w", err) } diff --git a/worker/upload.go b/worker/upload.go index 37e217e41..464411322 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -961,6 +961,7 @@ func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte) (*slabUploa overdriving: make(map[int]int, len(shards)), remaining: make(map[int]sectorCtx, len(shards)), sectors: make([]object.Sector, len(shards)), + errs: make(HostErrorSet), } // prepare sector uploads @@ -1561,7 +1562,7 @@ func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) { // failed reqs can't complete the upload s.numInflight-- if resp.err != nil { - s.errs = append(s.errs, &HostError{resp.req.hk, resp.err}) + s.errs[resp.req.hk] = resp.err return false, false } diff --git a/worker/worker.go b/worker/worker.go index c07bfc77f..755c80adb 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -215,7 +215,7 @@ type hostV2 interface { type hostV3 interface { hostV2 - DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32) error + DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hpt hostdb.HostPriceTable, err error) FetchRevision(ctx context.Context, fetchTimeout time.Duration, blockHeight uint64) (types.FileContractRevision, error) FundAccount(ctx context.Context, balance types.Currency, rev *types.FileContractRevision) error @@ -396,6 +396,8 @@ func (w *worker) rhpScanHandler(jc jape.Context) { } func (w *worker) fetchContracts(ctx context.Context, metadatas []api.ContractMetadata, timeout time.Duration, blockHeight uint64) (contracts []api.Contract, errs HostErrorSet) { + errs = make(HostErrorSet) + // create requests channel reqs := make(chan api.ContractMetadata) @@ -410,7 +412,7 @@ func (w *worker) fetchContracts(ctx context.Context, metadatas []api.ContractMet }) mu.Lock() if err != nil { - errs = append(errs, &HostError{HostKey: md.HostKey, Err: err}) + errs[md.HostKey] = err contracts = append(contracts, api.Contract{ ContractMetadata: md, }) @@ -523,7 +525,7 @@ func (w *worker) rhpFormHandler(jc jape.Context) { // just used it to dial the host we know it's valid hostSettings.NetAddress = hostIP - gc, err := GougingCheckerFromContext(ctx) + gc, err := GougingCheckerFromContext(ctx, false) if err != nil { return err } @@ -906,17 +908,30 @@ func (w *worker) slabMigrateHandler(jc jape.Context) { } // migrate the slab - used, numShardsMigrated, err := migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger) - if jc.Check("couldn't migrate slabs", err) != nil { + used, numShardsMigrated, surchargeApplied, err := migrateSlab(ctx, w.downloadManager, w.uploadManager, &slab, dlContracts, ulContracts, up.CurrentHeight, w.logger) + if err != nil { + jc.Encode(api.MigrateSlabResponse{ + NumShardsMigrated: numShardsMigrated, + SurchargeApplied: surchargeApplied, + Error: err.Error(), + }) return } // update the slab - if jc.Check("couldn't update slab", w.bus.UpdateSlab(ctx, slab, up.ContractSet, used)) != nil { + if err := w.bus.UpdateSlab(ctx, slab, up.ContractSet, used); err != nil { + jc.Encode(api.MigrateSlabResponse{ + NumShardsMigrated: numShardsMigrated, + SurchargeApplied: surchargeApplied, + Error: err.Error(), + }) return } - jc.Encode(api.MigrateSlabResponse{NumShardsMigrated: numShardsMigrated}) + jc.Encode(api.MigrateSlabResponse{ + NumShardsMigrated: numShardsMigrated, + SurchargeApplied: surchargeApplied, + }) } func (w *worker) downloadsStatsHandlerGET(jc jape.Context) {