Skip to content

Commit

Permalink
storage: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Nov 15, 2023
1 parent aa37cd4 commit 45b35e2
Showing 1 changed file with 79 additions and 86 deletions.
165 changes: 79 additions & 86 deletions host/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,6 @@ type (
}
)

// writeSector writes a sector to a volume. The location is assumed to be empty
// and locked.
func (vm *VolumeManager) writeSector(data *[rhp2.SectorSize]byte, loc SectorLocation, sync bool) error {
vm.mu.Lock()
vol, ok := vm.volumes[loc.Volume]
vm.mu.Unlock()
if !ok {
return fmt.Errorf("volume %v not found", loc.Volume)
}

if err := vol.WriteSector(data, loc.Index); err != nil {
return fmt.Errorf("failed to write sector data: %w", err)
}

if sync {
return vm.Sync()
}

vm.mu.Lock()
vm.changedVolumes[loc.Volume] = true
vm.mu.Unlock()
return nil
}

// loadVolumes opens all volumes. Volumes that are already loaded are skipped.
func (vm *VolumeManager) loadVolumes() error {
done, err := vm.tg.Add()
Expand Down Expand Up @@ -137,41 +113,35 @@ func (vm *VolumeManager) loadVolumes() error {
vm.volumes[vol.ID] = v
}

err := func() error {
if err := v.OpenVolume(vol.LocalPath, false); err != nil {
v.appendError(fmt.Errorf("failed to open volume: %w", err))
vm.log.Error("unable to open volume", zap.Error(err), zap.Int64("id", vol.ID), zap.String("path", vol.LocalPath))
// mark the volume as unavailable
if err := vm.vs.SetAvailable(vol.ID, false); err != nil {
return fmt.Errorf("failed to mark volume '%v' as unavailable: %w", vol.LocalPath, err)
}
if err := v.OpenVolume(vol.LocalPath, false); err != nil {
v.appendError(fmt.Errorf("failed to open volume: %w", err))
vm.log.Error("unable to open volume", zap.Error(err), zap.Int64("id", vol.ID), zap.String("path", vol.LocalPath))
// mark the volume as unavailable
if err := vm.vs.SetAvailable(vol.ID, false); err != nil {
return fmt.Errorf("failed to mark volume '%v' as unavailable: %w", vol.LocalPath, err)
}

// register an alert
vm.a.Register(alerts.Alert{
ID: frand.Entropy256(),
Severity: alerts.SeverityError,
Message: "Failed to open volume",
Data: map[string]any{
"volume": vol.LocalPath,
"error": err.Error(),
},
Timestamp: time.Now(),
})
// register an alert
vm.a.Register(alerts.Alert{
ID: frand.Entropy256(),
Severity: alerts.SeverityError,
Message: "Failed to open volume",
Data: map[string]any{
"volume": vol.LocalPath,
"error": err.Error(),
},
Timestamp: time.Now(),
})

return nil
}
// mark the volume as available
if err := vm.vs.SetAvailable(vol.ID, true); err != nil {
return fmt.Errorf("failed to mark volume '%v' as available: %w", vol.LocalPath, err)
} else if err := v.SetStatus(VolumeStatusReady); err != nil {
return fmt.Errorf("failed to set volume status: %w", err)
}
vm.log.Debug("loaded volume", zap.Int64("id", vol.ID), zap.String("path", vol.LocalPath))
return nil
}()
if err != nil {
return err
continue
}
// mark the volume as available
if err := vm.vs.SetAvailable(vol.ID, true); err != nil {
return fmt.Errorf("failed to mark volume '%v' as available: %w", vol.LocalPath, err)
} else if err := v.SetStatus(VolumeStatusReady); err != nil {
return fmt.Errorf("failed to set volume status: %w", err)
}
vm.log.Debug("loaded volume", zap.Int64("id", vol.ID), zap.String("path", vol.LocalPath))
}
return nil
}
Expand All @@ -192,13 +162,24 @@ func (vm *VolumeManager) migrateSector(loc SectorLocation, log *zap.Logger) erro
return fmt.Errorf("sector corrupt: %v != %v", loc.Root, root)
}

// write the sector to the new location
return vm.writeSector(sector, loc, true)
vm.mu.Lock()
vol, ok := vm.volumes[loc.Volume]
vm.mu.Unlock()
if !ok {
return fmt.Errorf("volume %v not found", loc.Volume)
}
// write the sector to the new location and sync the volume
if err := vol.WriteSector(sector, loc.Index); err != nil {
return fmt.Errorf("failed to write sector: %w", err)
} else if err := vol.Sync(); err != nil {
return fmt.Errorf("failed to sync volume: %w", err)
}
return nil
}

// growVolume grows a volume by adding sectors to the end of the volume.
func (vm *VolumeManager) growVolume(ctx context.Context, id int64, volume *volume, oldMaxSectors, newMaxSectors uint64) error {
log := vm.log.Named("grow").With(zap.Int64("volumeID", id))
log := vm.log.Named("grow").With(zap.Int64("volumeID", id), zap.Uint64("start", oldMaxSectors), zap.Uint64("end", newMaxSectors))
if oldMaxSectors > newMaxSectors { // sanity check
log.Panic("old sectors must be less than new sectors")
}
Expand Down Expand Up @@ -229,31 +210,24 @@ func (vm *VolumeManager) growVolume(ctx context.Context, id int64, volume *volum
default:
}

err := func() error {
target := current + resizeBatchSize
if target > newMaxSectors {
target = newMaxSectors
}

// truncate the file and add the indices to the volume store. resize is
// done in chunks to prevent holding a lock for too long and to allow
// progress tracking.
if err := volume.Resize(current, target); err != nil {
return fmt.Errorf("failed to expand volume data: %w", err)
} else if err := vm.vs.GrowVolume(id, target); err != nil {
return fmt.Errorf("failed to expand volume metadata: %w", err)
}

log.Debug("expanded volume", zap.Uint64("currentSectors", current), zap.Uint64("targetSectors", target))
target := current + resizeBatchSize
if target > newMaxSectors {
target = newMaxSectors
}

// update the alert
alert.Data["currentSectors"] = target
vm.a.Register(alert)
return nil
}()
if err != nil {
return err
// truncate the file and add the indices to the volume store. resize is
// done in chunks to prevent holding a lock for too long and to allow
// progress tracking.
if err := volume.Resize(current, target); err != nil {
return fmt.Errorf("failed to expand volume data: %w", err)
} else if err := vm.vs.GrowVolume(id, target); err != nil {
return fmt.Errorf("failed to expand volume metadata: %w", err)
}
log.Debug("expanded volume", zap.Uint64("current", current))

// update the alert
alert.Data["currentSectors"] = target
vm.a.Register(alert)
// sleep to allow other operations to run
time.Sleep(time.Millisecond)
}
Expand All @@ -262,7 +236,7 @@ func (vm *VolumeManager) growVolume(ctx context.Context, id int64, volume *volum

// shrinkVolume shrinks a volume by removing sectors from the end of the volume.
func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *volume, oldMaxSectors, newMaxSectors uint64) error {
log := vm.log.Named("shrinkVolume").With(zap.Int64("volumeID", id), zap.Uint64("oldMaxSectors", oldMaxSectors), zap.Uint64("newMaxSectors", newMaxSectors))
log := vm.log.Named("shrink").With(zap.Int64("volumeID", id), zap.Uint64("start", oldMaxSectors), zap.Uint64("end", newMaxSectors))
if oldMaxSectors <= newMaxSectors {
return errors.New("old sectors must be greater than new sectors")
}
Expand Down Expand Up @@ -332,7 +306,7 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol
return fmt.Errorf("failed to shrink volume data to %v sectors: %w", current, err)
}

log.Debug("shrunk volume", zap.Uint64("currentSectors", current), zap.Uint64("targetSectors", target))
log.Debug("shrunk volume", zap.Uint64("current", current))

current = target
// update the alert
Expand All @@ -344,6 +318,8 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol
return nil
}

// volumeStats returns the stats for a volume. A lock must be held on the volume
// manager before this function is called.
func (vm *VolumeManager) volumeStats(id int64) VolumeStats {
v, ok := vm.volumes[id]
if !ok {
Expand Down Expand Up @@ -892,7 +868,9 @@ func (vm *VolumeManager) Sync() error {
if err := vol.Sync(); err != nil {
return fmt.Errorf("failed to sync volume %v: %w", id, err)
}
vm.mu.Lock()
delete(vm.changedVolumes, id)
vm.mu.Unlock()
}
return nil
}
Expand All @@ -910,12 +888,27 @@ func (vm *VolumeManager) Write(root types.Hash256, data *[rhp2.SectorSize]byte)
return nil
}
start := time.Now()
if err := vm.writeSector(data, loc, false); err != nil {
return err

vm.mu.Lock()
vol, ok := vm.volumes[loc.Volume]
vm.mu.Unlock()
if !ok {
return fmt.Errorf("volume %v not found", loc.Volume)
}

// write the sector to the volume
if err := vol.WriteSector(data, loc.Index); err != nil {
return fmt.Errorf("failed to write sector data: %w", err)
}
vm.log.Debug("wrote sector", zap.String("root", root.String()), zap.Int64("volume", loc.Volume), zap.Uint64("index", loc.Index), zap.Duration("elapsed", time.Since(start)))

// Add newly written sector to cache
vm.cache.Add(root, data)

// mark the volume as changed
vm.mu.Lock()
vm.changedVolumes[loc.Volume] = true
vm.mu.Unlock()
return nil
})
if err == nil {
Expand Down

0 comments on commit 45b35e2

Please sign in to comment.