Skip to content

Commit

Permalink
Migrate UpdateSlab to raw SQL (#1249)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed May 29, 2024
2 parents 95d7e07 + 9995a46 commit 155e688
Show file tree
Hide file tree
Showing 9 changed files with 521 additions and 195 deletions.
3 changes: 3 additions & 0 deletions internal/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const (
)

var (
ErrInvalidNumberOfShards = errors.New("slab has invalid number of shards")
ErrShardRootChanged = errors.New("shard root changed")

ErrRunV072 = errors.New("can't upgrade to >=v1.0.0 from your current version - please upgrade to v0.7.2 first (https://github.com/SiaFoundation/renterd/releases/tag/v0.7.2)")
ErrMySQLNoSuperPrivilege = errors.New("You do not have the SUPER privilege and binary logging is enabled")
)
Expand Down
16 changes: 14 additions & 2 deletions object/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,20 @@ func ContractsFromShards(shards []Sector) map[types.PublicKey]map[types.FileCont
return usedContracts
}

func (s Slab) Contracts() map[types.PublicKey]map[types.FileContractID]struct{} {
return ContractsFromShards(s.Shards)
func (s Slab) Contracts() []types.FileContractID {
var usedContracts []types.FileContractID
added := make(map[types.FileContractID]struct{})
for _, shard := range s.Shards {
for _, fcids := range shard.Contracts {
for _, fcid := range fcids {
if _, exists := added[fcid]; !exists {
usedContracts = append(usedContracts, fcid)
added[fcid] = struct{}{}
}
}
}
}
return usedContracts
}

// Length returns the length of the raw data stored in s.
Expand Down
112 changes: 3 additions & 109 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ var (
)

var (
errInvalidNumberOfShards = errors.New("slab has invalid number of shards")
errShardRootChanged = errors.New("shard root changed")

objectDeleteBatchSizes = []int64{10, 50, 100, 200, 500, 1000, 5000, 10000, 50000, 100000}
)

Expand Down Expand Up @@ -1391,7 +1388,7 @@ GROUP BY d.id
}

func (s *SQLStore) Object(ctx context.Context, bucket, path string) (obj api.Object, err error) {
err = s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
obj, err = s.object(tx, bucket, path)
return err
})
Expand Down Expand Up @@ -1913,112 +1910,9 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s
}
}

// extract the slab key
key, err := s.Key.MarshalBinary()
if err != nil {
return err
}

// collect all used contracts
usedContracts := s.Contracts()

// Update slab.
return ss.retryTransaction(ctx, func(tx *gorm.DB) (err error) {
// update slab
if err := tx.Model(&dbSlab{}).
Where("key", key).
Updates(map[string]interface{}{
"db_contract_set_id": gorm.Expr("(SELECT id FROM contract_sets WHERE name = ?)", contractSet),
"health_valid_until": time.Now().Unix(),
"health": 1,
}).
Error; err != nil {
return err
}

// find all used contracts
contracts, err := fetchUsedContracts(tx, usedContracts)
if err != nil {
return err
}

// find existing slab
var slab dbSlab
if err = tx.
Where(&dbSlab{Key: key}).
Preload("Shards").
Take(&slab).
Error; err == gorm.ErrRecordNotFound {
return fmt.Errorf("slab with key '%s' not found: %w", string(key), err)
} else if err != nil {
return err
}

// make sure the number of shards doesn't change.
// NOTE: check both the slice as well as the TotalShards field to be
// safe.
if len(s.Shards) != int(slab.TotalShards) {
return fmt.Errorf("%w: expected %v shards (TotalShards) but got %v", errInvalidNumberOfShards, slab.TotalShards, len(s.Shards))
} else if len(s.Shards) != len(slab.Shards) {
return fmt.Errorf("%w: expected %v shards (Shards) but got %v", errInvalidNumberOfShards, len(slab.Shards), len(s.Shards))
}

// make sure the roots stay the same.
for i, shard := range s.Shards {
if shard.Root != types.Hash256(slab.Shards[i].Root) {
return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root[:])
}
}

// prepare sectors to update
sectors := make([]dbSector, len(s.Shards))
for i := range s.Shards {
sectors[i] = dbSector{
DBSlabID: slab.ID,
SlabIndex: i + 1,
LatestHost: publicKey(s.Shards[i].LatestHost),
Root: s.Shards[i].Root[:],
}
}

// ensure the sectors exists
sectorIDs, err := upsertSectors(tx, sectors)
if err != nil {
return fmt.Errorf("failed to create sector: %w", err)
}

// build contract <-> sector links
var contractSectors []dbContractSector
for i, shard := range s.Shards {
sectorID := sectorIDs[i]

// ensure the associations are updated
for _, fcids := range shard.Contracts {
for _, fcid := range fcids {
if _, ok := contracts[fcid]; ok {
contractSectors = append(contractSectors, dbContractSector{
DBSectorID: sectorID,
DBContractID: contracts[fcid].ID,
})
}
}
}
}

// if there are no associations we are done
if len(contractSectors) == 0 {
return nil
}

// create associations
if err := tx.Table("contract_sectors").
Clauses(clause.OnConflict{
DoNothing: true,
}).
Create(&contractSectors).Error; err != nil {
return err
}
return nil
return ss.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
return tx.UpdateSlab(ctx, s, contractSet, s.Contracts())
})
}

Expand Down
5 changes: 3 additions & 2 deletions stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/config"
isql "go.sia.tech/renterd/internal/sql"
"go.sia.tech/renterd/internal/test"
"go.sia.tech/renterd/object"
sql "go.sia.tech/renterd/stores/sql"
Expand Down Expand Up @@ -3797,7 +3798,7 @@ func TestUpdateSlabSanityChecks(t *testing.T) {
if err := ss.UpdateSlab(context.Background(), object.Slab{
Key: slab.Key,
Shards: shards[:len(shards)-1],
}, testContractSet); !errors.Is(err, errInvalidNumberOfShards) {
}, testContractSet); !errors.Is(err, isql.ErrInvalidNumberOfShards) {
t.Fatal(err)
}

Expand All @@ -3811,7 +3812,7 @@ func TestUpdateSlabSanityChecks(t *testing.T) {
Key: slab.Key,
Shards: reversedShards,
}
if err := ss.UpdateSlab(context.Background(), reversedSlab, testContractSet); !errors.Is(err, errShardRootChanged) {
if err := ss.UpdateSlab(context.Background(), reversedSlab, testContractSet); !errors.Is(err, isql.ErrShardRootChanged) {
t.Fatal(err)
}
}
Expand Down
2 changes: 0 additions & 2 deletions stores/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,6 @@ func (s *SQLStore) retryTransaction(ctx context.Context, fc func(tx *gorm.DB) er
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, gorm.ErrRecordNotFound) ||
errors.Is(err, errInvalidNumberOfShards) ||
errors.Is(err, errShardRootChanged) ||
errors.Is(err, api.ErrContractNotFound) ||
errors.Is(err, api.ErrObjectNotFound) ||
errors.Is(err, api.ErrObjectCorrupted) ||
Expand Down
9 changes: 9 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"

"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
)
Expand Down Expand Up @@ -61,6 +62,14 @@ type (
// object already exists with the new prefix, `api.ErrObjectExists` is
// returned.
RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error

// UpdateSlab updates the slab in the database. That includes the following:
// - Optimistically set health to 100%
// - Invalidate health_valid_until
// - Update LatestHost for every shard
// The operation is not allowed to update the number of shards
// associated with a slab or the root/slabIndex of any shard.
UpdateSlab(ctx context.Context, s object.Slab, contractSet string, usedContracts []types.FileContractID) error
}

MetricsDatabase interface {
Expand Down
Loading

0 comments on commit 155e688

Please sign in to comment.