Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate UpdateSlab to raw SQL #1249

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const (
)

var (
ErrInvalidNumberOfShards = errors.New("slab has invalid number of shards")
ErrShardRootChanged = errors.New("shard root changed")

ErrRunV072 = errors.New("can't upgrade to >=v1.0.0 from your current version - please upgrade to v0.7.2 first (https://github.com/SiaFoundation/renterd/releases/tag/v0.7.2)")
ErrMySQLNoSuperPrivilege = errors.New("You do not have the SUPER privilege and binary logging is enabled")
)
Expand Down
16 changes: 14 additions & 2 deletions object/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,20 @@ func ContractsFromShards(shards []Sector) map[types.PublicKey]map[types.FileCont
return usedContracts
}

func (s Slab) Contracts() map[types.PublicKey]map[types.FileContractID]struct{} {
return ContractsFromShards(s.Shards)
func (s Slab) Contracts() []types.FileContractID {
var usedContracts []types.FileContractID
added := make(map[types.FileContractID]struct{})
for _, shard := range s.Shards {
for _, fcids := range shard.Contracts {
for _, fcid := range fcids {
if _, exists := added[fcid]; !exists {
usedContracts = append(usedContracts, fcid)
added[fcid] = struct{}{}
}
}
}
}
return usedContracts
}

// Length returns the length of the raw data stored in s.
Expand Down
112 changes: 3 additions & 109 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ var (
)

var (
errInvalidNumberOfShards = errors.New("slab has invalid number of shards")
errShardRootChanged = errors.New("shard root changed")

objectDeleteBatchSizes = []int64{10, 50, 100, 200, 500, 1000, 5000, 10000, 50000, 100000}
)

Expand Down Expand Up @@ -1391,7 +1388,7 @@ GROUP BY d.id
}

func (s *SQLStore) Object(ctx context.Context, bucket, path string) (obj api.Object, err error) {
err = s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
obj, err = s.object(tx, bucket, path)
return err
})
Expand Down Expand Up @@ -1913,112 +1910,9 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s
}
}

// extract the slab key
key, err := s.Key.MarshalBinary()
if err != nil {
return err
}

// collect all used contracts
usedContracts := s.Contracts()

// Update slab.
return ss.retryTransaction(ctx, func(tx *gorm.DB) (err error) {
// update slab
if err := tx.Model(&dbSlab{}).
Where("key", key).
Updates(map[string]interface{}{
"db_contract_set_id": gorm.Expr("(SELECT id FROM contract_sets WHERE name = ?)", contractSet),
"health_valid_until": time.Now().Unix(),
"health": 1,
}).
Error; err != nil {
return err
}

// find all used contracts
contracts, err := fetchUsedContracts(tx, usedContracts)
if err != nil {
return err
}

// find existing slab
var slab dbSlab
if err = tx.
Where(&dbSlab{Key: key}).
Preload("Shards").
Take(&slab).
Error; err == gorm.ErrRecordNotFound {
return fmt.Errorf("slab with key '%s' not found: %w", string(key), err)
} else if err != nil {
return err
}

// make sure the number of shards doesn't change.
// NOTE: check both the slice as well as the TotalShards field to be
// safe.
if len(s.Shards) != int(slab.TotalShards) {
return fmt.Errorf("%w: expected %v shards (TotalShards) but got %v", errInvalidNumberOfShards, slab.TotalShards, len(s.Shards))
} else if len(s.Shards) != len(slab.Shards) {
return fmt.Errorf("%w: expected %v shards (Shards) but got %v", errInvalidNumberOfShards, len(slab.Shards), len(s.Shards))
}

// make sure the roots stay the same.
for i, shard := range s.Shards {
if shard.Root != types.Hash256(slab.Shards[i].Root) {
return fmt.Errorf("%w: shard %v has changed root from %v to %v", errShardRootChanged, i, slab.Shards[i].Root, shard.Root[:])
}
}

// prepare sectors to update
sectors := make([]dbSector, len(s.Shards))
for i := range s.Shards {
sectors[i] = dbSector{
DBSlabID: slab.ID,
SlabIndex: i + 1,
LatestHost: publicKey(s.Shards[i].LatestHost),
Root: s.Shards[i].Root[:],
}
}

// ensure the sectors exists
sectorIDs, err := upsertSectors(tx, sectors)
if err != nil {
return fmt.Errorf("failed to create sector: %w", err)
}

// build contract <-> sector links
var contractSectors []dbContractSector
for i, shard := range s.Shards {
sectorID := sectorIDs[i]

// ensure the associations are updated
for _, fcids := range shard.Contracts {
for _, fcid := range fcids {
if _, ok := contracts[fcid]; ok {
contractSectors = append(contractSectors, dbContractSector{
DBSectorID: sectorID,
DBContractID: contracts[fcid].ID,
})
}
}
}
}

// if there are no associations we are done
if len(contractSectors) == 0 {
return nil
}

// create associations
if err := tx.Table("contract_sectors").
Clauses(clause.OnConflict{
DoNothing: true,
}).
Create(&contractSectors).Error; err != nil {
return err
}
return nil
return ss.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
return tx.UpdateSlab(ctx, s, contractSet, s.Contracts())
})
}

Expand Down
5 changes: 3 additions & 2 deletions stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/config"
isql "go.sia.tech/renterd/internal/sql"
"go.sia.tech/renterd/internal/test"
"go.sia.tech/renterd/object"
sql "go.sia.tech/renterd/stores/sql"
Expand Down Expand Up @@ -3797,7 +3798,7 @@ func TestUpdateSlabSanityChecks(t *testing.T) {
if err := ss.UpdateSlab(context.Background(), object.Slab{
Key: slab.Key,
Shards: shards[:len(shards)-1],
}, testContractSet); !errors.Is(err, errInvalidNumberOfShards) {
}, testContractSet); !errors.Is(err, isql.ErrInvalidNumberOfShards) {
t.Fatal(err)
}

Expand All @@ -3811,7 +3812,7 @@ func TestUpdateSlabSanityChecks(t *testing.T) {
Key: slab.Key,
Shards: reversedShards,
}
if err := ss.UpdateSlab(context.Background(), reversedSlab, testContractSet); !errors.Is(err, errShardRootChanged) {
if err := ss.UpdateSlab(context.Background(), reversedSlab, testContractSet); !errors.Is(err, isql.ErrShardRootChanged) {
t.Fatal(err)
}
}
Expand Down
2 changes: 0 additions & 2 deletions stores/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,6 @@ func (s *SQLStore) retryTransaction(ctx context.Context, fc func(tx *gorm.DB) er
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, gorm.ErrRecordNotFound) ||
errors.Is(err, errInvalidNumberOfShards) ||
errors.Is(err, errShardRootChanged) ||
errors.Is(err, api.ErrContractNotFound) ||
errors.Is(err, api.ErrObjectNotFound) ||
errors.Is(err, api.ErrObjectCorrupted) ||
Expand Down
9 changes: 9 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"

"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
)
Expand Down Expand Up @@ -61,6 +62,14 @@ type (
// object already exists with the new prefix, `api.ErrObjectExists` is
// returned.
RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error

// UpdateSlab updates the slab in the database. That includes the following:
// - Optimistically set health to 100%
// - Invalidate health_valid_until
// - Update LatestHost for every shard
// The operation is not allowed to update the number of shards
// associated with a slab or the root/slabIndex of any shard.
UpdateSlab(ctx context.Context, s object.Slab, contractSet string, usedContracts []types.FileContractID) error
}

MetricsDatabase interface {
Expand Down
Loading
Loading