From 6b75878f7fa9837399766333295dc62cf383feb9 Mon Sep 17 00:00:00 2001 From: PJ Date: Wed, 8 Nov 2023 12:09:03 +0100 Subject: [PATCH] worker: revert download refactor --- autopilot/alerts.go | 15 +++- autopilot/migrator.go | 15 +++- internal/testing/gouging_test.go | 6 +- stores/metadata.go | 4 +- worker/download.go | 126 ++++++++++++++----------------- worker/rhpv2.go | 41 +++------- worker/upload.go | 2 +- worker/worker.go | 4 +- 8 files changed, 104 insertions(+), 109 deletions(-) diff --git a/autopilot/alerts.go b/autopilot/alerts.go index 16a981247..8e4e6881c 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -140,11 +140,24 @@ func newOngoingMigrationsAlert(n int) alerts.Alert { } } +func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert { + return alerts.Alert{ + ID: alertIDForSlab(alertMigrationID, slabKey), + Severity: alerts.SeverityInfo, + Message: "Critical migration succeeded", + Data: map[string]interface{}{ + "slabKey": slabKey.String(), + "hint": "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads", + }, + Timestamp: time.Now(), + } +} + func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert { return alerts.Alert{ ID: alertIDForSlab(alertMigrationID, slabKey), Severity: alerts.SeverityCritical, - Message: "Critical slab migration failed", + Message: "Critical migration failed", Data: map[string]interface{}{ "error": err.Error(), "health": health, diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 341b518e7..054cf8ca8 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -141,9 +141,16 @@ func (m *migrator) performMigrations(p *workerPool) { } else { m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, err)) } - } else { - m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.NumShardsMigrated) - m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key)) + continue + } + + m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.Overpaid, res.NumShardsMigrated) + m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key)) + if res.Overpaid { + // this alert confirms the user his gouging settings + // are working, it will be dismissed automatically + // the next time this slab is successfully migrated + m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.Key)) } } }(w) @@ -239,5 +246,7 @@ OUTER: case jobs <- job{slab, i, len(toMigrate), set, b}: } } + + return } } diff --git a/internal/testing/gouging_test.go b/internal/testing/gouging_test.go index ef390fb1b..199959cb1 100644 --- a/internal/testing/gouging_test.go +++ b/internal/testing/gouging_test.go @@ -22,7 +22,7 @@ func TestGouging(t *testing.T) { // create a new test cluster cluster := newTestCluster(t, testClusterOptions{ hosts: int(testAutopilotConfig.Contracts.Amount), - logger: newTestLoggerCustom(zapcore.DebugLevel), + logger: newTestLoggerCustom(zapcore.ErrorLevel), }) defer cluster.Shutdown() @@ -87,7 +87,9 @@ func TestGouging(t *testing.T) { // download the data - should fail buffer.Reset() - if err := w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + if err := w.DownloadObject(ctx, &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil { t.Fatal("expected download to fail") } } diff --git a/stores/metadata.go b/stores/metadata.go index 85834fbab..1e186a57b 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -1581,8 +1581,8 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s // make sure the roots stay the same. for i, shard := range s.Shards { - if bytes.Equal(shard.Root[:], slab.Shards[i].Root) { - return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root) + if !bytes.Equal(shard.Root[:], slab.Shards[i].Root) { + return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root[:]) } } diff --git a/worker/download.go b/worker/download.go index 050b092eb..3d4171f0d 100644 --- a/worker/download.go +++ b/worker/download.go @@ -566,6 +566,7 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o used: make(map[types.PublicKey]struct{}), sectors: make([][]byte, len(slice.Shards)), + errs: make(HostErrorSet), } } @@ -577,19 +578,14 @@ func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice obje // prepare new download slab := mgr.newSlabDownload(ctx, dID, slice, index, migration, nextSlabChan) - // start downloading - shards, overpaid, err := slab.download(ctx) - if err != nil { - responseChan <- &slabDownloadResponse{ - index: index, - err: err, - } - } else { - responseChan <- &slabDownloadResponse{ - index: index, - shards: shards, - overpaid: overpaid, - } + // execute download + resp := &slabDownloadResponse{index: index} + resp.shards, resp.overpaid, resp.err = slab.download(ctx) + + // send the response + select { + case <-ctx.Done(): + case responseChan <- resp: } } @@ -976,9 +972,10 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, hk: sector.Host, // overpay is set to 'true' when a request is retried after the slab - // download failed, and we realise that it might have succeeded if we - // allowed overpaying for this download, we only do this for migrations - // and when the slab is below a certain health threshold + // download failed and we realise that it might have succeeded if we + // allowed overpaying for certain sectors, we only do this when trying + // to migrate a critically low-health slab that might otherwise be + // unrecoverable overpay: false, overdrive: overdrive, @@ -1019,10 +1016,10 @@ func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) { var gouging []*sectorDownloadReq // collect responses - var lastResort bool + var done bool loop: - for s.ongoing() { + for s.inflight() > 0 && !done { select { case <-s.mgr.stopChan: return nil, false, errors.New("download stopped") @@ -1032,22 +1029,28 @@ loop: resetOverdrive() } - resps.Foreach(func(resp *sectorDownloadResp) { - if done := s.receive(*resp); done { - return + for { + resp := resps.Next() + if resp == nil { + break + } + + // receive the response + done = s.receive(*resp) + if done { + break } + // handle errors if resp.err != nil { // launch overdrive requests - if !lastResort { - for { - if req := s.nextRequest(ctx, resps, true); req != nil { - if err := s.launch(req); err != nil { - continue // try the next request if this fails to launch - } + for { + if req := s.nextRequest(ctx, resps, true); req != nil { + if err := s.launch(req); err != nil { + continue } - break } + break } // handle lost sectors @@ -1057,21 +1060,19 @@ loop: } else { s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.hk, "root", resp.req.root) } - } else if isPriceTableGouging(resp.err) && s.overpay { - resp.req.overpay = true + } else if isPriceTableGouging(resp.err) && s.overpay && !resp.req.overpay { + resp.req.overpay = true // ensures we don't retry the same request over and over again gouging = append(gouging, resp.req) } } - }) + } } - if !lastResort && !s.completed() && len(gouging) >= s.missing() { - lastResort = true + if !done && len(gouging) >= s.missing() { for _, req := range gouging { - if err := s.launch(req); err == nil { - s.errs.Remove(req.hk) - } + _ = s.launch(req) // ignore error } + gouging = nil goto loop } @@ -1130,16 +1131,10 @@ func (s *slabDownload) missing() int { return 0 } -func (s *slabDownload) completed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.numCompleted >= s.minShards -} - -func (s *slabDownload) ongoing() bool { +func (s *slabDownload) inflight() uint64 { s.mu.Lock() defer s.mu.Unlock() - return s.numCompleted < s.minShards || s.numInflight > 0 + return s.numInflight } func (s *slabDownload) launch(req *sectorDownloadReq) error { @@ -1178,7 +1173,7 @@ func (s *slabDownload) launch(req *sectorDownloadReq) error { return nil } -func (s *slabDownload) receive(resp sectorDownloadResp) bool { +func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool) { s.mu.Lock() defer s.mu.Unlock() @@ -1190,10 +1185,19 @@ func (s *slabDownload) receive(resp sectorDownloadResp) bool { // failed reqs can't complete the upload s.numInflight-- if resp.err != nil { - s.errs = append(s.errs, &HostError{resp.req.hk, resp.err}) + s.errs[resp.req.hk] = resp.err return false } + // try trigger next slab read + if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards { + select { + case <-s.nextSlabChan: + s.nextSlabTriggered = true + default: + } + } + // update num overpaid if resp.req.overpay { s.numOverpaid++ @@ -1203,15 +1207,6 @@ func (s *slabDownload) receive(resp sectorDownloadResp) bool { s.sectors[resp.req.sectorIndex] = resp.sector s.numCompleted++ - // try trigger next slab - if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards { - select { - case <-s.nextSlabChan: - s.nextSlabTriggered = true - default: - } - } - return s.numCompleted >= s.minShards } @@ -1322,18 +1317,13 @@ func (sr *sectorResponses) Close() error { return nil } -func (sr *sectorResponses) Foreach(fn func(res *sectorDownloadResp)) { - for { - sr.mu.Lock() - if len(sr.responses) == 0 { - sr.mu.Unlock() - return - } - - resp := sr.responses[0] - sr.responses = sr.responses[1:] - sr.mu.Unlock() - - fn(resp) +func (sr *sectorResponses) Next() *sectorDownloadResp { + sr.mu.Lock() + defer sr.mu.Unlock() + if len(sr.responses) == 0 { + return nil } + resp := sr.responses[0] + sr.responses = sr.responses[1:] + return resp } diff --git a/worker/rhpv2.go b/worker/rhpv2.go index 05b368323..3537858ce 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -50,51 +50,30 @@ var ( ErrContractFinalized = errors.New("contract cannot be revised further") ) -// A HostError associates an error with a given host. -type HostError struct { - HostKey types.PublicKey - Err error -} - -// Error implements error. -func (he HostError) Error() string { - return fmt.Sprintf("%x: %v", he.HostKey[:4], he.Err.Error()) -} - -// Unwrap returns the underlying error. -func (he HostError) Unwrap() error { - return he.Err -} - // A HostErrorSet is a collection of errors from various hosts. -type HostErrorSet []*HostError +type HostErrorSet map[types.PublicKey]error // NumGouging returns numbers of host that errored out due to price gouging. func (hes HostErrorSet) NumGouging() (n int) { for _, he := range hes { - if errors.Is(he.Err, errPriceTableGouging) { + if errors.Is(he, errPriceTableGouging) { n++ } } return } -// Remove removes all errors for the given host. -func (hes HostErrorSet) Remove(hk types.PublicKey) { - for i := 0; i < len(hes); i++ { - if hes[i].HostKey == hk { - hes = append(hes[:i], hes[i+1:]...) - i-- - } - } -} - // Error implements error. func (hes HostErrorSet) Error() string { - strs := make([]string, len(hes)) - for i := range strs { - strs[i] = hes[i].Error() + if len(hes) == 0 { + return "" } + + var strs []string + for hk, he := range hes { + strs = append(strs, fmt.Sprintf("%x: %v", hk[:4], he.Error())) + } + // include a leading newline so that the first error isn't printed on the // same line as the error context return "\n" + strings.Join(strs, "\n") diff --git a/worker/upload.go b/worker/upload.go index 37e217e41..fc56adc0c 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -1561,7 +1561,7 @@ func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) { // failed reqs can't complete the upload s.numInflight-- if resp.err != nil { - s.errs = append(s.errs, &HostError{resp.req.hk, resp.err}) + s.errs[resp.req.hk] = resp.err return false, false } diff --git a/worker/worker.go b/worker/worker.go index bb0de45f0..3ffe16a04 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -396,6 +396,8 @@ func (w *worker) rhpScanHandler(jc jape.Context) { } func (w *worker) fetchContracts(ctx context.Context, metadatas []api.ContractMetadata, timeout time.Duration, blockHeight uint64) (contracts []api.Contract, errs HostErrorSet) { + errs = make(HostErrorSet) + // create requests channel reqs := make(chan api.ContractMetadata) @@ -410,7 +412,7 @@ func (w *worker) fetchContracts(ctx context.Context, metadatas []api.ContractMet }) mu.Lock() if err != nil { - errs = append(errs, &HostError{HostKey: md.HostKey, Err: err}) + errs[md.HostKey] = err contracts = append(contracts, api.Contract{ ContractMetadata: md, })