Migration Critical Downloads #717

Merged · 18 commits · Nov 16, 2023
4 changes: 4 additions & 0 deletions api/autopilot.go
@@ -140,6 +140,10 @@ func (sb HostScoreBreakdown) String() string {
return fmt.Sprintf("Age: %v, Col: %v, Int: %v, SR: %v, UT: %v, V: %v, Pr: %v", sb.Age, sb.Collateral, sb.Interactions, sb.StorageRemaining, sb.Uptime, sb.Version, sb.Prices)
}

func (hgb HostGougingBreakdown) DownloadGouging() bool {
Member Author commented on this line: leaving comment to prevent merge - should extend UI to make sure we can set the surcharge multiplier

return hgb.V3.DownloadErr != ""
}

func (hgb HostGougingBreakdown) Gouging() bool {
return hgb.V2.Gouging() || hgb.V3.Gouging()
}
6 changes: 6 additions & 0 deletions api/settings.go
@@ -70,6 +70,12 @@ type (
// MinMaxEphemeralAccountBalance is the minimum accepted value for
// `MaxEphemeralAccountBalance` in the host's price settings.
MinMaxEphemeralAccountBalance types.Currency `json:"minMaxEphemeralAccountBalance"`

// MigrationSurchargeMultiplier is the multiplier applied to the
// 'MaxDownloadPrice' when checking whether a host is too expensive.
// This multiplier is only applied when trying to migrate critically
// low-health slabs.
MigrationSurchargeMultiplier uint64 `json:"migrationSurchargeMultiplier"`
}

// RedundancySettings contain settings that dictate an object's redundancy.
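
For context on how this setting is meant to be used, here is a minimal sketch of applying the surcharge to the configured limit during a download gouging check. The helper name maxDownloadPriceFor is illustrative and not part of this PR, and it assumes types.Currency exposes Mul64WithOverflow as in go.sia.tech/core/types.

package gouging // illustrative package, not from this PR

import (
	"errors"

	"go.sia.tech/core/types"
	"go.sia.tech/renterd/api"
)

// maxDownloadPriceFor returns the price ceiling used when checking a host for
// download gouging. For critical migrations the configured MaxDownloadPrice is
// relaxed by MigrationSurchargeMultiplier; in this sketch a zero multiplier
// simply keeps the regular limit.
func maxDownloadPriceFor(gs api.GougingSettings, criticalMigration bool) (types.Currency, error) {
	if !criticalMigration || gs.MigrationSurchargeMultiplier == 0 {
		return gs.MaxDownloadPrice, nil
	}
	adjusted, overflow := gs.MaxDownloadPrice.Mul64WithOverflow(gs.MigrationSurchargeMultiplier)
	if overflow {
		return types.Currency{}, errors.New("migration surcharge multiplier overflows the max download price")
	}
	return adjusted, nil
}
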
4 changes: 3 additions & 1 deletion api/worker.go
@@ -53,7 +53,9 @@ type (

// MigrateSlabResponse is the response type for the /slab/migrate endpoint.
MigrateSlabResponse struct {
NumShardsMigrated int `json:"numShardsMigrated"`
NumShardsMigrated int `json:"numShardsMigrated"`
SurchargeApplied bool `json:"surchargeApplied,omitempty"`
Error string `json:"error,omitempty"`
}

// RHPFormRequest is the request type for the /rhp/form endpoint.
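
The new fields make the outcome of a migration explicit to callers: SurchargeApplied signals that the slab only migrated because the surcharge allowed overpaying hosts for some downloads, and Error reports a failure without discarding the rest of the response body. A minimal sketch of consuming the response follows; the helper is illustrative, and the assumption that NumShardsMigrated can be non-zero alongside Error is not guaranteed by the API.

package example // illustrative, not from this PR

import (
	"fmt"

	"go.sia.tech/renterd/api"
)

// checkMigrateResult reports whether the surcharge was applied and wraps any
// error carried in the response body.
func checkMigrateResult(res api.MigrateSlabResponse) (surcharged bool, err error) {
	if res.Error != "" {
		return res.SurchargeApplied, fmt.Errorf("migrated %d shards before failing: %s", res.NumShardsMigrated, res.Error)
	}
	return res.SurchargeApplied, nil
}
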
38 changes: 33 additions & 5 deletions autopilot/alerts.go
@@ -33,8 +33,8 @@ func alertIDForHost(alertID [32]byte, hk types.PublicKey) types.Hash256 {
return types.HashBytes(append(alertID[:], hk[:]...))
}

func alertIDForSlab(alertID [32]byte, slab object.Slab) types.Hash256 {
return types.HashBytes(append(alertID[:], []byte(slab.Key.String())...))
func alertIDForSlab(alertID [32]byte, slabKey object.EncryptionKey) types.Hash256 {
return types.HashBytes(append(alertID[:], []byte(slabKey.String())...))
}

func randomAlertID() types.Hash256 {
@@ -159,7 +159,35 @@ func newOngoingMigrationsAlert(n int) alerts.Alert {
}
}

func newSlabMigrationFailedAlert(slab object.Slab, health float64, err error) alerts.Alert {
func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert {
return alerts.Alert{
ID: alertIDForSlab(alertMigrationID, slabKey),
Severity: alerts.SeverityInfo,
Message: "Critical migration succeeded",
Data: map[string]interface{}{
"slabKey": slabKey.String(),
"hint": "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads",
},
Timestamp: time.Now(),
}
}

func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert {
return alerts.Alert{
ID: alertIDForSlab(alertMigrationID, slabKey),
Severity: alerts.SeverityCritical,
Message: "Critical migration failed",
Data: map[string]interface{}{
"error": err.Error(),
"health": health,
"slabKey": slabKey.String(),
"hint": "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.",
},
Timestamp: time.Now(),
}
}

func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert {
severity := alerts.SeverityError
if health < 0.25 {
severity = alerts.SeverityCritical
@@ -168,13 +196,13 @@ func newSlabMigrationFailedAlert(slab object.Slab, health float64, err error) alerts.Alert {
}

return alerts.Alert{
ID: alertIDForSlab(alertMigrationID, slab),
ID: alertIDForSlab(alertMigrationID, slabKey),
Severity: severity,
Message: "Slab migration failed",
Data: map[string]interface{}{
"error": err.Error(),
"health": health,
"slabKey": slab.Key.String(),
"slabKey": slabKey.String(),
"hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.",
},
Timestamp: time.Now(),
102 changes: 62 additions & 40 deletions autopilot/migrator.go
@@ -2,6 +2,8 @@ package autopilot

import (
"context"
"errors"
"fmt"
"math"
"sort"
"sync"
@@ -17,16 +19,43 @@ const (
migratorBatchSize = math.MaxInt // TODO: change once we have a fix for the infinite loop
)

type migrator struct {
ap *Autopilot
logger *zap.SugaredLogger
healthCutoff float64
parallelSlabsPerWorker uint64
signalMaintenanceFinished chan struct{}
type (
migrator struct {
ap *Autopilot
logger *zap.SugaredLogger
healthCutoff float64
parallelSlabsPerWorker uint64
signalMaintenanceFinished chan struct{}

mu sync.Mutex
migrating bool
migratingLastStart time.Time
}

job struct {
api.UnhealthySlab
slabIdx int
batchSize int
set string

mu sync.Mutex
migrating bool
migratingLastStart time.Time
b Bus
}
)

func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, err error) {
slab, err := j.b.Slab(ctx, j.Key)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to fetch slab; %w", err)
}

res, err := w.MigrateSlab(ctx, slab, j.set)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err)
} else if res.Error != "" {
return res, fmt.Errorf("failed to migrate slab; %w", errors.New(res.Error))
}

return res, nil
}

func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uint64) *migrator {
@@ -75,16 +104,11 @@ func (m *migrator) tryPerformMigrations(ctx context.Context, wp *workerPool) {
func (m *migrator) performMigrations(p *workerPool) {
m.logger.Info("performing migrations")
b := m.ap.bus

ctx, span := tracing.Tracer.Start(context.Background(), "migrator.performMigrations")
defer span.End()

// prepare a channel to push work to the workers
type job struct {
api.UnhealthySlab
slabIdx int
batchSize int
set string
}
jobs := make(chan job)
var wg sync.WaitGroup
defer func() {
@@ -100,32 +124,34 @@ func (m *migrator) performMigrations(p *workerPool) {
go func(w Worker) {
defer wg.Done()

// fetch worker id once
id, err := w.ID(ctx)
if err != nil {
m.logger.Errorf("failed to fetch worker id: %v", err)
return
}

// process jobs
for j := range jobs {
slab, err := b.Slab(ctx, j.Key)
if err != nil {
m.logger.Errorf("%v: failed to fetch slab for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err)
continue
}
ap, err := b.Autopilot(ctx, m.ap.id)
res, err := j.execute(ctx, w)
if err != nil {
m.logger.Errorf("%v: failed to fetch autopilot settings for migration %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err)
m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err)
if res.SurchargeApplied {
m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, err))
} else {
m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, err))
}
continue
}
res, err := w.MigrateSlab(ctx, slab, ap.Config.Contracts.Set)
if err != nil {
m.ap.RegisterAlert(ctx, newSlabMigrationFailedAlert(slab, j.Health, err))
m.logger.Errorf("%v: failed to migrate slab %d/%d, health: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Health, err)
continue
} else {
m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, slab))

m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, res.NumShardsMigrated)
m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key))
if res.SurchargeApplied {
// this alert confirms to the user that the gouging settings
// are working; it will be dismissed automatically the next
// time this slab is successfully migrated
m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.Key))
}
m.logger.Debugf("%v: successfully migrated slab (health: %v migrated shards: %d) %d/%d", id, j.Health, res.NumShardsMigrated, j.slabIdx+1, j.batchSize)
}
}(w)
}
@@ -151,10 +177,7 @@
// recompute health.
start := time.Now()
if err := b.RefreshHealth(ctx); err != nil {
rerr := m.ap.alerts.RegisterAlert(ctx, newRefreshHealthFailedAlert(err))
if rerr != nil {
m.logger.Errorf("failed to register alert: err %v", rerr)
}
m.ap.RegisterAlert(ctx, newRefreshHealthFailedAlert(err))
m.logger.Errorf("failed to recompute cached health before migration", err)
return
}
@@ -193,7 +216,7 @@
toMigrate = append(toMigrate, *slab)
}

// sort the newsly added slabs by health
// sort the newly added slabs by health
newSlabs := toMigrate[len(toMigrate)-len(migrateNewMap):]
sort.Slice(newSlabs, func(i, j int) bool {
return newSlabs[i].Health < newSlabs[j].Health
@@ -205,10 +228,7 @@

// register an alert to notify users about ongoing migrations.
if len(toMigrate) > 0 {
err = m.ap.alerts.RegisterAlert(ctx, newOngoingMigrationsAlert(len(toMigrate)))
if err != nil {
m.logger.Errorf("failed to register alert: err %v", err)
}
m.ap.RegisterAlert(ctx, newOngoingMigrationsAlert(len(toMigrate)))
}

// return if there are no slabs to migrate
@@ -223,8 +243,10 @@
case <-m.signalMaintenanceFinished:
m.logger.Info("migrations interrupted - updating slabs for migration")
continue OUTER
case jobs <- job{slab, i, len(toMigrate), set}:
case jobs <- job{slab, i, len(toMigrate), set, b}:
}
}

return
}
}
1 change: 1 addition & 0 deletions build/env_default.go
@@ -32,6 +32,7 @@ var (
MinPriceTableValidity: 5 * time.Minute, // 5 minutes
MinAccountExpiry: 24 * time.Hour, // 1 day
MinMaxEphemeralAccountBalance: types.Siacoins(1), // 1 SC
MigrationSurchargeMultiplier: 10, // 10x
}

// DefaultUploadPackingSettings define the default upload packing settings
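
For a concrete sense of scale (numbers purely illustrative): with the default multiplier of 10 and a MaxDownloadPrice of 1000 SC, sector downloads performed for a critical migration are checked against a ceiling of 10 × 1000 SC = 10,000 SC instead of the regular 1000 SC; ordinary downloads keep the unmodified limit.
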
1 change: 1 addition & 0 deletions build/env_testnet.go
@@ -34,6 +34,7 @@ var (
MinPriceTableValidity: 5 * time.Minute, // 5 minutes
MinAccountExpiry: 24 * time.Hour, // 1 day
MinMaxEphemeralAccountBalance: types.Siacoins(1), // 1 SC
MigrationSurchargeMultiplier: 10, // 10x
}

// DefaultUploadPackingSettings define the default upload packing settings
4 changes: 2 additions & 2 deletions internal/testing/gouging_test.go
@@ -22,7 +22,7 @@ func TestGouging(t *testing.T) {
// create a new test cluster
cluster := newTestCluster(t, testClusterOptions{
hosts: int(testAutopilotConfig.Contracts.Amount),
logger: newTestLoggerCustom(zapcore.DebugLevel),
logger: newTestLoggerCustom(zapcore.ErrorLevel),
})
defer cluster.Shutdown()

@@ -88,6 +88,6 @@ func TestGouging(t *testing.T) {
// download the data - should fail
buffer.Reset()
if err := w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil {
t.Fatal("expected download to fail")
t.Fatal("expected download to fail", err)
}
}
2 changes: 1 addition & 1 deletion stores/metadata.go
@@ -1692,7 +1692,7 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s
// make sure the roots stay the same.
for i, shard := range s.Shards {
if shard.Root != types.Hash256(slab.Shards[i].Root) {
return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root)
return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root[:])
}
}

54 changes: 54 additions & 0 deletions stores/migrations.go
@@ -2,6 +2,8 @@ package stores

import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
@@ -10,6 +12,7 @@ import (
"go.sia.tech/renterd/api"
"go.uber.org/zap"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

var (
@@ -321,6 +324,12 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error {
return performMigration00029_contractPrice(tx, logger)
},
},
{
ID: "00030_defaultMigrationSurchargeMultiplier",
Migrate: func(tx *gorm.DB) error {
return performMigration00030_defaultMigrationSurchargeMultiplier(tx, logger)
},
},
}
// Create migrator.
m := gormigrate.New(db, gormigrate.DefaultOptions, migrations)
@@ -1306,3 +1315,48 @@ func performMigration00029_contractPrice(txn *gorm.DB, logger *zap.SugaredLogger
logger.Info("migration 00029_contractPrice complete")
return nil
}

func performMigration00030_defaultMigrationSurchargeMultiplier(txn *gorm.DB, logger *zap.SugaredLogger) error {
logger.Info("performing migration 00030_defaultMigrationSurchargeMultiplier")

// fetch setting
var entry dbSetting
if err := txn.
Where(&dbSetting{Key: api.SettingGouging}).
Take(&entry).
Error; errors.Is(err, gorm.ErrRecordNotFound) {
logger.Debugf("no gouging settings found, skipping migration")
return nil
} else if err != nil {
return fmt.Errorf("failed to fetch gouging settings: %w", err)
}

// unmarshal setting into gouging settings
var gs api.GougingSettings
if err := json.Unmarshal([]byte(entry.Value), &gs); err != nil {
return err
}

// set default value
if gs.MigrationSurchargeMultiplier == 0 {
gs.MigrationSurchargeMultiplier = 10
}

// update setting
if err := gs.Validate(); err != nil {
return err
} else if bytes, err := json.Marshal(gs); err != nil {
return err
} else if err := txn.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "key"}},
DoUpdates: clause.AssignmentColumns([]string{"value"}),
}).Create(&dbSetting{
Key: api.SettingGouging,
Value: string(bytes),
}).Error; err != nil {
return err
}

logger.Info("migration 00030_defaultMigrationSurchargeMultiplier complete")
return nil
}
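
After this migration runs, the value stored under the api.SettingGouging key is the re-marshaled gouging settings, now carrying the new field; note that gs.Validate() is called before the upsert, so an invalid combination aborts the migration instead of persisting a bad setting. Schematically (other fields elided, value illustrative) the stored JSON looks like:

{
	...,
	"migrationSurchargeMultiplier": 10
}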