Skip to content

Commit

Permalink
autopilot: adjust hint if gouging prevented download
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Nov 3, 2023
1 parent f5021f7 commit fb9a849
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 50 deletions.
4 changes: 4 additions & 0 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ var (
// ErrContractSetNotSpecified is returned by the worker API by endpoints that
// need a contract set to be able to upload data.
ErrContractSetNotSpecified = errors.New("contract set is not specified")

// ErrGougingPreventedDownload is returned by the worker API when a download
// failed because a critical number of hosts were price gouging.
ErrGougingPreventedDownload = errors.New("gouging settings prevented download from succeeding")
)

type (
Expand Down
17 changes: 11 additions & 6 deletions autopilot/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func alertIDForContract(alertID [32]byte, contract api.ContractMetadata) types.H
return types.HashBytes(append(alertID[:], contract.ID[:]...))
}

func alertIDForSlab(alertID [32]byte, slab object.Slab) types.Hash256 {
return types.HashBytes(append(alertID[:], []byte(slab.Key.String())...))
func alertIDForSlab(alertID [32]byte, slabKey object.EncryptionKey) types.Hash256 {
return types.HashBytes(append(alertID[:], []byte(slabKey.String())...))
}

func randomAlertID() types.Hash256 {
Expand Down Expand Up @@ -135,21 +135,26 @@ func newOngoingMigrationsAlert(n int) alerts.Alert {
}
}

func newSlabMigrationFailedAlert(slab object.Slab, health float64, err error) alerts.Alert {
func newSlabMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert {
severity := alerts.SeverityWarning
if health < 0.5 {
severity = alerts.SeverityCritical
}

hint := "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously."
if isErr(err, api.ErrGougingPreventedDownload) {
hint += " In this particular case, one or more hosts were considered to be price gouging. It might be necessary to adjust your price gouging settings."
}

return alerts.Alert{
ID: alertIDForSlab(alertMigrationID, slab),
ID: alertIDForSlab(alertMigrationID, slabKey),
Severity: severity,
Message: "Slab migration failed",
Data: map[string]interface{}{
"error": err,
"health": health,
"slabKey": slab.Key.String(),
"hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.",
"slabKey": slabKey.String(),
"hint": hint,
},
Timestamp: time.Now(),
}
Expand Down
88 changes: 47 additions & 41 deletions autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autopilot

import (
"context"
"fmt"
"math"
"sort"
"sync"
Expand All @@ -17,16 +18,41 @@ const (
migratorBatchSize = math.MaxInt // TODO: change once we have a fix for the infinite loop
)

type migrator struct {
ap *Autopilot
logger *zap.SugaredLogger
healthCutoff float64
parallelSlabsPerWorker uint64
signalMaintenanceFinished chan struct{}
type (
migrator struct {
ap *Autopilot
logger *zap.SugaredLogger
healthCutoff float64
parallelSlabsPerWorker uint64
signalMaintenanceFinished chan struct{}

mu sync.Mutex
migrating bool
migratingLastStart time.Time
}

job struct {
api.UnhealthySlab
slabIdx int
batchSize int
set string

b Bus
}
)

func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, err error) {
slab, err := j.b.Slab(ctx, j.Key)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to fetch slab; %w", err)
}

res, err := w.MigrateSlab(ctx, slab, j.set)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err)
}

mu sync.Mutex
migrating bool
migratingLastStart time.Time
return res, nil
}

func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uint64) *migrator {
Expand Down Expand Up @@ -75,16 +101,11 @@ func (m *migrator) tryPerformMigrations(ctx context.Context, wp *workerPool) {
func (m *migrator) performMigrations(p *workerPool) {
m.logger.Info("performing migrations")
b := m.ap.bus

ctx, span := tracing.Tracer.Start(context.Background(), "migrator.performMigrations")
defer span.End()

// prepare a channel to push work to the workers
type job struct {
api.UnhealthySlab
slabIdx int
batchSize int
set string
}
jobs := make(chan job)
var wg sync.WaitGroup
defer func() {
Expand All @@ -100,32 +121,23 @@ func (m *migrator) performMigrations(p *workerPool) {
go func(w Worker) {
defer wg.Done()

// fetch worker id once
id, err := w.ID(ctx)
if err != nil {
m.logger.Errorf("failed to fetch worker id: %v", err)
return
}

// process jobs
for j := range jobs {
slab, err := b.Slab(ctx, j.Key)
res, err := j.execute(ctx, w)
if err != nil {
m.logger.Errorf("%v: failed to fetch slab for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err)
continue
}
ap, err := b.Autopilot(ctx, m.ap.id)
if err != nil {
m.logger.Errorf("%v: failed to fetch autopilot settings for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err)
continue
}
res, err := w.MigrateSlab(ctx, slab, ap.Config.Contracts.Set)
if err != nil {
m.ap.RegisterAlert(ctx, newSlabMigrationFailedAlert(slab, j.Health, err))
m.logger.Errorf("%v: failed to migrate slab %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err)
continue
m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, err: failed to fetch autopilot settings; %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, err)
m.ap.RegisterAlert(ctx, newSlabMigrationFailedAlert(j.Key, j.Health, err))
} else {
m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, slab))
m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.NumShardsMigrated)
m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key))
}
m.logger.Debugf("%v: successfully migrated slab (health: %v migrated shards: %d) %d/%d", id, j.Health, res.NumShardsMigrated, j.slabIdx+1, j.batchSize)
}
}(w)
}
Expand All @@ -151,10 +163,7 @@ OUTER:
// recompute health.
start := time.Now()
if err := b.RefreshHealth(ctx); err != nil {
rerr := m.ap.alerts.RegisterAlert(ctx, newRefreshHealthFailedAlert(err))
if rerr != nil {
m.logger.Errorf("failed to register alert: err %v", rerr)
}
m.ap.RegisterAlert(ctx, newRefreshHealthFailedAlert(err))
m.logger.Errorf("failed to recompute cached health before migration", err)
return
}
Expand Down Expand Up @@ -193,7 +202,7 @@ OUTER:
toMigrate = append(toMigrate, *slab)
}

// sort the newsly added slabs by health
// sort the newly added slabs by health
newSlabs := toMigrate[len(toMigrate)-len(migrateNewMap):]
sort.Slice(newSlabs, func(i, j int) bool {
return newSlabs[i].Health < newSlabs[j].Health
Expand All @@ -205,10 +214,7 @@ OUTER:

// register an alert to notify users about ongoing migrations.
if len(toMigrate) > 0 {
err = m.ap.alerts.RegisterAlert(ctx, newOngoingMigrationsAlert(len(toMigrate)))
if err != nil {
m.logger.Errorf("failed to register alert: err %v", err)
}
m.ap.RegisterAlert(ctx, newOngoingMigrationsAlert(len(toMigrate)))
}

// return if there are no slabs to migrate
Expand All @@ -223,7 +229,7 @@ OUTER:
case <-m.signalMaintenanceFinished:
m.logger.Info("migrations interrupted - updating slabs for migration")
continue OUTER
case jobs <- job{slab, i, len(toMigrate), set}:
case jobs <- job{slab, i, len(toMigrate), set, b}:
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,12 @@ func (s *slabDownload) finish() ([][]byte, error) {
unused++
}
}
return nil, fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d downloaders=%d unused=%d errors=%d %w", s.numCompleted, s.numInflight, s.numLaunched, s.mgr.numDownloaders(), unused, len(s.errs), s.errs)

err := fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.mgr.numDownloaders(), unused, len(s.errs), s.errs)
if s.numCompleted+s.errs.NumGouging() >= s.minShards {
err = fmt.Errorf("%w; %v", api.ErrGougingPreventedDownload, err)
}
return nil, err
}
return s.sectors, nil
}
Expand Down
10 changes: 10 additions & 0 deletions worker/rhpv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func (he HostError) Unwrap() error {
// A HostErrorSet is a collection of errors from various hosts.
type HostErrorSet []*HostError

// NumGouging returns numbers of host that errored out due to price gouging.
func (hes HostErrorSet) NumGouging() (n int) {
for _, he := range hes {
if errors.Is(he.Err, errPriceTableGouging) {
n++
}
}
return
}

// Error implements error.
func (hes HostErrorSet) Error() string {
strs := make([]string, len(hes))
Expand Down
7 changes: 5 additions & 2 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ var (
// valid.
errPriceTableExpired = errors.New("price table requested is expired")

// errPriceTableGouging is returned when the host is price gouging.
errPriceTableGouging = errors.New("host price table gouging")

// errPriceTableNotFound is returned by the host when it can not find a
// price table that corresponds with the id we sent it.
errPriceTableNotFound = errors.New("price table not found")
Expand All @@ -85,7 +88,6 @@ func isClosedStream(err error) bool {
return isError(err, mux.ErrClosedStream) || isError(err, net.ErrClosed)
}
func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) }
func isMaxRevisionReached(err error) bool { return isError(err, errMaxRevisionReached) }
func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) }
func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) }
func isSectorNotFound(err error) bool {
Expand Down Expand Up @@ -593,7 +595,7 @@ func (h *host) priceTable(ctx context.Context, rev *types.FileContractRevision)
return rhpv3.HostPriceTable{}, err
}
if breakdown := gc.Check(nil, &pt.HostPriceTable); breakdown.Gouging() {
return rhpv3.HostPriceTable{}, fmt.Errorf("host price table gouging: %v", breakdown)
return rhpv3.HostPriceTable{}, fmt.Errorf("%w: %v", errPriceTableGouging, breakdown)
}
return pt.HostPriceTable, nil
}
Expand All @@ -603,6 +605,7 @@ func (h *host) DownloadSector(ctx context.Context, w io.Writer, root types.Hash2
if err != nil {
return err
}

// return errBalanceInsufficient if balance insufficient
defer func() {
if isBalanceInsufficient(err) {
Expand Down

0 comments on commit fb9a849

Please sign in to comment.