Skip to content

Commit

Permalink
worker: revert download refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Nov 8, 2023
1 parent d77650a commit 6b75878
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 109 deletions.
15 changes: 14 additions & 1 deletion autopilot/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,24 @@ func newOngoingMigrationsAlert(n int) alerts.Alert {
}
}

// newCriticalMigrationSucceededAlert builds the info-severity alert reporting
// that a critical migration succeeded for the slab with the given key. The
// alert ID is derived from alertMigrationID and the slab key through
// alertIDForSlab, and the alert is stamped with the current time.
func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert {
	id := alertIDForSlab(alertMigrationID, slabKey)

	// the payload identifies the slab and carries a hint for the user
	payload := map[string]interface{}{
		"slabKey": slabKey.String(),
		"hint":    "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads",
	}

	var alert alerts.Alert
	alert.ID = id
	alert.Severity = alerts.SeverityInfo
	alert.Message = "Critical migration succeeded"
	alert.Data = payload
	alert.Timestamp = time.Now()
	return alert
}

func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, err error) alerts.Alert {
return alerts.Alert{
ID: alertIDForSlab(alertMigrationID, slabKey),
Severity: alerts.SeverityCritical,
Message: "Critical slab migration failed",
Message: "Critical migration failed",
Data: map[string]interface{}{
"error": err.Error(),
"health": health,
Expand Down
15 changes: 12 additions & 3 deletions autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,16 @@ func (m *migrator) performMigrations(p *workerPool) {
} else {
m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.Key, j.Health, err))
}
} else {
m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.NumShardsMigrated)
m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key))
continue
}

m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.Overpaid, res.NumShardsMigrated)
m.ap.DismissAlert(ctx, alertIDForSlab(alertMigrationID, j.Key))
if res.Overpaid {
// this alert confirms to the user that their gouging settings
// are working; it will be dismissed automatically the next
// time this slab is successfully migrated
m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.Key))
}
}
}(w)
Expand Down Expand Up @@ -239,5 +246,7 @@ OUTER:
case jobs <- job{slab, i, len(toMigrate), set, b}:
}
}

return
}
}
6 changes: 4 additions & 2 deletions internal/testing/gouging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestGouging(t *testing.T) {
// create a new test cluster
cluster := newTestCluster(t, testClusterOptions{
hosts: int(testAutopilotConfig.Contracts.Amount),
logger: newTestLoggerCustom(zapcore.DebugLevel),
logger: newTestLoggerCustom(zapcore.ErrorLevel),
})
defer cluster.Shutdown()

Expand Down Expand Up @@ -87,7 +87,9 @@ func TestGouging(t *testing.T) {

// download the data - should fail
buffer.Reset()
if err := w.DownloadObject(context.Background(), &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if err := w.DownloadObject(ctx, &buffer, api.DefaultBucketName, path, api.DownloadObjectOptions{}); err == nil {
t.Fatal("expected download to fail")
}
}
4 changes: 2 additions & 2 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,8 +1581,8 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s

// make sure the roots stay the same.
for i, shard := range s.Shards {
if bytes.Equal(shard.Root[:], slab.Shards[i].Root) {
return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root)
if !bytes.Equal(shard.Root[:], slab.Shards[i].Root) {
return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root[:])
}
}

Expand Down
126 changes: 58 additions & 68 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o
used: make(map[types.PublicKey]struct{}),

sectors: make([][]byte, len(slice.Shards)),
errs: make(HostErrorSet),
}
}

Expand All @@ -577,19 +578,14 @@ func (mgr *downloadManager) downloadSlab(ctx context.Context, dID id, slice obje
// prepare new download
slab := mgr.newSlabDownload(ctx, dID, slice, index, migration, nextSlabChan)

// start downloading
shards, overpaid, err := slab.download(ctx)
if err != nil {
responseChan <- &slabDownloadResponse{
index: index,
err: err,
}
} else {
responseChan <- &slabDownloadResponse{
index: index,
shards: shards,
overpaid: overpaid,
}
// execute download
resp := &slabDownloadResponse{index: index}
resp.shards, resp.overpaid, resp.err = slab.download(ctx)

// send the response
select {
case <-ctx.Done():
case responseChan <- resp:
}
}

Expand Down Expand Up @@ -976,9 +972,10 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses,
hk: sector.Host,

// overpay is set to 'true' when a request is retried after the slab
// download failed, and we realise that it might have succeeded if we
// allowed overpaying for this download, we only do this for migrations
// and when the slab is below a certain health threshold
// download failed and we realise that it might have succeeded if we
// allowed overpaying for certain sectors, we only do this when trying
// to migrate a critically low-health slab that might otherwise be
// unrecoverable
overpay: false,

overdrive: overdrive,
Expand Down Expand Up @@ -1019,10 +1016,10 @@ func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) {
var gouging []*sectorDownloadReq

// collect responses
var lastResort bool
var done bool

loop:
for s.ongoing() {
for s.inflight() > 0 && !done {
select {
case <-s.mgr.stopChan:
return nil, false, errors.New("download stopped")
Expand All @@ -1032,22 +1029,28 @@ loop:
resetOverdrive()
}

resps.Foreach(func(resp *sectorDownloadResp) {
if done := s.receive(*resp); done {
return
for {
resp := resps.Next()
if resp == nil {
break
}

// receive the response
done = s.receive(*resp)
if done {
break
}

// handle errors
if resp.err != nil {
// launch overdrive requests
if !lastResort {
for {
if req := s.nextRequest(ctx, resps, true); req != nil {
if err := s.launch(req); err != nil {
continue // try the next request if this fails to launch
}
for {
if req := s.nextRequest(ctx, resps, true); req != nil {
if err := s.launch(req); err != nil {
continue
}
break
}
break
}

// handle lost sectors
Expand All @@ -1057,21 +1060,19 @@ loop:
} else {
s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.req.hk, "root", resp.req.root)
}
} else if isPriceTableGouging(resp.err) && s.overpay {
resp.req.overpay = true
} else if isPriceTableGouging(resp.err) && s.overpay && !resp.req.overpay {
resp.req.overpay = true // ensures we don't retry the same request over and over again
gouging = append(gouging, resp.req)
}
}
})
}
}

if !lastResort && !s.completed() && len(gouging) >= s.missing() {
lastResort = true
if !done && len(gouging) >= s.missing() {
for _, req := range gouging {
if err := s.launch(req); err == nil {
s.errs.Remove(req.hk)
}
_ = s.launch(req) // ignore error
}
gouging = nil
goto loop
}

Expand Down Expand Up @@ -1130,16 +1131,10 @@ func (s *slabDownload) missing() int {
return 0
}

func (s *slabDownload) completed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.numCompleted >= s.minShards
}

func (s *slabDownload) ongoing() bool {
func (s *slabDownload) inflight() uint64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.numCompleted < s.minShards || s.numInflight > 0
return s.numInflight
}

func (s *slabDownload) launch(req *sectorDownloadReq) error {
Expand Down Expand Up @@ -1178,7 +1173,7 @@ func (s *slabDownload) launch(req *sectorDownloadReq) error {
return nil
}

func (s *slabDownload) receive(resp sectorDownloadResp) bool {
func (s *slabDownload) receive(resp sectorDownloadResp) (finished bool) {
s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -1190,10 +1185,19 @@ func (s *slabDownload) receive(resp sectorDownloadResp) bool {
// failed reqs can't complete the download
s.numInflight--
if resp.err != nil {
s.errs = append(s.errs, &HostError{resp.req.hk, resp.err})
s.errs[resp.req.hk] = resp.err
return false
}

// try to trigger the next slab read
if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards {
select {
case <-s.nextSlabChan:
s.nextSlabTriggered = true
default:
}
}

// update num overpaid
if resp.req.overpay {
s.numOverpaid++
Expand All @@ -1203,15 +1207,6 @@ func (s *slabDownload) receive(resp sectorDownloadResp) bool {
s.sectors[resp.req.sectorIndex] = resp.sector
s.numCompleted++

// try trigger next slab
if !s.nextSlabTriggered && s.numCompleted+int(s.mgr.maxOverdrive) >= s.minShards {
select {
case <-s.nextSlabChan:
s.nextSlabTriggered = true
default:
}
}

return s.numCompleted >= s.minShards
}

Expand Down Expand Up @@ -1322,18 +1317,13 @@ func (sr *sectorResponses) Close() error {
return nil
}

func (sr *sectorResponses) Foreach(fn func(res *sectorDownloadResp)) {
for {
sr.mu.Lock()
if len(sr.responses) == 0 {
sr.mu.Unlock()
return
}

resp := sr.responses[0]
sr.responses = sr.responses[1:]
sr.mu.Unlock()

fn(resp)
// Next removes and returns the oldest queued sector download response, or nil
// when no responses are queued. The queue is accessed while holding sr.mu.
func (sr *sectorResponses) Next() *sectorDownloadResp {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	if len(sr.responses) == 0 {
		return nil
	}

	resp := sr.responses[0]
	// clear the vacated slot before re-slicing: otherwise the pointer lingers
	// in the backing array and keeps the response reachable for the garbage
	// collector until the array itself is replaced
	sr.responses[0] = nil
	sr.responses = sr.responses[1:]
	return resp
}
41 changes: 10 additions & 31 deletions worker/rhpv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,51 +50,30 @@ var (
ErrContractFinalized = errors.New("contract cannot be revised further")
)

// A HostError associates an error with a given host.
type HostError struct {
HostKey types.PublicKey
Err error
}

// Error implements error.
func (he HostError) Error() string {
return fmt.Sprintf("%x: %v", he.HostKey[:4], he.Err.Error())
}

// Unwrap returns the underlying error.
func (he HostError) Unwrap() error {
return he.Err
}

// A HostErrorSet is a collection of errors from various hosts.
type HostErrorSet []*HostError
type HostErrorSet map[types.PublicKey]error

// NumGouging returns the number of hosts that errored out due to price gouging.
func (hes HostErrorSet) NumGouging() (n int) {
for _, he := range hes {
if errors.Is(he.Err, errPriceTableGouging) {
if errors.Is(he, errPriceTableGouging) {
n++
}
}
return
}

// Remove removes all errors for the given host.
func (hes HostErrorSet) Remove(hk types.PublicKey) {
for i := 0; i < len(hes); i++ {
if hes[i].HostKey == hk {
hes = append(hes[:i], hes[i+1:]...)
i--
}
}
}

// Error implements error.
func (hes HostErrorSet) Error() string {
strs := make([]string, len(hes))
for i := range strs {
strs[i] = hes[i].Error()
if len(hes) == 0 {
return ""
}

var strs []string
for hk, he := range hes {
strs = append(strs, fmt.Sprintf("%x: %v", hk[:4], he.Error()))
}

// include a leading newline so that the first error isn't printed on the
// same line as the error context
return "\n" + strings.Join(strs, "\n")
Expand Down
2 changes: 1 addition & 1 deletion worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,7 +1561,7 @@ func (s *slabUpload) receive(resp sectorUploadResp) (finished bool, next bool) {
// failed reqs can't complete the upload
s.numInflight--
if resp.err != nil {
s.errs = append(s.errs, &HostError{resp.req.hk, resp.err})
s.errs[resp.req.hk] = resp.err
return false, false
}

Expand Down
4 changes: 3 additions & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ func (w *worker) rhpScanHandler(jc jape.Context) {
}

func (w *worker) fetchContracts(ctx context.Context, metadatas []api.ContractMetadata, timeout time.Duration, blockHeight uint64) (contracts []api.Contract, errs HostErrorSet) {
errs = make(HostErrorSet)

// create requests channel
reqs := make(chan api.ContractMetadata)

Expand All @@ -410,7 +412,7 @@ func (w *worker) fetchContracts(ctx context.Context, metadatas []api.ContractMet
})
mu.Lock()
if err != nil {
errs = append(errs, &HostError{HostKey: md.HostKey, Err: err})
errs[md.HostKey] = err
contracts = append(contracts, api.Contract{
ContractMetadata: md,
})
Expand Down

0 comments on commit 6b75878

Please sign in to comment.