Merge pull request #755 from SiaFoundation/pj/migration-estimates
Migration Estimates
ChrisSchinnerl committed Nov 27, 2023
2 parents 9302a83 + 0849bcc commit fbc581b
Showing 6 changed files with 186 additions and 136 deletions.
8 changes: 7 additions & 1 deletion autopilot/alerts.go
@@ -150,12 +150,18 @@ func newLostSectorsAlert(hk types.PublicKey, lostSectors uint64) alerts.Alert {
}
}

func newOngoingMigrationsAlert(n int) alerts.Alert {
func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert {
data := make(map[string]interface{})
if rounded := estimate.Round(time.Minute); rounded > 0 {
data["estimate"] = fmt.Sprintf("~%v remaining", rounded)
}

return alerts.Alert{
ID: alertMigrationID,
Severity: alerts.SeverityInfo,
Message: fmt.Sprintf("Migrating %d slabs", n),
Timestamp: time.Now(),
Data: data,
}
}

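For context, a minimal sketch (not part of this commit) of how the new estimate field behaves: time.Duration.Round(time.Minute) rounds to the nearest minute, so estimates under 30 seconds round to zero and the field is omitted entirely. The helper name buildAlertData below is hypothetical.

package main

import (
	"fmt"
	"time"
)

// buildAlertData mirrors the logic added to newOngoingMigrationsAlert: the
// estimate is only attached once it rounds to at least one minute.
func buildAlertData(estimate time.Duration) map[string]interface{} {
	data := make(map[string]interface{})
	if rounded := estimate.Round(time.Minute); rounded > 0 {
		data["estimate"] = fmt.Sprintf("~%v remaining", rounded)
	}
	return data
}

func main() {
	fmt.Println(buildAlertData(20 * time.Second)) // map[] - rounds to zero, omitted
	fmt.Println(buildAlertData(90 * time.Second)) // map[estimate:~2m0s remaining]
	fmt.Println(buildAlertData(42 * time.Minute)) // map[estimate:~42m0s remaining]
}
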
42 changes: 31 additions & 11 deletions autopilot/migrator.go
@@ -11,6 +11,7 @@ import (

"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/stats"
"go.sia.tech/renterd/tracing"
"go.uber.org/zap"
)
@@ -26,6 +27,7 @@ type (
healthCutoff float64
parallelSlabsPerWorker uint64
signalMaintenanceFinished chan struct{}
statsSlabMigrationSpeedMS *stats.DataPoints

mu sync.Mutex
migrating bool
@@ -65,6 +67,7 @@ func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uin
healthCutoff: healthCutoff,
parallelSlabsPerWorker: parallelSlabsPerWorker,
signalMaintenanceFinished: make(chan struct{}, 1),
statsSlabMigrationSpeedMS: stats.New(time.Hour),
}
}

@@ -81,6 +84,20 @@ func (m *migrator) Status() (bool, time.Time) {
return m.migrating, m.migratingLastStart
}

func (m *migrator) slabMigrationEstimate(remaining int) time.Duration {
// recompute p90
m.statsSlabMigrationSpeedMS.Recompute()

// return 0 if p90 is 0 (can happen if we haven't collected enough data points)
p90 := m.statsSlabMigrationSpeedMS.P90()
if p90 == 0 {
return 0
}

totalNumMS := float64(remaining) * p90 / float64(m.parallelSlabsPerWorker)
return time.Duration(totalNumMS) * time.Millisecond
}

func (m *migrator) tryPerformMigrations(ctx context.Context, wp *workerPool) {
m.mu.Lock()
if m.migrating || m.ap.isStopped() {
@@ -133,24 +150,27 @@ func (m *migrator) performMigrations(p *workerPool) {

// process jobs
for j := range jobs {
start := time.Now()
res, err := j.execute(ctx, w)
m.statsSlabMigrationSpeedMS.Track(float64(time.Since(start).Milliseconds()))

if err != nil {
m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err)
if res.SurchargeApplied {
m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, err))
} else {
m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, err))
}
continue
}

m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, res.NumShardsMigrated)
m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key))
if res.SurchargeApplied {
// this alert confirms to the user that their gouging settings
// are working; it will be dismissed automatically
// the next time this slab is successfully migrated
m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.Key))
} else {
m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, res.NumShardsMigrated)
m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key))
if res.SurchargeApplied {
// this alert confirms to the user that their gouging
// settings are working; it will be dismissed
// automatically the next time this slab is
// successfully migrated
m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.Key))
}
}
}
}(w)
@@ -228,7 +248,7 @@ OUTER:

// register an alert to notify users about ongoing migrations.
if len(toMigrate) > 0 {
m.ap.RegisterAlert(ctx, newOngoingMigrationsAlert(len(toMigrate)))
m.ap.RegisterAlert(ctx, newOngoingMigrationsAlert(len(toMigrate), m.slabMigrationEstimate(len(toMigrate))))
}

// return if there are no slabs to migrate
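A worked example of the new slabMigrationEstimate helper, using assumed numbers (1200 remaining slabs, a tracked p90 of 4000 ms per slab, parallelSlabsPerWorker = 2): the estimate is 1200 * 4000 / 2 = 2,400,000 ms, i.e. 40 minutes, which the ongoing-migrations alert then renders as "~40m0s remaining". The sketch below only reproduces the arithmetic; it is not part of the diff.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed values, for illustration only.
	remaining := 1200             // slabs still to migrate
	p90 := 4000.0                 // p90 migration time per slab in ms (from statsSlabMigrationSpeedMS)
	parallelSlabsPerWorker := 2.0 // slabs migrated concurrently per worker

	// Same formula as slabMigrationEstimate.
	totalNumMS := float64(remaining) * p90 / parallelSlabsPerWorker
	estimate := time.Duration(totalNumMS) * time.Millisecond

	fmt.Println(estimate)                    // 40m0s
	fmt.Println(estimate.Round(time.Minute)) // 40m0s, as rendered in the alert
}
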
19 changes: 9 additions & 10 deletions internal/testing/pruning_test.go
@@ -3,7 +3,6 @@ package testing
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"strings"
@@ -69,20 +68,20 @@ func TestHostPruning(t *testing.T) {
// wait for the autopilot loop to finish at least once
recordFailedInteractions(9, h1.PublicKey())

// assert the autopilot loop ran at least once by successfully triggering it twice
// trigger the autopilot loop twice; failing to trigger it twice shouldn't
// fail the test, which avoids an NDF on windows
remaining := 2
tt.Retry(100, 50*time.Millisecond, func() error {
triggered, err := a.Trigger(true)
for i := 1; i < 100; i++ {
triggered, err := a.Trigger(false)
tt.OK(err)

if triggered {
remaining--
if remaining == 0 {
break
}
}
if remaining > 0 {
return errors.New("failed to trigger the autopilot loop")
}
return nil
})
time.Sleep(50 * time.Millisecond)
}

// assert the host was not pruned
hostss, err := b.Hosts(context.Background(), api.GetHostsOptions{})
121 changes: 121 additions & 0 deletions stats/stats.go
@@ -0,0 +1,121 @@
package stats

import (
"math"
"sync"
"time"

"github.com/montanaflynn/stats"
)

const (
statsDecayHalfTime = 10 * time.Minute
statsDecayThreshold = 5 * time.Minute
)

type (
DataPoints struct {
stats.Float64Data
halfLife time.Duration
size int

mu sync.Mutex
cnt int
p90 float64
lastDatapoint time.Time
lastDecay time.Time
}

Float64Data = stats.Float64Data
)

func Default() *DataPoints {
return New(statsDecayHalfTime)
}

func NoDecay() *DataPoints {
return New(0)
}

func New(halfLife time.Duration) *DataPoints {
return &DataPoints{
size: 1000,
Float64Data: make([]float64, 0),
halfLife: halfLife,
lastDecay: time.Now(),
}
}

func (a *DataPoints) Average() float64 {
a.mu.Lock()
defer a.mu.Unlock()
avg, err := a.Mean()
if err != nil {
avg = 0
}
return avg
}

func (a *DataPoints) P90() float64 {
a.mu.Lock()
defer a.mu.Unlock()
return a.p90
}

func (a *DataPoints) Recompute() {
a.mu.Lock()
defer a.mu.Unlock()

// apply decay
a.tryDecay()

// recalculate the p90
p90, err := a.Percentile(90)
if err != nil {
p90 = 0
}
a.p90 = p90
}

func (a *DataPoints) Track(p float64) {
a.mu.Lock()
defer a.mu.Unlock()

if a.cnt < a.size {
a.Float64Data = append(a.Float64Data, p)
} else {
a.Float64Data[a.cnt%a.size] = p
}

a.lastDatapoint = time.Now()
a.cnt++
}

func (a *DataPoints) tryDecay() {
// return if decay is disabled
if a.halfLife == 0 {
return
}

// return if decay is not needed
if time.Since(a.lastDatapoint) < statsDecayThreshold {
return
}

// return if decay is not due
decayFreq := a.halfLife / 5
timePassed := time.Since(a.lastDecay)
if timePassed < decayFreq {
return
}

// calculate decay and apply it
strength := float64(timePassed) / float64(a.halfLife)
decay := math.Floor(math.Pow(0.5, strength)*100) / 100 // round down to 2 decimals
for i := range a.Float64Data {
a.Float64Data[i] *= decay
}

// update the last decay time
a.lastDecay = time.Now()
}
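A small usage sketch of the new stats package, based only on what this file exports (it is not part of the commit): Track appends samples into a fixed 1000-entry ring buffer, Recompute applies any pending decay and refreshes the cached p90, and P90/Average read the results. With a non-zero half-life, decay multiplies every sample by 0.5^(elapsed/halfLife), rounded down to two decimals, at most once every halfLife/5 and only after 5 minutes without new data points.

package main

import (
	"fmt"
	"time"

	"go.sia.tech/renterd/stats"
)

func main() {
	// One-hour half-life, matching the migrator's statsSlabMigrationSpeedMS tracker.
	dp := stats.New(time.Hour)

	// Track a few slab migration durations in milliseconds.
	for _, ms := range []float64{1500, 2000, 2500, 3000, 8000} {
		dp.Track(ms)
	}

	fmt.Println(dp.P90()) // 0 - the cached p90 is only updated by Recompute

	dp.Recompute()
	fmt.Println(dp.P90())     // 90th percentile of the tracked samples
	fmt.Println(dp.Average()) // 3400
}
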
17 changes: 9 additions & 8 deletions worker/download.go
@@ -17,6 +17,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"go.sia.tech/renterd/stats"
"go.sia.tech/renterd/tracing"
"go.uber.org/zap"
"lukechampine.com/frand"
@@ -42,8 +43,8 @@ type (
maxOverdrive uint64
overdriveTimeout time.Duration

statsOverdrivePct *dataPoints
statsSlabDownloadSpeedBytesPerMS *dataPoints
statsOverdrivePct *stats.DataPoints
statsSlabDownloadSpeedBytesPerMS *stats.DataPoints

stopChan chan struct{}

@@ -55,8 +56,8 @@ type (
downloader struct {
host hostV3

statsDownloadSpeedBytesPerMS *dataPoints // keep track of this separately for stats (no decay is applied)
statsSectorDownloadEstimateInMS *dataPoints
statsDownloadSpeedBytesPerMS *stats.DataPoints // keep track of this separately for stats (no decay is applied)
statsSectorDownloadEstimateInMS *stats.DataPoints

signalWorkChan chan struct{}
stopChan chan struct{}
@@ -173,8 +174,8 @@ func newDownloadManager(hp hostProvider, pss partialSlabStore, slm sectorLostMar
maxOverdrive: maxOverdrive,
overdriveTimeout: overdriveTimeout,

statsOverdrivePct: newDataPoints(0),
statsSlabDownloadSpeedBytesPerMS: newDataPoints(0),
statsOverdrivePct: stats.NoDecay(),
statsSlabDownloadSpeedBytesPerMS: stats.NoDecay(),

stopChan: make(chan struct{}),

@@ -186,8 +187,8 @@ func newDownloader(host hostV3) *downloader {
return &downloader{
host: host,

statsSectorDownloadEstimateInMS: newDataPoints(statsDecayHalfTime),
statsDownloadSpeedBytesPerMS: newDataPoints(0), // no decay for exposed stats
statsSectorDownloadEstimateInMS: stats.Default(),
statsDownloadSpeedBytesPerMS: stats.NoDecay(),

signalWorkChan: make(chan struct{}, 1),
stopChan: make(chan struct{}),
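The download.go changes are a mechanical swap of the worker-local dataPoints type for the shared stats package: the diff replaces newDataPoints(statsDecayHalfTime) with stats.Default() (a 10-minute half-life per the new package's constant) and newDataPoints(0) with stats.NoDecay() (decay disabled). A short illustrative sketch, not part of the diff:

package main

import "go.sia.tech/renterd/stats"

func main() {
	// Decaying tracker: stats.Default() is stats.New(10 * time.Minute) per the
	// constants above; used for statsSectorDownloadEstimateInMS.
	est := stats.Default()
	est.Track(120) // e.g. a sector download estimate in ms

	// Non-decaying tracker: stats.NoDecay() is stats.New(0); used for the stats
	// exposed to the user, where no decay is applied.
	speed := stats.NoDecay()
	speed.Track(3.5) // e.g. a download speed in bytes per ms
}
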