diff --git a/bus/bus.go b/bus/bus.go index c1c79924b..6e4e2b6e2 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -151,6 +151,8 @@ type ( PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error) SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error) + DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error + ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, error) AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, bufferSize int64, err error) @@ -1164,6 +1166,20 @@ func (b *bus) packedSlabsHandlerDonePOST(jc jape.Context) { jc.Check("failed to mark packed slab(s) as uploaded", b.ms.MarkPackedSlabsUploaded(jc.Request.Context(), psrp.Slabs, psrp.UsedContracts)) } +func (b *bus) sectorsHostRootHandlerDELETE(jc jape.Context) { + var hk types.PublicKey + var root types.Hash256 + if jc.DecodeParam("hk", &hk) != nil { + return + } else if jc.DecodeParam("root", &root) != nil { + return + } + err := b.ms.DeleteHostSector(jc.Request.Context(), hk, root) + if jc.Check("failed to mark sector as lost", err) != nil { + return + } +} + func (b *bus) slabObjectsHandlerGET(jc jape.Context) { var key object.EncryptionKey if jc.DecodeParam("key", &key) != nil { @@ -2054,6 +2070,8 @@ func (b *bus) Handler() http.Handler { "POST /slabbuffer/fetch": b.packedSlabsHandlerFetchPOST, "POST /slabbuffer/done": b.packedSlabsHandlerDonePOST, + "DELETE /sectors/:hk/:root": b.sectorsHostRootHandlerDELETE, + "POST /slabs/migration": b.slabsMigrationHandlerPOST, "GET /slabs/partial/:key": b.slabsPartialHandlerGET, "POST /slabs/partial": b.slabsPartialHandlerPOST, diff --git a/bus/client/sectors.go b/bus/client/sectors.go new file mode 100644 index 000000000..669d88f3f --- /dev/null +++ b/bus/client/sectors.go @@ -0,0 +1,12 @@ +package client + +import ( + "context" + "fmt" + + "go.sia.tech/core/types" +) + +func (c *Client) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error { + return c.c.WithContext(ctx).DELETE(fmt.Sprintf("/sectors/%s/%s", hk, root)) +} diff --git a/stores/metadata.go b/stores/metadata.go index 6b186ee19..0c619a7e6 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -563,7 +563,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, var totalSectors uint64 - batchSize := 10000000 + batchSize := 5000000 marker := uint64(0) for offset := 0; ; offset += batchSize { var result struct { @@ -1321,6 +1321,70 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath return } +func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error { + return s.retryTransaction(func(tx *gorm.DB) error { + // Fetch contract_sectors to delete. + var sectors []dbContractSector + err := tx.Raw(` + SELECT contract_sectors.* + FROM contract_sectors + INNER JOIN sectors s ON s.id = contract_sectors.db_sector_id + INNER JOIN contracts c ON c.id = contract_sectors.db_contract_id + INNER JOIN hosts h ON h.id = c.host_id + WHERE s.root = ? AND h.public_key = ? + `, root[:], publicKey(hk)). + Scan(§ors). + Error + if err != nil { + return fmt.Errorf("failed to fetch contract sectors for deletion: %w", err) + } + + if len(sectors) > 0 { + // Update the affected slabs. + var sectorIDs []uint + uniqueIDs := make(map[uint]struct{}) + for _, s := range sectors { + if _, exists := uniqueIDs[s.DBSectorID]; !exists { + uniqueIDs[s.DBSectorID] = struct{}{} + sectorIDs = append(sectorIDs, s.DBSectorID) + } + } + err = tx.Exec("UPDATE slabs SET health_valid = 0 WHERE id IN (SELECT db_slab_id FROM sectors WHERE id IN (?))", sectorIDs).Error + if err != nil { + return fmt.Errorf("failed to invalidate slab health: %w", err) + } + + // Delete contract_sectors. + res := tx.Delete(§ors) + if err := res.Error; err != nil { + return fmt.Errorf("failed to delete contract sectors: %w", err) + } else if res.RowsAffected != int64(len(sectors)) { + return fmt.Errorf("expected %v affected rows but got %v", len(sectors), res.RowsAffected) + } + } + + // Fetch the sector and update the latest_host field if the host for + // which we remove the sector is the latest_host. + var sector dbSector + err = tx.Where("root", root[:]). + Preload("Contracts.Host"). + Find(§or). + Error + if err != nil { + return fmt.Errorf("failed to fetch sectors: %w", err) + } + if sector.LatestHost == publicKey(hk) { + if len(sector.Contracts) == 0 { + sector.LatestHost = publicKey{} // no more hosts + } else { + sector.LatestHost = sector.Contracts[len(sector.Contracts)-1].Host.PublicKey // most recent contract + } + return tx.Save(sector).Error + } + return nil + }) +} + func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, eTag, mimeType string, o object.Object, usedContracts map[types.PublicKey]types.FileContractID) error { s.objectsMu.Lock() defer s.objectsMu.Unlock() diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 3a1ed3222..63971101c 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -3658,3 +3658,83 @@ func TestListObjects(t *testing.T) { } } } + +func TestDeleteHostSector(t *testing.T) { + db, _, _, err := newTestSQLStore(t.TempDir()) + if err != nil { + t.Fatal(err) + } + + // create 2 hosts. + hks, err := db.addTestHosts(2) + if err != nil { + t.Fatal(err) + } + hk1, hk2 := hks[0], hks[1] + + // create 2 contracts with each + _, _, err = db.addTestContracts([]types.PublicKey{hk1, hk1, hk2, hk2}) + if err != nil { + t.Fatal(err) + } + + // get all contracts + var dbContracts []dbContract + if err := db.db.Find(&dbContracts).Error; err != nil { + t.Fatal(err) + } + + // create a healthy slab with one sector that is uploaded to all contracts. + root := types.Hash256{1, 2, 3} + slab := dbSlab{ + DBContractSetID: 1, + Key: []byte(object.GenerateEncryptionKey().String()), + Health: 1.0, + HealthValid: true, + TotalShards: 1, + Shards: []dbSector{ + { + Contracts: dbContracts, + Root: root[:], + LatestHost: publicKey(hk1), // hk1 is latest host + }, + }, + } + if err := db.db.Create(&slab).Error; err != nil { + t.Fatal(err) + } + + // Make sure 4 contractSector entries exist. + var n int64 + if err := db.db.Model(&dbContractSector{}). + Count(&n). + Error; err != nil { + t.Fatal(err) + } else if n != 4 { + t.Fatal("expected 4 contract-sector links", n) + } + + // Prune the sector from hk1. + if err := db.DeleteHostSector(context.Background(), hk1, root); err != nil { + t.Fatal(err) + } + + // Make sure 2 contractSector entries exist. + if err := db.db.Model(&dbContractSector{}). + Count(&n). + Error; err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal("expected 2 contract-sector links", n) + } + + // Find the slab. It should have an invalid health. + var s dbSlab + if err := db.db.Preload("Shards").Take(&s).Error; err != nil { + t.Fatal(err) + } else if s.HealthValid { + t.Fatal("expected health to be invalid") + } else if s.Shards[0].LatestHost != publicKey(hk2) { + t.Fatal("expected hk2 to be latest host", types.PublicKey(s.Shards[0].LatestHost)) + } +} diff --git a/worker/download.go b/worker/download.go index 88b4fdc86..7015f3006 100644 --- a/worker/download.go +++ b/worker/download.go @@ -35,6 +35,7 @@ type ( downloadManager struct { hp hostProvider pss partialSlabStore + slm sectorLostMarker logger *zap.SugaredLogger maxOverdrive uint64 @@ -71,8 +72,13 @@ type ( numDownloads uint64 } + sectorLostMarker interface { + DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error + } + slabDownload struct { mgr *downloadManager + slm sectorLostMarker dID id sID slabID @@ -119,6 +125,7 @@ type ( sectorDownloadResp struct { overdrive bool hk types.PublicKey + root types.Hash256 sectorIndex int sector []byte err error @@ -148,13 +155,14 @@ func (w *worker) initDownloadManager(maxOverdrive uint64, overdriveTimeout time. panic("download manager already initialized") // developer error } - w.downloadManager = newDownloadManager(w, w, maxOverdrive, overdriveTimeout, logger) + w.downloadManager = newDownloadManager(w, w, w.bus, maxOverdrive, overdriveTimeout, logger) } -func newDownloadManager(hp hostProvider, pss partialSlabStore, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *downloadManager { +func newDownloadManager(hp hostProvider, pss partialSlabStore, slm sectorLostMarker, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *downloadManager { return &downloadManager{ hp: hp, pss: pss, + slm: slm, logger: logger, maxOverdrive: maxOverdrive, @@ -539,6 +547,7 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o // create slab download return &slabDownload{ mgr: mgr, + slm: mgr.slm, dID: dID, sID: sID, @@ -824,6 +833,7 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) { func (req *sectorDownloadReq) succeed(sector []byte) { req.resps.Add(§orDownloadResp{ hk: req.hk, + root: req.root, overdrive: req.overdrive, sectorIndex: req.sectorIndex, sector: sector, @@ -834,6 +844,7 @@ func (req *sectorDownloadReq) fail(err error) { req.resps.Add(§orDownloadResp{ err: err, hk: req.hk, + root: req.root, overdrive: req.overdrive, }) } @@ -984,10 +995,12 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str resetOverdrive := s.overdrive(ctx, resps) // launch 'MinShard' requests - for i := 0; i < int(s.minShards); i++ { + for i := 0; i < int(s.minShards); { req := s.nextRequest(ctx, resps, false) - if err := s.launch(req); err != nil { - return nil, errors.New("no hosts available") + if req == nil { + return nil, fmt.Errorf("no hosts available") + } else if err := s.launch(req); err == nil { + i++ } } @@ -1011,6 +1024,14 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str break } + if isSectorNotFound(resp.err) { + if err := s.slm.DeleteHostSector(ctx, resp.hk, resp.root); err != nil { + s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.hk, "root", resp.root, "err", err) + } else { + s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.hk, "root", resp.root) + } + } + done, next = s.receive(*resp) if !done && resp.err != nil { for { diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 0f7c5ecf9..aefd7c11c 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -71,7 +71,8 @@ var ( // errSectorNotFound is returned by the host when it can not find the // requested sector. - errSectorNotFound = errors.New("sector not found") + errSectorNotFoundOld = errors.New("could not find the desired sector") + errSectorNotFound = errors.New("sector not found") // errWithdrawalsInactive occurs when the host is (perhaps temporarily) // unsynced and has disabled its account manager. @@ -83,11 +84,13 @@ func isBalanceMaxExceeded(err error) bool { return isError(err, errBalanceMaxEx func isClosedStream(err error) bool { return isError(err, mux.ErrClosedStream) || isError(err, net.ErrClosed) } -func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) } -func isMaxRevisionReached(err error) bool { return isError(err, errMaxRevisionReached) } -func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) } -func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) } -func isSectorNotFound(err error) bool { return isError(err, errSectorNotFound) } +func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) } +func isMaxRevisionReached(err error) bool { return isError(err, errMaxRevisionReached) } +func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) } +func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) } +func isSectorNotFound(err error) bool { + return isError(err, errSectorNotFound) || isError(err, errSectorNotFoundOld) +} func isWithdrawalsInactive(err error) bool { return isError(err, errWithdrawalsInactive) } func isError(err error, target error) bool { diff --git a/worker/worker.go b/worker/worker.go index 0d74ad7a8..1be7070bd 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -170,6 +170,8 @@ type Bus interface { FetchPartialSlab(ctx context.Context, key object.EncryptionKey, offset, length uint32) ([]byte, error) Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error) + DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error + MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab, usedContracts map[types.PublicKey]types.FileContractID) error PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error)