Skip to content

Commit

Permalink
Merge pull request #1009 from SiaFoundation/chris/refresh-health-lockup
Browse files Browse the repository at this point in the history
Fix lock contention in RefreshHealth
  • Loading branch information
ChrisSchinnerl committed Feb 29, 2024
2 parents f0cbd63 + d320125 commit 85141d3
Showing 1 changed file with 51 additions and 49 deletions.
100 changes: 51 additions & 49 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1845,13 +1845,19 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s

// Update slab.
return ss.retryTransaction(func(tx *gorm.DB) (err error) {
// fetch contract set
var cs dbContractSet
if err := tx.Take(&cs, "name = ?", contractSet).Error; err != nil {
// update slab
if err := tx.Model(&dbSlab{}).
Where("key", key).
Updates(map[string]interface{}{
"db_contract_set_id": gorm.Expr("(SELECT id FROM contract_sets WHERE name = ?)", contractSet),
"health_valid_until": time.Now().Unix(),
"health": 1,
}).
Error; err != nil {
return err
}

// find all contracts of that shard
// find all used contracts
contracts, err := fetchUsedContracts(tx, usedContracts)
if err != nil {
return err
Expand Down Expand Up @@ -1885,18 +1891,6 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s
}
}

// update fields
if err := tx.Model(&slab).
Where(&slab).
Updates(map[string]interface{}{
"db_contract_set_id": cs.ID,
"health_valid_until": time.Now().Unix(),
"health": 1,
}).
Error; err != nil {
return err
}

// prepare sectors to update
sectors := make([]dbSector, len(s.Shards))
for i := range s.Shards {
Expand Down Expand Up @@ -1961,8 +1955,8 @@ func (s *SQLStore) RefreshHealth(ctx context.Context) error {
// Update slab health in batches.
now := time.Now()

for {
healthQuery := s.db.Raw(`
// build health query
healthQuery := s.db.Raw(`
SELECT slabs.id, slabs.db_contract_set_id, CASE WHEN (slabs.min_shards = slabs.total_shards)
THEN
CASE WHEN (COUNT(DISTINCT(CASE WHEN cs.name IS NULL THEN NULL ELSE c.host_id END)) < slabs.min_shards)
Expand All @@ -1981,50 +1975,58 @@ WHERE slabs.health_valid_until <= ?
GROUP BY slabs.id
LIMIT ?
`, now.Unix(), refreshHealthBatchSize)

for {
var rowsAffected int64
err := s.retryTransaction(func(tx *gorm.DB) error {
// create temp table from the health query since we will reuse it
if err := tx.Exec("DROP TABLE IF EXISTS src").Error; err != nil {
return err
} else if err = tx.Exec("CREATE TEMPORARY TABLE src AS ?", healthQuery).Error; err != nil {
return err
} else if err = tx.Exec("CREATE INDEX src_id ON src (id)").Error; err != nil {
return err
}

var res *gorm.DB
if isSQLite(s.db) {
res = tx.Exec("UPDATE slabs SET health = src.health, health_valid_until = (?) FROM src WHERE slabs.id=src.id", sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity))
res = tx.Exec("UPDATE slabs SET health = inner.health, health_valid_until = (?) FROM (?) AS inner WHERE slabs.id=inner.id", sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity), healthQuery)
} else {
res = tx.Exec("UPDATE slabs sla INNER JOIN src h ON sla.id = h.id SET sla.health = h.health, health_valid_until = (?)", sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity))
res = tx.Exec("UPDATE slabs sla INNER JOIN (?) h ON sla.id = h.id SET sla.health = h.health, health_valid_until = (?)", healthQuery, sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity))
}
if res.Error != nil {
return res.Error
}
rowsAffected = res.RowsAffected

// Update the health of the objects associated with the updated slabs.
if isSQLite(s.db) {
return tx.Exec(`UPDATE objects SET health = i.health FROM (
SELECT slices.db_object_id, MIN(s.health) AS health
FROM slices
INNER JOIN src s ON s.id = slices.db_slab_id
INNER JOIN objects o ON o.id = slices.db_object_id
GROUP BY slices.db_object_id
) i
WHERE i.db_object_id = objects.id AND objects.health != i.health`).Error
// Update the health of objects with outdated health.
var err error
if isSQLite(tx) {
err = tx.Exec(`
UPDATE objects
SET health = (
SELECT MIN(slabs.health)
FROM slabs
INNER JOIN slices ON slices.db_slab_id = slabs.id
INNER JOIN objects ON slices.db_object_id = objects.id
)
WHERE EXISTS (
SELECT 1 FROM slabs
INNER JOIN slices ON slices.db_slab_id = slabs.id
INNER JOIN objects ON slices.db_object_id = objects.id
WHERE slabs.health < objects.health
)
`).Error
} else {
return tx.Exec(`UPDATE objects
INNER JOIN (
SELECT slices.db_object_id, MIN(s.health) as health
FROM slices
INNER JOIN src s ON s.id = slices.db_slab_id
GROUP BY slices.db_object_id
) i ON objects.id = i.db_object_id
SET objects.health = i.health
WHERE objects.health != i.health
`).Error
err = tx.Exec(`
UPDATE objects
JOIN (
SELECT slices.db_object_id, MIN(slabs.health) AS min_health
FROM slabs
INNER JOIN slices ON slices.db_slab_id = slabs.id
GROUP BY slices.db_object_id
) AS min_healths ON objects.id = min_healths.db_object_id
SET objects.health = min_healths.min_health
WHERE objects.health > (
SELECT MIN(slabs.health)
FROM slabs
INNER JOIN slices ON slices.db_slab_id = slabs.id
WHERE slices.db_object_id = objects.id
);
`).Error
}
return err
})
if err != nil {
return err
Expand Down

0 comments on commit 85141d3

Please sign in to comment.