Merge pull request #786 from SiaFoundation/chris/refactor-buffered-slabs
Reduce database interactions when adding partial slab
ChrisSchinnerl committed Dec 6, 2023
2 parents 278b237 + 82d03b4 commit 199c12a
Showing 4 changed files with 50 additions and 91 deletions.
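A note on the approach (not part of the commit itself): with the size and complete columns gone from the buffered-slab record, AddPartialSlab no longer has to write back to the database after every append. A buffer's size is instead re-derived on startup from the slices stored in it, and completeness is decided in memory against the completion threshold. The standalone Go sketch below illustrates that bookkeeping; the names slice, bufferSize and isComplete are illustrative only, not renterd's API, and the real queries and checks appear in the stores/slabbuffer.go hunks further down.

// Standalone sketch of the bookkeeping this commit moves out of the database.
package main

import "fmt"

type slice struct {
	offset, length int64
}

// bufferSize derives a buffer's size the same way the new startup query does:
// COALESCE(MAX(offset+length), 0) over every slice stored in the buffer.
func bufferSize(slices []slice) (size int64) {
	for _, s := range slices {
		if end := s.offset + s.length; end > size {
			size = end
		}
	}
	return
}

// isComplete replaces the dropped `complete` column: a buffer counts as
// complete once it is within completionThreshold bytes of its maximum size.
func isComplete(size, maxSize, completionThreshold int64) bool {
	return size+completionThreshold >= maxSize
}

func main() {
	slices := []slice{{offset: 0, length: 1 << 20}, {offset: 1 << 20, length: 3 << 20}}
	size := bufferSize(slices)
	fmt.Println(size, isComplete(size, 4<<20, 1<<20)) // 4194304 true
}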
stores/metadata.go: 2 changes (0 additions, 2 deletions)
@@ -156,9 +156,7 @@ type (
 
 		DBSlab dbSlab
 
-		Complete bool `gorm:"index"`
 		Filename string
-		Size     int64
 	}
 
 	dbSector struct {
stores/metadata_test.go: 13 changes (11 additions, 2 deletions)
@@ -2773,11 +2773,20 @@ func TestPartialSlab(t *testing.T) {
 	}
 
 	// Add 2 more partial slabs.
-	_, _, err = ss.AddPartialSlab(ctx, frand.Bytes(rhpv2.SectorSize/2), 1, 2, testContractSet)
+	slices1, _, err := ss.AddPartialSlab(ctx, frand.Bytes(rhpv2.SectorSize/2), 1, 2, testContractSet)
 	if err != nil {
 		t.Fatal(err)
 	}
-	_, bufferSize, err = ss.AddPartialSlab(ctx, frand.Bytes(rhpv2.SectorSize/2), 1, 2, testContractSet)
+	slices2, bufferSize, err := ss.AddPartialSlab(ctx, frand.Bytes(rhpv2.SectorSize/2), 1, 2, testContractSet)
 	if err != nil {
 		t.Fatal(err)
 	}
+
+	// Associate them with an object.
+	err = ss.UpdateObject(context.Background(), api.DefaultBucketName, "foo", testContractSet, "", "", object.Object{
+		Key:   object.GenerateEncryptionKey(),
+		Slabs: append(slices1, slices2...),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
stores/migrations.go: 22 changes (22 additions, 0 deletions)
@@ -305,6 +305,12 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error {
 				return performMigration00034_objectHealth(tx, logger)
 			},
 		},
+		{
+			ID: "00035_bufferedSlabsDropSizeAndComplete",
+			Migrate: func(tx *gorm.DB) error {
+				return performMigration00035_bufferedSlabsDropSizeAndComplete(tx, logger)
+			},
+		},
 	}
 	// Create migrator.
 	m := gormigrate.New(db, gormigrate.DefaultOptions, migrations)
@@ -1377,3 +1383,19 @@ func performMigration00034_objectHealth(txn *gorm.DB, logger *zap.SugaredLogger)
 	logger.Info("migration 00034_objectHealth complete")
 	return nil
 }
+
+func performMigration00035_bufferedSlabsDropSizeAndComplete(txn *gorm.DB, logger *zap.SugaredLogger) error {
+	logger.Info("performing migration 00035_bufferedSlabsDropSizeAndComplete")
+	if txn.Migrator().HasColumn(&dbBufferedSlab{}, "size") {
+		if err := txn.Migrator().DropColumn(&dbBufferedSlab{}, "size"); err != nil {
+			return err
+		}
+	}
+	if txn.Migrator().HasColumn(&dbBufferedSlab{}, "complete") {
+		if err := txn.Migrator().DropColumn(&dbBufferedSlab{}, "complete"); err != nil {
+			return err
+		}
+	}
+	logger.Info("migration 00035_bufferedSlabsDropSizeAndComplete complete")
+	return nil
+}
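For reference, the two Migrator().DropColumn calls above amount to plain DDL. Below is a hedged sketch of the same migration written as raw SQL through GORM; it is not part of the commit, the table name buffered_slabs is an assumption derived from the dbBufferedSlab model, and older SQLite builds lack ALTER TABLE ... DROP COLUMN entirely, so whether the raw form works depends on the SQLite version.

package stores // assumption: this would sit next to the migration above

import "gorm.io/gorm"

// dropBufferedSlabColumnsRaw is a hedged sketch, not part of the commit: the
// rough raw-SQL equivalent of the two DropColumn calls in
// performMigration00035_bufferedSlabsDropSizeAndComplete. The table name
// "buffered_slabs" is an assumption, not taken from the diff.
func dropBufferedSlabColumnsRaw(tx *gorm.DB) error {
	for _, stmt := range []string{
		"ALTER TABLE buffered_slabs DROP COLUMN size",
		"ALTER TABLE buffered_slabs DROP COLUMN complete",
	} {
		if err := tx.Exec(stmt).Error; err != nil {
			return err
		}
	}
	return nil
}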
stores/slabbuffer.go: 104 changes (17 additions, 87 deletions)
@@ -26,13 +26,10 @@ type SlabBuffer struct {
 	slabKey object.EncryptionKey
 	maxSize int64
 
-	dbMu sync.Mutex
-
 	mu          sync.Mutex
 	file        *os.File
 	lockedUntil time.Time
 	size        int64
-	dbSize      int64
 	syncErr     error
 }
 
@@ -104,19 +101,29 @@ func newSlabBufferManager(sqlStore *SQLStore, slabBufferCompletionThreshold int6
 			sqlStore.logger.Errorf("failed to open buffer file %v for slab %v: %v", buffer.Filename, buffer.DBSlab.Key, err)
 			continue
 		}
+		// Get the size of the buffer by looking at all slices using it
+		var size int64
+		err = sqlStore.db.Model(&dbSlab{}).
+			Joins("INNER JOIN slices sli ON slabs.id = sli.db_slab_id").
+			Select("COALESCE(MAX(offset+length), 0) as Size").
+			Where("slabs.db_buffered_slab_id = ?", buffer.ID).
+			Scan(&size).
+			Error
+		if err != nil {
+			return nil, err
+		}
 		// Create the slab buffer.
 		sb := &SlabBuffer{
 			dbID:     buffer.ID,
 			filename: buffer.Filename,
 			slabKey:  ec,
 			maxSize:  int64(bufferedSlabSize(buffer.DBSlab.MinShards)),
 			file:     file,
-			dbSize:   buffer.Size,
-			size:     buffer.Size,
+			size:     size,
 		}
 		// Add the buffer to the manager.
 		gid := bufferGID(buffer.DBSlab.MinShards, buffer.DBSlab.TotalShards, uint32(buffer.DBSlab.DBContractSetID))
-		if buffer.Complete {
+		if size >= int64(sb.maxSize-slabBufferCompletionThreshold) {
 			mgr.completeBuffers[gid] = append(mgr.completeBuffers[gid], sb)
 		} else {
 			mgr.incompleteBuffers[gid] = append(mgr.incompleteBuffers[gid], sb)
@@ -219,80 +226,19 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m
 	}
 
 	// Commit all used buffers to disk.
-	type dbUpdate struct {
-		complete bool
-		syncSize int64
-		buffer   *SlabBuffer
-	}
-	var dbUpdates []dbUpdate
 	for _, buffer := range usedBuffers {
-		syncSize, complete, err := buffer.commitAppend(mgr.bufferedSlabCompletionThreshold)
+		complete, err := buffer.commitAppend(mgr.bufferedSlabCompletionThreshold)
 		if err != nil {
 			return nil, 0, err
 		}
 		// Move the buffer from incomplete to complete if it is now complete.
 		if complete {
 			mgr.markBufferComplete(buffer, gid)
 		}
-		// Remember to update the db with the new size if necessary.
-		dbUpdates = append(dbUpdates, dbUpdate{
-			buffer:   buffer,
-			complete: complete,
-			syncSize: syncSize,
-		})
 	}
 
-	// Update size field in db. Since multiple threads might be trying to do
-	// this, the operation is associative.
-	for _, update := range dbUpdates {
-		err = func() error {
-			// Make sure only one thread can update the entry for a buffer at a
-			// time.
-			update.buffer.dbMu.Lock()
-			defer update.buffer.dbMu.Unlock()
-
-			// Since the order in which threads arrive here is not deterministic
-			// there is a chance that we don't need to perform this update
-			// because a larger size was already written to the db.
-			if !update.buffer.requiresDBUpdate() {
-				return nil
-			}
-			err = mgr.s.retryTransaction(func(tx *gorm.DB) error {
-				return update.buffer.saveSize(tx, update.syncSize, update.complete)
-			})
-			if err != nil {
-				return err
-			}
-			// Update the dbSize field in the buffer to the size we just wrote to the
-			// db. This also needs to be associative.
-			update.buffer.updateDBSize(update.syncSize)
-			return nil
-		}()
-		if err != nil {
-			return nil, 0, fmt.Errorf("failed to update size/complete in db: %w", err)
-		}
-	}
-
 	return slabs, mgr.BufferSize(gid), nil
 }
 
-func (b *SlabBuffer) saveSize(tx *gorm.DB, size int64, complete bool) error {
-	maxOp := "GREATEST"
-	if isSQLite(tx) {
-		maxOp = "MAX"
-	}
-	if err := tx.Model(&dbBufferedSlab{}).
-		Where("id", b.dbID).
-		Updates(map[string]interface{}{
-			"complete": gorm.Expr("complete OR ?", complete),
-			"size":     gorm.Expr(maxOp+"(size, ?)", size),
-		}).
-		Error; err != nil {
-		return fmt.Errorf("failed to update buffered slab %v in database: %v", b.dbID, err)
-	}
-	return nil
-}
-
 func (mgr *SlabBufferManager) BufferSize(gid bufferGroupID) (total int64) {
 	mgr.mu.Lock()
 	defer mgr.mu.Unlock()
@@ -467,13 +413,13 @@ func (buf *SlabBuffer) recordAppend(data []byte, mustFit bool, minShards uint8)
 	}
 }
 
-func (buf *SlabBuffer) commitAppend(completionThreshold int64) (int64, bool, error) {
+func (buf *SlabBuffer) commitAppend(completionThreshold int64) (bool, error) {
 	// Fetch the current size first. We know that we have at least synced the
 	// buffer up to this point upon success.
 	buf.mu.Lock()
 	if buf.syncErr != nil {
 		buf.mu.Unlock()
-		return 0, false, buf.syncErr
+		return false, buf.syncErr
 	}
 	syncSize := buf.size
 	buf.mu.Unlock()
@@ -484,21 +430,7 @@ func (buf *SlabBuffer) commitAppend(completionThreshold int64) (int64, bool, err
 	buf.mu.Lock()
 	defer buf.mu.Unlock()
 	buf.syncErr = err
-	return syncSize, syncSize >= buf.maxSize-completionThreshold, err
-}
-
-func (buf *SlabBuffer) requiresDBUpdate() bool {
-	buf.mu.Lock()
-	defer buf.mu.Unlock()
-	return buf.size > buf.dbSize
-}
-
-func (buf *SlabBuffer) updateDBSize(size int64) {
-	buf.mu.Lock()
-	if size > buf.dbSize {
-		buf.dbSize = size
-	}
-	buf.mu.Unlock()
+	return syncSize+completionThreshold >= buf.maxSize, err
 }
 
 func (mgr *SlabBufferManager) markBufferComplete(buffer *SlabBuffer, gid bufferGroupID) {
@@ -543,8 +475,6 @@ func createSlabBuffer(tx *gorm.DB, contractSetID uint, dir string, minShards, to
 			MinShards:   minShards,
 			TotalShards: totalShards,
 		},
-		Complete: false,
-		Size:     0,
 		Filename: fileName,
 	}
 	err = tx.Create(&createdSlab).
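Since the persisted size is gone, newSlabBufferManager (first slabbuffer.go hunk above) recovers each buffer's size at startup from the slices that reference it. As an aside, here is the same query written through gorm's Raw API as a hedged sketch; the table and column names are copied from the query strings in the diff and are not meant as a schema reference.

package stores // assumption: this would sit next to newSlabBufferManager

import "gorm.io/gorm"

// bufferSizeFromSlices is an illustrative rewrite (not part of the commit) of
// the size-recovery query added above: a buffered slab's size is the largest
// offset+length of any slice stored in it, or 0 if nothing references it yet.
func bufferSizeFromSlices(db *gorm.DB, bufferedSlabID uint) (int64, error) {
	var size int64
	err := db.Raw(`
		SELECT COALESCE(MAX(sli.offset + sli.length), 0)
		FROM slabs
		INNER JOIN slices sli ON slabs.id = sli.db_slab_id
		WHERE slabs.db_buffered_slab_id = ?`, bufferedSlabID).
		Scan(&size).Error
	return size, err
}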
