diff --git a/api/endpoints.go b/api/endpoints.go
index 40054bf0..e97118b6 100644
--- a/api/endpoints.go
+++ b/api/endpoints.go
@@ -284,7 +284,7 @@ func (a *api) handleGETContract(c jape.Context) {
 }
 
 func (a *api) handleGETVolume(c jape.Context) {
-	var id int
+	var id int64
 	if err := c.DecodeParam("id", &id); err != nil {
 		return
 	} else if id < 0 {
@@ -292,7 +292,7 @@ func (a *api) handleGETVolume(c jape.Context) {
 		return
 	}
 
-	volume, err := a.volumes.Volume(int64(id))
+	volume, err := a.volumes.Volume(id)
 	if errors.Is(err, storage.ErrVolumeNotFound) {
 		c.Error(err, http.StatusNotFound)
 		return
@@ -303,7 +303,7 @@ func (a *api) handleGETVolume(c jape.Context) {
 }
 
 func (a *api) handlePUTVolume(c jape.Context) {
-	var id int
+	var id int64
 	if err := c.DecodeParam("id", &id); err != nil {
 		return
 	} else if id < 0 {
@@ -316,7 +316,7 @@ func (a *api) handlePUTVolume(c jape.Context) {
 		return
 	}
 
-	err := a.volumes.SetReadOnly(int64(id), req.ReadOnly)
+	err := a.volumes.SetReadOnly(id, req.ReadOnly)
 	if errors.Is(err, storage.ErrVolumeNotFound) {
 		c.Error(err, http.StatusNotFound)
 		return
diff --git a/api/volumes.go b/api/volumes.go
index f7957bb5..2f40be4d 100644
--- a/api/volumes.go
+++ b/api/volumes.go
@@ -153,7 +153,7 @@ func (a *api) handlePOSTVolume(c jape.Context) {
 }
 
 func (a *api) handleDeleteVolume(c jape.Context) {
-	var id int
+	var id int64
 	var force bool
 	if err := c.DecodeParam("id", &id); err != nil {
 		return
@@ -163,12 +163,12 @@ func (a *api) handleDeleteVolume(c jape.Context) {
 	} else if err := c.DecodeForm("force", &force); err != nil {
 		return
 	}
-	err := a.volumeJobs.RemoveVolume(int64(id), force)
+	err := a.volumeJobs.RemoveVolume(id, force)
 	a.checkServerError(c, "failed to remove volume", err)
 }
 
 func (a *api) handlePUTVolumeResize(c jape.Context) {
-	var id int
+	var id int64
 	if err := c.DecodeParam("id", &id); err != nil {
 		return
 	} else if id < 0 {
@@ -181,12 +181,12 @@ func (a *api) handlePUTVolumeResize(c jape.Context) {
 		return
 	}
 
-	err := a.volumeJobs.ResizeVolume(int64(id), req.MaxSectors)
+	err := a.volumeJobs.ResizeVolume(id, req.MaxSectors)
 	a.checkServerError(c, "failed to resize volume", err)
 }
 
 func (a *api) handleDELETEVolumeCancelOp(c jape.Context) {
-	var id int
+	var id int64
 	if err := c.DecodeParam("id", &id); err != nil {
 		return
 	} else if id < 0 {
@@ -194,6 +194,6 @@ func (a *api) handleDELETEVolumeCancelOp(c jape.Context) {
 		return
 	}
 
-	err := a.volumeJobs.Cancel(int64(id))
+	err := a.volumeJobs.Cancel(id)
 	a.checkServerError(c, "failed to cancel operation", err)
 }
diff --git a/go.mod b/go.mod
index 5708e9cc..b40f1b06 100644
--- a/go.mod
+++ b/go.mod
@@ -9,7 +9,7 @@ require (
 	github.com/mattn/go-sqlite3 v1.14.17
 	gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe
 	go.sia.tech/core v0.1.12-0.20231011160830-b58e9e8ec3ce
-	go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb
+	go.sia.tech/jape v0.10.0
 	go.sia.tech/renterd v0.6.0
 	go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca
 	go.sia.tech/web/hostd v0.26.0
diff --git a/go.sum b/go.sum
index e2883a1d..4801d46e 100644
--- a/go.sum
+++ b/go.sum
@@ -228,6 +228,8 @@ go.sia.tech/core v0.1.12-0.20231011160830-b58e9e8ec3ce h1:rRji+HxWtZEyXAyc/LSqS4
 go.sia.tech/core v0.1.12-0.20231011160830-b58e9e8ec3ce/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q=
 go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb h1:yLDEqkqC19E/HgBoq2Uhw9oH3SMNRyeRjZ7Ep4dPKR8=
 go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4=
+go.sia.tech/jape v0.10.0 h1:wsIURirNV29fvqxhvvbd0yhKh+9JeNZvz4haJUL/+yI=
+go.sia.tech/jape v0.10.0/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4=
 go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU=
 go.sia.tech/mux v1.2.0/go.mod h1:Yyo6wZelOYTyvrHmJZ6aQfRoer3o4xyKQ4NmQLJrBSo=
 go.sia.tech/renterd v0.6.0 h1:eEWftx9xOvYVeGCejmyTopxKAql+KvnHxh1kMk5GEAo=
diff --git a/host/storage/consts_default.go b/host/storage/consts_default.go
index e76a8243..7b9e0bc2 100644
--- a/host/storage/consts_default.go
+++ b/host/storage/consts_default.go
@@ -5,5 +5,7 @@ package storage
 import "time"
 
 const (
+	resizeBatchSize = 64 // 256 MiB
+
 	cleanupInterval = 15 * time.Minute
 )
diff --git a/host/storage/consts_testing.go b/host/storage/consts_testing.go
index 6e2460fc..18360e44 100644
--- a/host/storage/consts_testing.go
+++ b/host/storage/consts_testing.go
@@ -4,4 +4,6 @@ package storage
 
 const (
 	cleanupInterval = 0
+
+	resizeBatchSize = 4 // 16 MiB
 )
diff --git a/host/storage/recorder.go b/host/storage/recorder.go
index d3b66ce1..cc417ade 100644
--- a/host/storage/recorder.go
+++ b/host/storage/recorder.go
@@ -7,7 +7,7 @@ import (
 	"go.uber.org/zap"
 )
 
-const flushInterval = 30 * time.Second
+const flushInterval = time.Minute
 
 type (
 	sectorAccessRecorder struct {
diff --git a/host/storage/storage.go b/host/storage/storage.go
index 9172984e..7068a92d 100644
--- a/host/storage/storage.go
+++ b/host/storage/storage.go
@@ -21,8 +21,6 @@ import (
 )
 
 const (
-	resizeBatchSize = 64 // 256 MiB
-
 	// MaxTempSectorBlocks is the maximum number of blocks that a temp sector
 	// can be stored for.
 	MaxTempSectorBlocks = 144 * 7 // 7 days
@@ -88,64 +86,6 @@ type (
 	}
 )
 
-// getVolume returns the volume with the given ID, or an error if the volume does
-// not exist or is currently busy.
-func (vm *VolumeManager) getVolume(v int64) (*volume, error) {
-	vm.mu.Lock()
-	defer vm.mu.Unlock()
-	vol, ok := vm.volumes[v]
-	if !ok {
-		return nil, fmt.Errorf("volume %v not found", v)
-	}
-	return vol, nil
-}
-
-// lockVolume locks a volume for operations until release is called. A locked
-// volume cannot have its size or status changed and no new sectors can be
-// written to it.
-func (vm *VolumeManager) lockVolume(id int64) (func(), error) {
-	vm.mu.Lock()
-	defer vm.mu.Unlock()
-	v, ok := vm.volumes[id]
-	if !ok {
-		return nil, fmt.Errorf("volume %v not found", id)
-	} else if v.busy {
-		return nil, fmt.Errorf("volume %v is busy", id)
-	}
-	v.busy = true
-	var once sync.Once
-	return func() {
-		once.Do(func() {
-			vm.mu.Lock()
-			v, ok := vm.volumes[id]
-			if ok {
-				v.busy = false
-			}
-			vm.mu.Unlock()
-		})
-	}, nil
-}
-
-// writeSector writes a sector to a volume. The location is assumed to be empty
-// and locked.
-func (vm *VolumeManager) writeSector(data *[rhp2.SectorSize]byte, loc SectorLocation, sync bool) error {
-	vol, err := vm.getVolume(loc.Volume)
-	if err != nil {
-		return fmt.Errorf("failed to get volume: %w", err)
-	} else if err := vol.WriteSector(data, loc.Index); err != nil {
-		return fmt.Errorf("failed to write sector data: %w", err)
-	}
-
-	if sync {
-		return vm.Sync()
-	}
-
-	vm.mu.Lock()
-	vm.changedVolumes[loc.Volume] = true
-	vm.mu.Unlock()
-	return nil
-}
-
 // loadVolumes opens all volumes. Volumes that are already loaded are skipped.
 func (vm *VolumeManager) loadVolumes() error {
 	done, err := vm.tg.Add()
@@ -198,8 +138,9 @@ func (vm *VolumeManager) loadVolumes() error {
 		// mark the volume as available
 		if err := vm.vs.SetAvailable(vol.ID, true); err != nil {
 			return fmt.Errorf("failed to mark volume '%v' as available: %w", vol.LocalPath, err)
+		} else if err := v.SetStatus(VolumeStatusReady); err != nil {
+			return fmt.Errorf("failed to set volume status: %w", err)
 		}
-		v.SetStatus(VolumeStatusReady)
 		vm.log.Debug("loaded volume", zap.Int64("id", vol.ID), zap.String("path", vol.LocalPath))
 	}
 	return nil
@@ -221,14 +162,26 @@ func (vm *VolumeManager) migrateSector(loc SectorLocation, log *zap.Logger) erro
 		return fmt.Errorf("sector corrupt: %v != %v", loc.Root, root)
 	}
 
-	// write the sector to the new location
-	return vm.writeSector(sector, loc, true)
+	vm.mu.Lock()
+	vol, ok := vm.volumes[loc.Volume]
+	vm.mu.Unlock()
+	if !ok {
+		return fmt.Errorf("volume %v not found", loc.Volume)
+	}
+
+	// write the sector to the new location and sync the volume
+	if err := vol.WriteSector(sector, loc.Index); err != nil {
+		return fmt.Errorf("failed to write sector: %w", err)
+	} else if err := vol.Sync(); err != nil {
+		return fmt.Errorf("failed to sync volume: %w", err)
+	}
+	return nil
 }
 
 // growVolume grows a volume by adding sectors to the end of the volume.
 func (vm *VolumeManager) growVolume(ctx context.Context, id int64, volume *volume, oldMaxSectors, newMaxSectors uint64) error {
-	if oldMaxSectors > newMaxSectors {
-		return errors.New("old sectors must be less than new sectors")
+	log := vm.log.Named("grow").With(zap.Int64("volumeID", id), zap.Uint64("start", oldMaxSectors), zap.Uint64("end", newMaxSectors))
+	if oldMaxSectors > newMaxSectors { // sanity check
+		log.Panic("old sectors must be less than new sectors")
 	}
 
 	// register an alert
@@ -270,6 +223,7 @@ func (vm *VolumeManager) growVolume(ctx context.Context, id int64, volume *volum
 		} else if err := vm.vs.GrowVolume(id, target); err != nil {
 			return fmt.Errorf("failed to expand volume metadata: %w", err)
 		}
+		log.Debug("expanded volume", zap.Uint64("current", current))
 
 		// update the alert
 		alert.Data["currentSectors"] = target
@@ -282,7 +236,7 @@ func (vm *VolumeManager) growVolume(ctx context.Context, id int64, volume *volum
 
 // shrinkVolume shrinks a volume by removing sectors from the end of the volume.
 func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *volume, oldMaxSectors, newMaxSectors uint64) error {
-	log := vm.log.Named("shrinkVolume").With(zap.Int64("volumeID", id), zap.Uint64("oldMaxSectors", oldMaxSectors), zap.Uint64("newMaxSectors", newMaxSectors))
+	log := vm.log.Named("shrink").With(zap.Int64("volumeID", id), zap.Uint64("start", oldMaxSectors), zap.Uint64("end", newMaxSectors))
 	if oldMaxSectors <= newMaxSectors {
 		return errors.New("old sectors must be greater than new sectors")
 	}
@@ -352,6 +306,8 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol
 			return fmt.Errorf("failed to shrink volume data to %v sectors: %w", current, err)
 		}
 
+		log.Debug("shrunk volume", zap.Uint64("current", current))
+
 		current = target
 		// update the alert
 		a.Data["currentSectors"] = current
@@ -362,43 +318,16 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol
 	return nil
 }
 
-func (vm *VolumeManager) volumeStats(id int64) (vs VolumeStats) {
+// volumeStats returns the stats for a volume. A lock must be held on the volume
+// manager before this function is called.
+func (vm *VolumeManager) volumeStats(id int64) VolumeStats {
 	v, ok := vm.volumes[id]
 	if !ok {
-		vs.Status = "unavailable"
-	} else {
-		vs = v.stats
-	}
-	return
-}
-
-func (vm *VolumeManager) setVolumeStatus(id int64, status string) {
-	vm.mu.Lock()
-	defer vm.mu.Unlock()
-
-	v, ok := vm.volumes[id]
-	if !ok {
-		return
-	}
-	v.stats.Status = status
-}
-
-func (vm *VolumeManager) doResize(ctx context.Context, volumeID int64, vol *volume, current, target uint64) error {
-	ctx, cancel, err := vm.tg.AddContext(ctx)
-	if err != nil {
-		return err
-	}
-	defer cancel()
-
-	switch {
-	case current > target:
-		// volume is shrinking
-		return vm.shrinkVolume(ctx, volumeID, vol, current, target)
-	case current < target:
-		// volume is growing
-		return vm.growVolume(ctx, volumeID, vol, current, target)
+		return VolumeStats{
+			Status: VolumeStatusUnavailable,
+		}
 	}
-	return nil
+	return v.Stats()
 }
 
 func (vm *VolumeManager) migrateForRemoval(ctx context.Context, id int64, localPath string, force bool, log *zap.Logger) (int, error) {
@@ -588,21 +517,15 @@ func (vm *VolumeManager) AddVolume(ctx context.Context, localPath string, maxSec
 	vm.volumes[volumeID] = vol
 	vm.mu.Unlock()
 
-	// lock the volume during grow operation
-	release, err := vm.lockVolume(volumeID)
-	if err != nil {
-		return Volume{}, fmt.Errorf("failed to lock volume: %w", err)
-	}
+	vm.vs.SetAvailable(volumeID, true)
 
 	go func() {
+		defer vol.SetStatus(VolumeStatusReady)
+		log := vm.log.Named("initialize").With(zap.Int64("volumeID", volumeID), zap.Uint64("maxSectors", maxSectors))
 		start := time.Now()
-		err := func() error {
-			defer vm.vs.SetAvailable(volumeID, true)
-			defer vm.setVolumeStatus(volumeID, VolumeStatusReady)
-			defer release()
-			return vm.doResize(ctx, volumeID, vol, 0, maxSectors)
-		}()
+
+		err := vm.growVolume(ctx, volumeID, vol, 0, maxSectors)
 
 		alert := alerts.Alert{
 			ID: frand.Entropy256(),
 			Data: map[string]interface{}{
@@ -639,11 +562,15 @@ func (vm *VolumeManager) SetReadOnly(id int64, readOnly bool) error {
 	}
 	defer done()
 
-	release, err := vm.lockVolume(id)
-	if err != nil {
-		return fmt.Errorf("failed to lock volume: %w", err)
+	// check that the volume is available and not busy
+	vm.mu.Lock()
+	vol, ok := vm.volumes[id]
+	vm.mu.Unlock()
+	if !ok {
+		return fmt.Errorf("volume %v not found", id)
+	} else if vol.Status() != VolumeStatusReady {
+		return fmt.Errorf("volume is %v", vol.Status())
 	}
-	defer release()
 
 	if err := vm.vs.SetReadOnly(id, readOnly); err != nil {
 		return fmt.Errorf("failed to set volume %v to read-only: %w", id, err)
@@ -660,32 +587,33 @@ func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool,
 	}
 	defer done()
 
-	// lock the volume during removal to prevent concurrent operations
-	release, err := vm.lockVolume(id)
-	if err != nil {
-		return fmt.Errorf("failed to lock volume: %w", err)
+	vm.mu.Lock()
+	vol, ok := vm.volumes[id]
+	vm.mu.Unlock()
+	if !ok {
+		return fmt.Errorf("volume %v not found", id)
+	} else if err := vol.SetStatus(VolumeStatusRemoving); err != nil {
+		return fmt.Errorf("failed to set volume status: %w", err)
 	}
 
-	vol, err := vm.vs.Volume(id)
+	stat, err := vm.vs.Volume(id)
 	if err != nil {
-		release()
 		return fmt.Errorf("failed to get volume: %w", err)
 	}
 
 	// set the volume to read-only to prevent new sectors from being added
 	if err := vm.vs.SetReadOnly(id, true); err != nil {
-		release()
 		return fmt.Errorf("failed to set volume %v to read-only: %w", id, err)
	}
 
 	go func() {
 		start := time.Now()
-		migrated, err := func() (int, error) {
-			vm.setVolumeStatus(id, VolumeStatusRemoving)
-			defer vm.setVolumeStatus(id, VolumeStatusReady)
-			defer release()
-			return vm.migrateForRemoval(ctx, id, vol.LocalPath, force, log)
-		}()
+		defer vol.SetStatus(VolumeStatusReady)
+
+		migrated, err := vm.migrateForRemoval(ctx, id, stat.LocalPath, force, log)
+		if err != nil {
+			log.Error("failed to migrate sectors", zap.Error(err))
+		}
 
 		alert := alerts.Alert{
 			ID: frand.Entropy256(),
@@ -723,44 +651,68 @@ func (vm *VolumeManager) ResizeVolume(ctx context.Context, id int64, maxSectors
 	}
 	defer done()
 
-	vol, err := vm.getVolume(id)
+	stat, err := vm.vs.Volume(id)
 	if err != nil {
 		return fmt.Errorf("failed to get volume: %w", err)
 	}
 
-	release, err := vm.lockVolume(id)
-	if err != nil {
-		return fmt.Errorf("failed to lock volume: %w", err)
+	vm.mu.Lock()
+	defer vm.mu.Unlock()
+
+	vol, ok := vm.volumes[id]
+	if !ok {
+		return fmt.Errorf("volume %v not found", id)
 	}
-	vm.setVolumeStatus(id, VolumeStatusResizing)
 
-	stat, err := vm.vs.Volume(id)
-	if err != nil {
-		release()
-		return fmt.Errorf("failed to get volume: %w", err)
+	// check that the volume is not already being resized
+	if err := vol.SetStatus(VolumeStatusResizing); err != nil {
+		return fmt.Errorf("failed to set volume status: %w", err)
 	}
 
-	oldReadonly := stat.ReadOnly
-	// set the volume to read-only to prevent new sectors from being added
-	if err := vm.vs.SetReadOnly(id, true); err != nil {
-		release()
-		return fmt.Errorf("failed to set volume %v to read-only: %w", id, err)
+	var resetReadOnly bool
+	if stat.TotalSectors > maxSectors && !stat.ReadOnly {
+		// set the volume to read-only to prevent new sectors from being added
+		// while the volume is being shrunk
+		if err := vm.vs.SetReadOnly(id, true); err != nil {
+			return fmt.Errorf("failed to set volume %v to read-only: %w", id, err)
+		}
+		resetReadOnly = true
 	}
 
 	go func() {
 		log := vm.log.Named("resize").With(zap.Int64("volumeID", id))
-		start := time.Now()
-		err := func() error {
-			defer func() {
-				// restore the volume to its original read-only status
-				if err := vm.vs.SetReadOnly(id, oldReadonly); err != nil {
-					log.Error("failed to restore volume read-only status", zap.Error(err))
+
+		defer func() {
+			if resetReadOnly {
+				// reset the volume to read-write
+				if err := vm.vs.SetReadOnly(id, false); err != nil {
+					vm.log.Error("failed to set volume to read-write", zap.Error(err))
 				}
-				vm.setVolumeStatus(id, VolumeStatusReady)
-			}()
-			defer release()
-			return vm.doResize(ctx, id, vol, stat.TotalSectors, maxSectors)
+			}
+			vol.SetStatus(VolumeStatusReady)
 		}()
+
+		ctx, cancel, err := vm.tg.AddContext(ctx)
+		if err != nil {
+			result <- err
+			return
+		}
+		defer cancel()
+
+		start := time.Now()
+
+		current := stat.TotalSectors
+		target := maxSectors
+
+		switch {
+		case current > target:
+			// volume is shrinking
+			err = vm.shrinkVolume(ctx, id, vol, stat.TotalSectors, maxSectors)
+		case current < target:
+			// volume is growing
+			err = vm.growVolume(ctx, id, vol, stat.TotalSectors, maxSectors)
+		}
+
 		alert := alerts.Alert{
 			ID: frand.Entropy256(),
 			Data: map[string]interface{}{
@@ -770,6 +722,7 @@ func (vm *VolumeManager) ResizeVolume(ctx context.Context, id int64, maxSectors
 			},
 			Timestamp: time.Now(),
 		}
+
 		if err != nil {
 			log.Error("failed to resize volume", zap.Error(err))
 			alert.Message = "Volume resize failed"
@@ -808,10 +761,13 @@ func (vm *VolumeManager) RemoveSector(root types.Hash256) error {
 		return fmt.Errorf("failed to remove sector %v: %w", root, err)
 	}
 
+	vm.mu.Lock()
+	defer vm.mu.Unlock()
+
 	// get the volume from memory
-	vol, err := vm.getVolume(loc.Volume)
-	if err != nil {
-		return fmt.Errorf("failed to get volume %v: %w", loc.Volume, err)
+	vol, ok := vm.volumes[loc.Volume]
+	if !ok {
+		return fmt.Errorf("volume %v not found", loc.Volume)
 	}
 
 	// zero the sector and immediately sync the volume
@@ -901,11 +857,15 @@ func (vm *VolumeManager) Sync() error {
 		toSync = append(toSync, id)
 	}
 	vm.mu.Unlock()
+
 	for _, id := range toSync {
-		v, err := vm.getVolume(id)
-		if err != nil {
-			return fmt.Errorf("failed to get volume %v: %w", id, err)
-		} else if err := v.Sync(); err != nil {
+		vm.mu.Lock()
+		vol, ok := vm.volumes[id]
+		vm.mu.Unlock()
+		if !ok {
+			continue
+		}
+		if err := vol.Sync(); err != nil {
 			return fmt.Errorf("failed to sync volume %v: %w", id, err)
 		}
 		vm.mu.Lock()
@@ -928,12 +888,27 @@ func (vm *VolumeManager) Write(root types.Hash256, data *[rhp2.SectorSize]byte)
 			return nil
 		}
 		start := time.Now()
-		if err := vm.writeSector(data, loc, false); err != nil {
-			return err
+
+		vm.mu.Lock()
+		vol, ok := vm.volumes[loc.Volume]
+		vm.mu.Unlock()
+		if !ok {
+			return fmt.Errorf("volume %v not found", loc.Volume)
+		}
+
+		// write the sector to the volume
+		if err := vol.WriteSector(data, loc.Index); err != nil {
+			return fmt.Errorf("failed to write sector data: %w", err)
 		}
 		vm.log.Debug("wrote sector", zap.String("root", root.String()), zap.Int64("volume", loc.Volume), zap.Uint64("index", loc.Index), zap.Duration("elapsed", time.Since(start)))
 
+		// Add newly written sector to cache
 		vm.cache.Add(root, data)
+
+		// mark the volume as changed
+		vm.mu.Lock()
+		vm.changedVolumes[loc.Volume] = true
+		vm.mu.Unlock()
 		return nil
 	})
 	if err == nil {
diff --git a/host/storage/storage_test.go b/host/storage/storage_test.go
index 6f8b7539..21888c18 100644
--- a/host/storage/storage_test.go
+++ b/host/storage/storage_test.go
@@ -661,6 +661,159 @@ func TestVolumeDistribution(t *testing.T) {
 	}
 }
 
+func TestVolumeConcurrency(t *testing.T) {
+	const (
+		sectors      = 256
+		writeSectors = 10
+	)
+	dir := t.TempDir()
+
+	// create the database
+	log := zaptest.NewLogger(t)
+	db, err := sqlite.OpenDatabase(filepath.Join(dir, "hostd.db"), log.Named("sqlite"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer db.Close()
+
+	g, err := gateway.New(":0", false, filepath.Join(dir, "gateway"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer g.Close()
+
+	cs, errCh := consensus.New(g, false, filepath.Join(dir, "consensus"))
+	select {
+	case err := <-errCh:
+		if err != nil {
+			t.Fatal(err)
+		}
+	default:
+	}
+	cm, err := chain.NewManager(cs)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cm.Close()
+	defer cm.Close()
+
+	// initialize the storage manager
+	am := alerts.NewManager()
+	vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer vm.Close()
+
+	volumeFilePath := filepath.Join(t.TempDir(), "hostdata.dat")
+	result := make(chan error, 1)
+	volume, err := vm.AddVolume(context.Background(), volumeFilePath, sectors, result)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for {
+		// reload the volume, since the initialization progress will have changed
+		v, err := vm.Volume(volume.ID)
+		if err != nil {
+			t.Fatal(err)
+		}
+		volume = v.Volume
+		// wait for enough sectors to be initialized
+		if volume.TotalSectors >= writeSectors {
+			break
+		}
+	}
+
+	// add a few sectors while the volume is initializing
+	roots := make([]types.Hash256, writeSectors)
+	for i := range roots {
+		var sector [rhp2.SectorSize]byte
+		if _, err := frand.Read(sector[:256]); err != nil {
+			t.Fatal(err)
+		}
+		root := rhp2.SectorRoot(&sector)
+		release, err := vm.Write(root, &sector)
+		if err != nil {
+			t.Fatal(i, err)
+		}
+		defer release()
+		roots[i] = root
+	}
+
+	// read the sectors back
+	for _, root := range roots {
+		sector, err := vm.Read(root)
+		if err != nil {
+			t.Fatal(err)
+		} else if rhp2.SectorRoot(sector) != root {
+			t.Fatal("sector was corrupted")
+		}
+	}
+
+	// wait for the volume to finish initializing
+	if err := <-result; err != nil {
+		t.Fatal(err)
+	}
+
+	// reload the volume, since initialization should be complete
+	v, err := vm.Volume(volume.ID)
+	if err != nil {
+		t.Fatal(err)
+	}
+	volume = v.Volume
+
+	// check the volume
+	if err := checkFileSize(volumeFilePath, int64(sectors*rhp2.SectorSize)); err != nil {
+		t.Fatal(err)
+	} else if volume.TotalSectors != sectors {
+		t.Fatalf("expected %v total sectors, got %v", sectors, volume.TotalSectors)
+	} else if volume.UsedSectors != writeSectors {
+		t.Fatalf("expected %v used sectors, got %v", writeSectors, volume.UsedSectors)
+	}
+
+	// generate a random sector
+	var sector [rhp2.SectorSize]byte
+	_, _ = frand.Read(sector[:256])
+	root := rhp2.SectorRoot(&sector)
+
+	// shrink the volume so it is nearly full
+	if err := vm.ResizeVolume(context.Background(), volume.ID, writeSectors+1, result); err != nil {
+		t.Fatal(err)
+	}
+
+	// try to write a sector to the volume, which should fail
+	if _, err := vm.Write(root, &sector); !errors.Is(err, storage.ErrNotEnoughStorage) {
+		t.Fatalf("expected %v, got %v", storage.ErrNotEnoughStorage, err)
+	}
+
+	// wait for the volume to finish shrinking
+	if err := <-result; err != nil {
+		t.Fatal(err)
+	}
+
+	// reload the volume, since shrinking should be complete
+	v, err = vm.Volume(volume.ID)
+	if err != nil {
+		t.Fatal(err)
+	}
+	volume = v.Volume
+
+	// check the volume
+	if err := checkFileSize(volumeFilePath, int64((writeSectors+1)*rhp2.SectorSize)); err != nil {
+		t.Fatal(err)
+	} else if volume.TotalSectors != writeSectors+1 {
+		t.Fatalf("expected %v total sectors, got %v", writeSectors+1, volume.TotalSectors)
+	} else if volume.UsedSectors != writeSectors {
+		t.Fatalf("expected %v used sectors, got %v", writeSectors, volume.UsedSectors)
+	}
+
+	// write the sector again, which should succeed
+	if _, err := vm.Write(root, &sector); err != nil {
+		t.Fatal(err)
+	}
+}
+
 func TestVolumeGrow(t *testing.T) {
 	const initialSectors = 20
 	dir := t.TempDir()
@@ -763,8 +916,6 @@ func TestVolumeGrow(t *testing.T) {
 		t.Fatalf("expected %v total sectors, got %v", newSectors, meta.TotalSectors)
 	} else if meta.UsedSectors != uint64(len(roots)) {
 		t.Fatalf("expected %v used sectors, got %v", len(roots), meta.UsedSectors)
-	} else if meta.Status != storage.VolumeStatusReady {
-		t.Fatalf("expected volume status to be ready, got %v", meta.Status)
 	}
 
 	f, err := os.Open(volumeFilePath)
@@ -933,8 +1084,6 @@ func TestVolumeShrink(t *testing.T) {
 		t.Fatalf("expected %v total sectors, got %v", remainingSectors, meta.TotalSectors)
 	} else if meta.UsedSectors != remainingSectors {
 		t.Fatalf("expected %v used sectors, got %v", remainingSectors, meta.UsedSectors)
-	} else if meta.Status != storage.VolumeStatusReady {
-		t.Fatalf("expected volume status to be ready, got %v", meta.Status)
 	}
 
 	// validate that the sectors were moved to the beginning of the volume
diff --git a/host/storage/volume.go b/host/storage/volume.go
index 135b8a70..7bdf8822 100644
--- a/host/storage/volume.go
+++ b/host/storage/volume.go
@@ -4,8 +4,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"math"
-	"math/rand"
 	"os"
 	"sync"
 
@@ -25,16 +23,15 @@ type (
 		Close() error
 	}
 
-	// A volume stores and retrieves sector data
+	// A volume stores and retrieves sector data from a local file
 	volume struct {
-		// data is a flatfile that stores the volume's sector data
-		data volumeData
+		// when reading or writing to the volume, a read lock should be held.
+		// When resizing or updating the volume's state, a write lock should be
+		// held.
+		mu sync.RWMutex
 
-		mu   sync.Mutex // protects the fields below
+		data  volumeData // data is a flatfile that stores the volume's sector data
 		stats VolumeStats
-		// busy must be set to true when the volume is being resized to prevent
-		// conflicting operations.
-		busy bool
 	}
 
 	// VolumeStats contains statistics about a volume
@@ -67,6 +64,28 @@ type (
 // ErrVolumeNotAvailable is returned when a volume is not available
 var ErrVolumeNotAvailable = errors.New("volume not available")
 
+func (v *volume) incrementReadStats(err error) {
+	v.mu.Lock()
+	defer v.mu.Unlock()
+	if err != nil {
+		v.stats.FailedReads++
+		v.appendError(err)
+	} else {
+		v.stats.SuccessfulReads++
+	}
+}
+
+func (v *volume) incrementWriteStats(err error) {
+	v.mu.Lock()
+	defer v.mu.Unlock()
+	if err != nil {
+		v.stats.FailedWrites++
+		v.appendError(err)
+	} else {
+		v.stats.SuccessfulWrites++
+	}
+}
+
 func (v *volume) appendError(err error) {
 	v.stats.Errors = append(v.stats.Errors, err)
 	if len(v.stats.Errors) > 100 {
@@ -89,21 +108,56 @@ func (v *volume) OpenVolume(localPath string, reload bool) error {
 	return nil
 }
 
+// SetStatus sets the status of the volume. If the new status is resizing, the
+// volume must be ready. If the new status is removing, the volume must be ready
+// or unavailable.
+func (v *volume) SetStatus(status string) error {
+	v.mu.Lock()
+	defer v.mu.Unlock()
+
+	if v.stats.Status == status {
+		return nil
+	}
+
+	switch status {
+	case VolumeStatusRemoving:
+		if v.stats.Status != VolumeStatusReady && v.stats.Status != VolumeStatusUnavailable {
+			return fmt.Errorf("volume is %v", v.stats.Status)
+		}
+	case VolumeStatusResizing:
+		if v.stats.Status != VolumeStatusReady {
+			return fmt.Errorf("volume is %v", v.stats.Status)
+		}
+	case VolumeStatusReady, VolumeStatusUnavailable:
+	default:
+		panic("cannot set status to " + status) // developer error
+	}
+	v.stats.Status = status
+	return nil
+}
+
+func (v *volume) Status() string {
+	v.mu.RLock()
+	defer v.mu.RUnlock()
+	return v.stats.Status
+}
+
 // ReadSector reads the sector at index from the volume
 func (v *volume) ReadSector(index uint64) (*[rhp2.SectorSize]byte, error) {
 	if v.data == nil {
 		return nil, ErrVolumeNotAvailable
 	}
+
 	var sector [rhp2.SectorSize]byte
+
+	v.mu.RLock()
+	defer v.mu.RUnlock()
 	_, err := v.data.ReadAt(sector[:], int64(index*rhp2.SectorSize))
-	v.mu.Lock()
+
 	if err != nil {
-		v.stats.FailedReads++
-		v.appendError(fmt.Errorf("failed to read sector at index %v: %w", index, err))
-	} else {
-		v.stats.SuccessfulReads++
+		err = fmt.Errorf("failed to read sector at index %v: %w", index, err)
 	}
-	v.mu.Unlock()
+	go v.incrementReadStats(err)
 	return &sector, err
 }
 
@@ -112,25 +166,17 @@ func (v *volume) WriteSector(data *[rhp2.SectorSize]byte, index uint64) error {
 	if v.data == nil {
 		panic("volume not open") // developer error
 	}
+
+	v.mu.RLock()
+	defer v.mu.RUnlock()
 	_, err := v.data.WriteAt(data[:], int64(index*rhp2.SectorSize))
-	v.mu.Lock()
 	if err != nil {
-		v.stats.FailedWrites++
-		v.appendError(fmt.Errorf("failed to write sector to index %v: %w", index, err))
-	} else {
-		v.stats.SuccessfulWrites++
+		err = fmt.Errorf("failed to write sector to index %v: %w", index, err)
 	}
-	v.mu.Unlock()
+	go v.incrementWriteStats(err)
 	return err
 }
 
-// SetStatus sets the status message of the volume
-func (v *volume) SetStatus(status string) {
-	v.mu.Lock()
-	v.stats.Status = status
-	v.mu.Unlock()
-}
-
 // Sync syncs the volume
 func (v *volume) Sync() error {
 	v.mu.Lock()
@@ -147,30 +193,39 @@ func (v *volume) Sync() error {
 }
 
 func (v *volume) Resize(oldSectors, newSectors uint64) error {
-	v.mu.Lock()
-	defer v.mu.Unlock()
-
 	if v.data == nil {
 		return ErrVolumeNotAvailable
 	}
 
 	if newSectors > oldSectors {
-		buf := make([]byte, rhp2.SectorSize)
-		r := rand.New(rand.NewSource(int64(frand.Uint64n(math.MaxInt64))))
-		for i := oldSectors; i < newSectors; i++ {
-			r.Read(buf)
-			if _, err := v.data.WriteAt(buf, int64(i*rhp2.SectorSize)); err != nil {
-				return fmt.Errorf("failed to write sector to index %v: %w", i, err)
-			}
+		size := (newSectors - oldSectors) * rhp2.SectorSize // should never be more than 256 MiB
+		buf := make([]byte, size)
+		_, _ = frand.Read(buf) // frand will never return an error
+
+		v.mu.Lock()
+		defer v.mu.Unlock()
+
+		// write the data to the end of the file
+		if _, err := v.data.WriteAt(buf, int64(oldSectors*rhp2.SectorSize)); err != nil {
+			return err
 		}
 	} else {
+		v.mu.Lock()
+		defer v.mu.Unlock()
+
 		if err := v.data.Truncate(int64(newSectors * rhp2.SectorSize)); err != nil {
-			return fmt.Errorf("failed to truncate volume: %w", err)
+			return err
 		}
 	}
 	return nil
 }
 
+func (v *volume) Stats() VolumeStats {
+	v.mu.RLock()
+	defer v.mu.RUnlock()
+	return v.stats
+}
+
 // Close closes the volume
 func (v *volume) Close() error {
 	v.mu.Lock()