Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prune contract-sector links from the db if a host can't find the sector #643

Merged
merged 12 commits into from
Oct 6, 2023
Merged
18 changes: 18 additions & 0 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ type (
PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error)
SlabBuffers(ctx context.Context) ([]api.SlabBuffer, error)

DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error

ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse, error)

AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.PartialSlab, bufferSize int64, err error)
Expand Down Expand Up @@ -1164,6 +1166,20 @@ func (b *bus) packedSlabsHandlerDonePOST(jc jape.Context) {
jc.Check("failed to mark packed slab(s) as uploaded", b.ms.MarkPackedSlabsUploaded(jc.Request.Context(), psrp.Slabs, psrp.UsedContracts))
}

// sectorsHostRootHandlerDELETE serves the "DELETE /sectors/:hk/:root" route.
// It decodes the host public key and the sector root from the route
// parameters and forwards them to the metadata store's DeleteHostSector. The
// handler returns without calling the store when either parameter fails to
// decode; a store error is passed to jc.Check.
func (b *bus) sectorsHostRootHandlerDELETE(jc jape.Context) {
	var (
		hk   types.PublicKey
		root types.Hash256
	)
	if err := jc.DecodeParam("hk", &hk); err != nil {
		return
	}
	if err := jc.DecodeParam("root", &root); err != nil {
		return
	}

	// Nothing follows the check, so its result does not need to be inspected.
	jc.Check("failed to mark sector as lost", b.ms.DeleteHostSector(jc.Request.Context(), hk, root))
}

func (b *bus) slabObjectsHandlerGET(jc jape.Context) {
var key object.EncryptionKey
if jc.DecodeParam("key", &key) != nil {
Expand Down Expand Up @@ -2054,6 +2070,8 @@ func (b *bus) Handler() http.Handler {
"POST /slabbuffer/fetch": b.packedSlabsHandlerFetchPOST,
"POST /slabbuffer/done": b.packedSlabsHandlerDonePOST,

"DELETE /sectors/:hk/:root": b.sectorsHostRootHandlerDELETE,

"POST /slabs/migration": b.slabsMigrationHandlerPOST,
"GET /slabs/partial/:key": b.slabsPartialHandlerGET,
"POST /slabs/partial": b.slabsPartialHandlerPOST,
Expand Down
12 changes: 12 additions & 0 deletions bus/client/sectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package client

import (
"context"
"fmt"

"go.sia.tech/core/types"
)

// DeleteHostSector sends a DELETE request to /sectors/<hk>/<root>, using ctx
// for the request, and returns the error reported by the underlying client.
func (c *Client) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error {
	path := fmt.Sprintf("/sectors/%s/%s", hk, root)
	return c.c.WithContext(ctx).DELETE(path)
}
66 changes: 65 additions & 1 deletion stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context) (api.ObjectsStatsResponse,

var totalSectors uint64

batchSize := 10000000
batchSize := 5000000
marker := uint64(0)
for offset := 0; ; offset += batchSize {
var result struct {
Expand Down Expand Up @@ -1321,6 +1321,70 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath
return
}

// DeleteHostSector removes every contract_sectors row that links the sector
// with the given root to a contract of host hk. When links were removed, the
// health of the slabs owning the affected sectors is marked invalid. If hk is
// recorded as the sector's latest host, latest_host is moved to the host of
// the last preloaded contract, or cleared when no contract remains.
//
// All statements run inside a single transaction executed through
// retryTransaction. ctx is accepted for interface compatibility but is not
// used by this function body.
func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error {
	return s.retryTransaction(func(tx *gorm.DB) error {
		// Fetch contract_sectors to delete.
		var links []dbContractSector
		err := tx.Raw(`
			SELECT contract_sectors.*
			FROM contract_sectors
			INNER JOIN sectors s ON s.id = contract_sectors.db_sector_id
			INNER JOIN contracts c ON c.id = contract_sectors.db_contract_id
			INNER JOIN hosts h ON h.id = c.host_id
			WHERE s.root = ? AND h.public_key = ?
			`, root[:], publicKey(hk)).
			Scan(&links).
			Error
		if err != nil {
			return fmt.Errorf("failed to fetch contract sectors for deletion: %w", err)
		}

		if len(links) > 0 {
			// Collect the distinct sector ids referenced by the links. The
			// loop variable is deliberately not named `s` so that it does not
			// shadow the method receiver.
			sectorIDs := make([]uint, 0, len(links))
			seen := make(map[uint]struct{}, len(links))
			for _, link := range links {
				if _, ok := seen[link.DBSectorID]; ok {
					continue
				}
				seen[link.DBSectorID] = struct{}{}
				sectorIDs = append(sectorIDs, link.DBSectorID)
			}

			// Update the affected slabs.
			err = tx.Exec("UPDATE slabs SET health_valid = 0 WHERE id IN (SELECT db_slab_id FROM sectors WHERE id IN (?))", sectorIDs).Error
			if err != nil {
				return fmt.Errorf("failed to invalidate slab health: %w", err)
			}

			// Delete contract_sectors.
			res := tx.Delete(&links)
			if err := res.Error; err != nil {
				return fmt.Errorf("failed to delete contract sectors: %w", err)
			} else if res.RowsAffected != int64(len(links)) {
				return fmt.Errorf("expected %v affected rows but got %v", len(links), res.RowsAffected)
			}
		}

		// Fetch the sector and update the latest_host field if the host for
		// which we remove the sector is the latest_host.
		var sector dbSector
		res := tx.Where("root", root[:]).
			Preload("Contracts.Host").
			Find(&sector)
		if err := res.Error; err != nil {
			return fmt.Errorf("failed to fetch sectors: %w", err)
		}

		// Find does not fail when no row matches; it leaves sector zero-valued.
		// Bail out so a zero-valued record is never compared against hk or
		// handed to Save.
		if res.RowsAffected == 0 {
			return nil
		}
		if sector.LatestHost != publicKey(hk) {
			return nil
		}

		if len(sector.Contracts) == 0 {
			sector.LatestHost = publicKey{} // no more hosts
		} else {
			// NOTE(review): this takes the last preloaded contract as the
			// "most recent" one; the preload specifies no explicit ordering.
			sector.LatestHost = sector.Contracts[len(sector.Contracts)-1].Host.PublicKey
		}

		// Save expects a pointer to the model.
		return tx.Save(&sector).Error
	})
}

func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, eTag, mimeType string, o object.Object, usedContracts map[types.PublicKey]types.FileContractID) error {
s.objectsMu.Lock()
defer s.objectsMu.Unlock()
Expand Down
80 changes: 80 additions & 0 deletions stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3658,3 +3658,83 @@ func TestListObjects(t *testing.T) {
}
}
}

// TestDeleteHostSector stores one sector on two contracts of each of two
// hosts, deletes the sector for the first host and then asserts that only
// that host's contract-sector links are gone, that the slab's health is no
// longer valid and that the sector's latest host is now the second host.
func TestDeleteHostSector(t *testing.T) {
	db, _, _, err := newTestSQLStore(t.TempDir())
	if err != nil {
		t.Fatal(err)
	}

	// add two hosts
	hks, err := db.addTestHosts(2)
	if err != nil {
		t.Fatal(err)
	}
	hk1, hk2 := hks[0], hks[1]

	// form two contracts with each of them
	if _, _, err = db.addTestContracts([]types.PublicKey{hk1, hk1, hk2, hk2}); err != nil {
		t.Fatal(err)
	}

	// load every contract
	var dbContracts []dbContract
	if err := db.db.Find(&dbContracts).Error; err != nil {
		t.Fatal(err)
	}

	// persist a healthy slab whose single sector is stored on all contracts,
	// with hk1 recorded as the latest host
	root := types.Hash256{1, 2, 3}
	shard := dbSector{
		Contracts:  dbContracts,
		Root:       root[:],
		LatestHost: publicKey(hk1),
	}
	slab := dbSlab{
		DBContractSetID: 1,
		Key:             []byte(object.GenerateEncryptionKey().String()),
		Health:          1.0,
		HealthValid:     true,
		TotalShards:     1,
		Shards:          []dbSector{shard},
	}
	if err := db.db.Create(&slab).Error; err != nil {
		t.Fatal(err)
	}

	// countLinks returns the number of contract-sector rows, failing the test
	// when the query errors
	countLinks := func() int64 {
		var n int64
		if err := db.db.Model(&dbContractSector{}).Count(&n).Error; err != nil {
			t.Fatal(err)
		}
		return n
	}

	// all four links must exist before pruning
	if n := countLinks(); n != 4 {
		t.Fatal("expected 4 contract-sector links", n)
	}

	// prune the sector from hk1
	if err := db.DeleteHostSector(context.Background(), hk1, root); err != nil {
		t.Fatal(err)
	}

	// only the two links of hk2 may remain
	if n := countLinks(); n != 2 {
		t.Fatal("expected 2 contract-sector links", n)
	}

	// the slab's health must be invalid and hk2 must be the latest host
	var s dbSlab
	if err := db.db.Preload("Shards").Take(&s).Error; err != nil {
		t.Fatal(err)
	}
	if s.HealthValid {
		t.Fatal("expected health to be invalid")
	}
	if latest := s.Shards[0].LatestHost; latest != publicKey(hk2) {
		t.Fatal("expected hk2 to be latest host", types.PublicKey(latest))
	}
}
31 changes: 26 additions & 5 deletions worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type (
downloadManager struct {
hp hostProvider
pss partialSlabStore
slm sectorLostMarker
logger *zap.SugaredLogger

maxOverdrive uint64
Expand Down Expand Up @@ -71,8 +72,13 @@ type (
numDownloads uint64
}

// sectorLostMarker is the dependency through which a slab download reports
// a sector that a host could not find: DeleteHostSector is called with the
// host's public key and the sector root when a download response carries a
// "sector not found" error.
sectorLostMarker interface {
DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error
}

slabDownload struct {
mgr *downloadManager
slm sectorLostMarker

dID id
sID slabID
Expand Down Expand Up @@ -119,6 +125,7 @@ type (
sectorDownloadResp struct {
overdrive bool
hk types.PublicKey
root types.Hash256
sectorIndex int
sector []byte
err error
Expand Down Expand Up @@ -148,13 +155,14 @@ func (w *worker) initDownloadManager(maxOverdrive uint64, overdriveTimeout time.
panic("download manager already initialized") // developer error
}

w.downloadManager = newDownloadManager(w, w, maxOverdrive, overdriveTimeout, logger)
w.downloadManager = newDownloadManager(w, w, w.bus, maxOverdrive, overdriveTimeout, logger)
}

func newDownloadManager(hp hostProvider, pss partialSlabStore, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *downloadManager {
func newDownloadManager(hp hostProvider, pss partialSlabStore, slm sectorLostMarker, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *downloadManager {
return &downloadManager{
hp: hp,
pss: pss,
slm: slm,
logger: logger,

maxOverdrive: maxOverdrive,
Expand Down Expand Up @@ -539,6 +547,7 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, dID id, slice o
// create slab download
return &slabDownload{
mgr: mgr,
slm: mgr.slm,

dID: dID,
sID: sID,
Expand Down Expand Up @@ -824,6 +833,7 @@ func (d *downloader) execute(req *sectorDownloadReq) (err error) {
func (req *sectorDownloadReq) succeed(sector []byte) {
req.resps.Add(&sectorDownloadResp{
hk: req.hk,
root: req.root,
overdrive: req.overdrive,
sectorIndex: req.sectorIndex,
sector: sector,
Expand All @@ -834,6 +844,7 @@ func (req *sectorDownloadReq) fail(err error) {
req.resps.Add(&sectorDownloadResp{
err: err,
hk: req.hk,
root: req.root,
overdrive: req.overdrive,
})
}
Expand Down Expand Up @@ -984,10 +995,12 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str
resetOverdrive := s.overdrive(ctx, resps)

// launch 'MinShard' requests
for i := 0; i < int(s.minShards); i++ {
for i := 0; i < int(s.minShards); {
req := s.nextRequest(ctx, resps, false)
if err := s.launch(req); err != nil {
return nil, errors.New("no hosts available")
if req == nil {
return nil, fmt.Errorf("no hosts available")
} else if err := s.launch(req); err == nil {
i++
}
}

Expand All @@ -1011,6 +1024,14 @@ func (s *slabDownload) downloadShards(ctx context.Context, nextSlabChan chan str
break
}

if isSectorNotFound(resp.err) {
if err := s.slm.DeleteHostSector(ctx, resp.hk, resp.root); err != nil {
s.mgr.logger.Errorw("failed to mark sector as lost", "hk", resp.hk, "root", resp.root, "err", err)
} else {
s.mgr.logger.Infow("successfully marked sector as lost", "hk", resp.hk, "root", resp.root)
}
}

done, next = s.receive(*resp)
if !done && resp.err != nil {
for {
Expand Down
15 changes: 9 additions & 6 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ var (

// errSectorNotFound is returned by the host when it cannot find the
// requested sector.
errSectorNotFound = errors.New("sector not found")
errSectorNotFoundOld = errors.New("could not find the desired sector")
errSectorNotFound = errors.New("sector not found")

// errWithdrawalsInactive occurs when the host is (perhaps temporarily)
// unsynced and has disabled its account manager.
Expand All @@ -83,11 +84,13 @@ func isBalanceMaxExceeded(err error) bool { return isError(err, errBalanceMaxEx
// isClosedStream reports whether err matches mux.ErrClosedStream or
// net.ErrClosed, as decided by isError.
func isClosedStream(err error) bool {
	if isError(err, mux.ErrClosedStream) {
		return true
	}
	return isError(err, net.ErrClosed)
}
func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) }
func isMaxRevisionReached(err error) bool { return isError(err, errMaxRevisionReached) }
func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) }
func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) }
func isSectorNotFound(err error) bool { return isError(err, errSectorNotFound) }
// isInsufficientFunds reports whether err matches ErrInsufficientFunds.
func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) }

// isMaxRevisionReached reports whether err matches errMaxRevisionReached.
func isMaxRevisionReached(err error) bool { return isError(err, errMaxRevisionReached) }

// isPriceTableExpired reports whether err matches errPriceTableExpired.
func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) }

// isPriceTableNotFound reports whether err matches errPriceTableNotFound.
func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) }
// isSectorNotFound reports whether err matches either of the two "sector not
// found" sentinels, errSectorNotFound or errSectorNotFoundOld.
func isSectorNotFound(err error) bool {
	if isError(err, errSectorNotFound) {
		return true
	}
	return isError(err, errSectorNotFoundOld)
}
// isWithdrawalsInactive reports whether err matches errWithdrawalsInactive.
func isWithdrawalsInactive(err error) bool { return isError(err, errWithdrawalsInactive) }

func isError(err error, target error) bool {
Expand Down
2 changes: 2 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ type Bus interface {
FetchPartialSlab(ctx context.Context, key object.EncryptionKey, offset, length uint32) ([]byte, error)
Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error)

DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error

MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab, usedContracts map[types.PublicKey]types.FileContractID) error
PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error)

Expand Down