Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create slabs and sectors in batches when inserting objects #879

Merged
merged 14 commits into from Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
232 changes: 155 additions & 77 deletions stores/metadata.go
Expand Up @@ -27,6 +27,9 @@ const (
// 10/30 erasure coding and takes <1s to execute on an SSD in SQLite.
refreshHealthBatchSize = 10000

sectorInsertionBatchSize = 500
sectorQueryBatchSize = 100

refreshHealthMinHealthValidity = 12 * time.Hour
refreshHealthMaxHealthValidity = 72 * time.Hour
)
Expand Down Expand Up @@ -1924,34 +1927,53 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s
return err
}

// loop updated shards
for i, shard := range s.Shards {
// ensure the sector exists
sector := dbSector{
// prepare sectors to update
sectors := make([]dbSector, len(s.Shards))
for i := range s.Shards {
sectors[i] = dbSector{
DBSlabID: slab.ID,
SlabIndex: i + 1,
LatestHost: publicKey(shard.LatestHost),
Root: shard.Root[:],
}
if err := createOrUpdateSector(tx, &sector); err != nil {
return fmt.Errorf("failed to create sector: %w", err)
LatestHost: publicKey(s.Shards[i].LatestHost),
Root: s.Shards[i].Root[:],
}
}

// ensure the sectors exists
sectors, err = upsertSectors(tx, sectors)
if err != nil {
return fmt.Errorf("failed to create sector: %w", err)
}

// build contract <-> sector links
var contractSectors []dbContractSector
for i, shard := range s.Shards {
sector := sectors[i]

// ensure the associations are updated
var associatedContracts []dbContract
for _, fcids := range shard.Contracts {
for _, fcid := range fcids {
if _, ok := contracts[fcid]; ok {
associatedContracts = append(associatedContracts, contracts[fcid])
contractSectors = append(contractSectors, dbContractSector{
DBSectorID: sector.ID,
DBContractID: contracts[fcid].ID,
})
}
}
}
if err := tx.
Model(&sector).
Association("Contracts").
Append(&associatedContracts); err != nil {
return err
}
}

// if there are no associations we are done
if len(contractSectors) == 0 {
return nil
}

// create associations
if err := tx.Table("contract_sectors").
Clauses(clause.OnConflict{
DoNothing: true,
}).
Create(&contractSectors).Error; err != nil {
return err
}
return nil
})
Expand Down Expand Up @@ -2128,79 +2150,126 @@ func (s *SQLStore) updateUserMetadata(tx *gorm.DB, objID uint, metadata api.Obje
func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractSetID uint, contracts map[types.FileContractID]dbContract, slices []object.SlabSlice) error {
if (objID == nil && multiPartID == nil) || (objID != nil && multiPartID != nil) {
return fmt.Errorf("either objID or multiPartID must be set")
} else if len(slices) == 0 {
return nil // nothing to do
}

for i, ss := range slices {
// create Slab if it doesn't exist yet
slabKey, err := ss.Key.MarshalBinary()
// build slabs
slabs := make([]dbSlab, len(slices))
for i := range slices {
slabKey, err := slices[i].Key.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal slab key: %w", err)
}
slab := &dbSlab{
slabs[i] = dbSlab{
Key: slabKey,
DBContractSetID: contractSetID,
MinShards: ss.MinShards,
TotalShards: uint8(len(ss.Shards)),
MinShards: slices[i].MinShards,
TotalShards: uint8(len(slices[i].Shards)),
}
err = tx.
Where(dbSlab{Key: slabKey}).
Clauses(clause.OnConflict{
DoNothing: true,
}).
Create(&slab).Error
if err != nil {
return fmt.Errorf("failed to create slab %v/%v: %w", i+1, len(slices), err)
} else if slab.DBContractSetID != contractSetID {
return fmt.Errorf("slab already exists in another contract set %v != %v", slab.DBContractSetID, contractSetID)
} else if slab.ID == 0 {
// if it already exists, fetch it
if err := tx.Where(dbSlab{Key: slabKey}).Take(&slab).Error; err != nil {
return fmt.Errorf("failed to fetch slab: %w", err)
}
}

// create slabs that don't exist yet
err := tx.
Clauses(clause.OnConflict{
DoNothing: true,
Columns: []clause.Column{{Name: "key"}},
}).
Create(&slabs).Error
if err != nil {
return fmt.Errorf("failed to create slabs %w", err)
}

// fetch the upserted slabs
for i := range slabs {
if err := tx.Raw("SELECT * FROM slabs WHERE `key` = ?", slabs[i].Key).
Scan(&slabs[i]).
Error; err != nil {
return fmt.Errorf("failed to fetch slab: %w", err)
} else if slabs[i].DBContractSetID != contractSetID {
return fmt.Errorf("slab already exists in another contract set %v != %v", slabs[i].DBContractSetID, contractSetID)
}
}

// Create Slice.
slice := dbSlice{
// build slices
dbSlices := make([]dbSlice, len(slices))
for i := range slices {
slab := slabs[i]
dbSlices[i] = dbSlice{
DBSlabID: slab.ID,
DBObjectID: objID,
ObjectIndex: uint(i + 1),
DBMultipartPartID: multiPartID,
Offset: ss.Offset,
Length: ss.Length,
}
err = tx.Create(&slice).Error
if err != nil {
return fmt.Errorf("failed to create slice %v/%v: %w", i+1, len(slices), err)
Offset: slices[i].Offset,
Length: slices[i].Length,
}
}

// if there are no slices we are done
if len(dbSlices) == 0 {
return nil
}

// create slices
err = tx.Create(&dbSlices).Error
if err != nil {
return fmt.Errorf("failed to create slice %w", err)
}

for j, shard := range ss.Shards {
sector := dbSector{
// build sectors
var sectors []dbSector
for i, ss := range slices {
slab := slabs[i]
for j := range ss.Shards {
sectors = append(sectors, dbSector{
DBSlabID: slab.ID,
SlabIndex: j + 1,
LatestHost: publicKey(shard.LatestHost),
Root: shard.Root[:],
}
// create sector if it doesn't exist yet
if err := createOrUpdateSector(tx, &sector); err != nil {
return fmt.Errorf("failed to create sector %v/%v: %w", j+1, len(ss.Shards), err)
}
// Add contract and host to join tables.
var associatedContracts []dbContract
LatestHost: publicKey(ss.Shards[j].LatestHost),
Root: ss.Shards[j].Root[:],
})
}
}

// create sector that don't exist yet
sectors, err = upsertSectors(tx, sectors)
if err != nil {
return fmt.Errorf("failed to create sectors: %w", err)
}

// build contract <-> sector links
sectorIdx := 0
var contractSectors []dbContractSector
for _, ss := range slices {
for _, shard := range ss.Shards {
sector := sectors[sectorIdx]
for _, fcids := range shard.Contracts {
for _, fcid := range fcids {
if _, ok := contracts[fcid]; ok {
associatedContracts = append(associatedContracts, contracts[fcid])
contractSectors = append(contractSectors, dbContractSector{
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
DBSectorID: sector.ID,
DBContractID: contracts[fcid].ID,
})
}
}
}
if err := tx.
Model(&sector).
Association("Contracts").
Append(&associatedContracts); err != nil {
return err
}
sectorIdx++
}
}

// if there are no associations we are done
if len(contractSectors) == 0 {
return nil
}

// create associations
if err := tx.
Table("contract_sectors").
Clauses(clause.OnConflict{
DoNothing: true,
}).
CreateInBatches(&contractSectors, sectorInsertionBatchSize).Error; err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -3011,26 +3080,35 @@ func validateSort(sortBy, sortDir string) error {
return nil
}

// createOrUpdateSector creates a sector or updates it if it exists already. The
// upsertSectors creates a sector or updates it if it exists already. The
// resulting ID is set on the input sector.
// NOTE: don't rely on any other fields of the returned sector than the ID
func createOrUpdateSector(tx *gorm.DB, sector *dbSector) error {
func upsertSectors(tx *gorm.DB, sectors []dbSector) ([]dbSector, error) {
if len(sectors) == 0 {
return nil, nil // nothing to do
}
err := tx.
Where(dbSector{Root: sector.Root[:]}).
Clauses(clause.OnConflict{
UpdateAll: true,
Columns: []clause.Column{{Name: "root"}},
}).
Create(&sector).
CreateInBatches(&sectors, sectorInsertionBatchSize).
Error
if err != nil {
return err
} else if sector.ID == 0 {
// if it already exists, fetch it - this fallback is needed for MySQL
// since it doesn't support returning the ID on conflict
if err := tx.Where(dbSector{Root: sector.Root[:]}).Take(&sector).Error; err != nil {
return err
}
return nil, err
}
return nil
// fetch the upserted sectors
roots := make([][]byte, len(sectors))
for i, sector := range sectors {
roots[i] = sector.Root[:]
}
sectors = sectors[:0]

var batch []dbSector
if err := tx.Where("root IN (?)", roots).FindInBatches(&batch, sectorQueryBatchSize, func(tx *gorm.DB, _ int) error {
sectors = append(sectors, batch...)
return nil
}).Error; err != nil {
return nil, err
}
return sectors, nil
}
24 changes: 18 additions & 6 deletions stores/metrics.go
Expand Up @@ -245,7 +245,9 @@ func (s *SQLStore) RecordContractMetric(ctx context.Context, metrics ...api.Cont
ListSpendingHi: unsigned64(metric.ListSpending.Hi),
}
}
return s.dbMetrics.Create(&dbMetrics).Error
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
})
}

func (s *SQLStore) RecordContractPruneMetric(ctx context.Context, metrics ...api.ContractPruneMetric) error {
Expand All @@ -263,7 +265,9 @@ func (s *SQLStore) RecordContractPruneMetric(ctx context.Context, metrics ...api
Duration: metric.Duration,
}
}
return s.dbMetrics.Create(&dbMetrics).Error
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
})
}

func (s *SQLStore) RecordContractSetChurnMetric(ctx context.Context, metrics ...api.ContractSetChurnMetric) error {
Expand All @@ -277,7 +281,9 @@ func (s *SQLStore) RecordContractSetChurnMetric(ctx context.Context, metrics ...
Timestamp: unixTimeMS(metric.Timestamp),
}
}
return s.dbMetrics.Create(&dbMetrics).Error
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
})
}

func (s *SQLStore) RecordContractSetMetric(ctx context.Context, metrics ...api.ContractSetMetric) error {
Expand All @@ -289,7 +295,9 @@ func (s *SQLStore) RecordContractSetMetric(ctx context.Context, metrics ...api.C
Timestamp: unixTimeMS(metric.Timestamp),
}
}
return s.dbMetrics.Create(&dbMetrics).Error
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
})
}

func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.WalletMetric) error {
Expand All @@ -305,7 +313,9 @@ func (s *SQLStore) RecordWalletMetric(ctx context.Context, metrics ...api.Wallet
UnconfirmedHi: unsigned64(metric.Unconfirmed.Hi),
}
}
return s.dbMetrics.Create(&dbMetrics).Error
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
})
}

func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.PerformanceMetric) error {
Expand All @@ -319,7 +329,9 @@ func (s *SQLStore) RecordPerformanceMetric(ctx context.Context, metrics ...api.P
Timestamp: unixTimeMS(metric.Timestamp),
}
}
return s.dbMetrics.Create(dbMetrics).Error
return s.dbMetrics.Transaction(func(tx *gorm.DB) error {
return tx.Create(&dbMetrics).Error
})
}

func (s *SQLStore) WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) {
Expand Down
4 changes: 3 additions & 1 deletion stores/sql.go
Expand Up @@ -179,7 +179,9 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) {
return nil, modules.ConsensusChangeID{}, fmt.Errorf("failed to create partial slab dir: %v", err)
}
db, err := gorm.Open(cfg.Conn, &gorm.Config{
Logger: cfg.GormLogger, // custom logger
Logger: cfg.GormLogger, // custom logger
SkipDefaultTransaction: true,
DisableNestedTransaction: true,
})
if err != nil {
return nil, modules.ConsensusChangeID{}, fmt.Errorf("failed to open SQL db")
Expand Down