Skip to content

Commit

Permalink
Check free space when registering task (#585)
Browse files Browse the repository at this point in the history
Signed-off-by: zzy987 <67889264+zzy987@users.noreply.github.com>
  • Loading branch information
zzy987 authored and gaius-qi committed Jun 28, 2023
1 parent 09824e5 commit a084386
Show file tree
Hide file tree
Showing 16 changed files with 299 additions and 49 deletions.
3 changes: 0 additions & 3 deletions cdnsystem/cdn.go
Expand Up @@ -84,9 +84,6 @@ func New(cfg *config.Config) (*Server, error) {

// Initialize storage manager
storageMgr.Initialize(taskMgr)
if err != nil {
return nil, errors.Wrapf(err, "create storage manager")
}

// Initialize storage manager
cdnSeedServer, err := rpcserver.NewCdnSeedServer(cfg, taskMgr)
Expand Down
7 changes: 7 additions & 0 deletions cdnsystem/errors/errors.go
Expand Up @@ -92,6 +92,9 @@ var (

// ErrConvertFailed represents failed to convert.
ErrConvertFailed = errors.New("convert failed")

// ErrResourcesLacked represents a lack of resources, for example, the disk does not have enough space.
ErrResourcesLacked = errors.New("resources lacked")
)

// IsSystemError checks the error is a system error or not.
Expand Down Expand Up @@ -152,3 +155,7 @@ func IsFileNotExist(err error) bool {
_, ok := err.(ErrFileNotExist)
return ok
}

func IsResourcesLacked(err error) bool {
return errors.Cause(err) == ErrResourcesLacked
}
5 changes: 5 additions & 0 deletions cdnsystem/rpcserver/rpcserver.go
Expand Up @@ -143,6 +143,11 @@ func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRe
// register task
pieceChan, err := css.taskMgr.Register(ctx, registerRequest)
if err != nil {
if cdnerrors.IsResourcesLacked(err) {
err = dferrors.Newf(dfcodes.ResourceLacked, "resources lacked for task(%s): %v", req.TaskId, err)
span.RecordError(err)
return err
}
err = dferrors.Newf(dfcodes.CdnTaskRegistryFail, "failed to register seed task(%s): %v", req.TaskId, err)
span.RecordError(err)
return err
Expand Down
4 changes: 4 additions & 0 deletions cdnsystem/supervisor/cdn/manager.go
Expand Up @@ -174,6 +174,10 @@ func (cm *Manager) Delete(taskID string) error {
return nil
}

func (cm *Manager) TryFreeSpace(fileLength int64) (bool, error) {
return cm.cacheStore.TryFreeSpace(fileLength)
}

func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, downloadMetadata *downloadMetadata) (bool, error) {
logger.WithTaskID(task.TaskID).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata)
var isSuccess = true
Expand Down
58 changes: 57 additions & 1 deletion cdnsystem/supervisor/cdn/storage/disk/disk.go
Expand Up @@ -20,9 +20,13 @@ import (
"encoding/json"
"fmt"
"io"
"os"
"path"
"strings"
"time"

"go.uber.org/atomic"

cdnerrors "d7y.io/dragonfly/v2/cdnsystem/errors"
"d7y.io/dragonfly/v2/cdnsystem/storedriver"
"d7y.io/dragonfly/v2/cdnsystem/storedriver/local"
Expand Down Expand Up @@ -131,7 +135,7 @@ func (s *diskStorageMgr) GC() error {
for _, taskID := range gcTaskIDs {
synclock.Lock(taskID, false)
// try to ensure the taskID is not using again
if s.taskMgr.Exist(taskID) {
if _, exist := s.taskMgr.Exist(taskID); exist {
synclock.UnLock(taskID, false)
continue
}
Expand Down Expand Up @@ -230,3 +234,55 @@ func (s *diskStorageMgr) DeleteTask(taskID string) error {
func (s *diskStorageMgr) ResetRepo(task *types.SeedTask) error {
return s.DeleteTask(task.TaskID)
}

func (s *diskStorageMgr) TryFreeSpace(fileLength int64) (bool, error) {
freeSpace, err := s.diskDriver.GetFreeSpace()
if err != nil {
return false, err
}
if freeSpace > 500*unit.GB && freeSpace.ToNumber() > fileLength {
return true, nil
}

remainder := atomic.NewInt64(0)
r := &storedriver.Raw{
WalkFn: func(filePath string, info os.FileInfo, err error) error {
if fileutils.IsRegular(filePath) {
taskID := strings.Split(path.Base(filePath), ".")[0]
task, exist := s.taskMgr.Exist(taskID)
if exist {
var totalLen int64 = 0
if task.CdnFileLength > 0 {
totalLen = task.CdnFileLength
} else {
totalLen = task.SourceFileLength
}
if totalLen > 0 {
remainder.Add(totalLen - info.Size())
}
} else {
logger.Warnf("failed to get task: %s", taskID)
}
}
return nil
},
}
s.diskDriver.Walk(r)

enoughSpace := freeSpace.ToNumber()-remainder.Load() > fileLength
if !enoughSpace {
s.cleaner.GC("disk", true)
remainder.Store(0)
s.diskDriver.Walk(r)
freeSpace, err = s.diskDriver.GetFreeSpace()
if err != nil {
return false, err
}
enoughSpace = freeSpace.ToNumber()-remainder.Load() > fileLength
}
if !enoughSpace {
return false, nil
}

return true, nil
}
115 changes: 115 additions & 0 deletions cdnsystem/supervisor/cdn/storage/disk/disk_test.go
@@ -0,0 +1,115 @@
package disk

import (
"fmt"
"testing"

"d7y.io/dragonfly/v2/cdnsystem/storedriver"
"d7y.io/dragonfly/v2/cdnsystem/supervisor/cdn/storage"
"d7y.io/dragonfly/v2/cdnsystem/supervisor/mock"
"github.com/golang/mock/gomock"

"d7y.io/dragonfly/v2/pkg/unit"

"github.com/stretchr/testify/suite"
)

func TestDiskStorageMgrSuite(t *testing.T) {
suite.Run(t, new(DiskStorageMgrSuite))
}

type DiskStorageMgrSuite struct {
m *diskStorageMgr
suite.Suite
}

func (suite *DiskStorageMgrSuite) TestTryFreeSpace() {
ctrl := gomock.NewController(suite.T())
diskDriver := storedriver.NewMockDriver(ctrl)
taskMgr := mock.NewMockSeedTaskMgr(ctrl)
suite.m = &diskStorageMgr{
diskDriver: diskDriver,
taskMgr: taskMgr,
}
diskDriver.EXPECT().GetTotalSpace().Return(100*unit.GB, nil)
cleaner, _ := storage.NewStorageCleaner(suite.m.getDefaultGcConfig(), diskDriver, suite.m, taskMgr)
suite.m.cleaner = cleaner

tests := []struct {
name string
setupSuite func()
fileLength int64
success func(bool, error) bool
}{
{
name: "very large free space",
setupSuite: func() {
// call GetFreeSpace 1 time in TryFreeSpace and return
diskDriver.EXPECT().GetFreeSpace().Return(unit.TB, nil)
},
fileLength: unit.MB.ToNumber(),
success: func(ok bool, err error) bool {
return ok == true && err == nil
},
},
{
name: "try a small file",
setupSuite: func() {
// call GetFreeSpace 1 time in TryFreeSpace
diskDriver.EXPECT().GetFreeSpace().Return(100*unit.GB, nil)
// call Walk 1 time in TryFreeSpace
diskDriver.EXPECT().Walk(gomock.Any())
},
fileLength: unit.KB.ToNumber(),
success: func(ok bool, err error) bool {
return ok == true && err == nil
},
},
{
name: "try a very large file",
setupSuite: func() {
// call GetFreeSpace 2 times in TryFreeSpace, 1 time in GC
diskDriver.EXPECT().GetFreeSpace().Return(100*unit.GB, nil).Times(3)
// call Walk 2 times in TryFreeSpace, 1 time in GC
diskDriver.EXPECT().Walk(gomock.Any()).Times(3)
},
fileLength: unit.TB.ToNumber(),
success: func(ok bool, err error) bool {
return ok == false && err == nil
},
},
{
name: "if get free space meets error",
setupSuite: func() {
// call GetFreeSpace 1 times in TryFreeSpace and return
diskDriver.EXPECT().GetFreeSpace().Return(unit.ToBytes(0), fmt.Errorf("a error for test"))
},
fileLength: unit.MB.ToNumber(),
success: func(ok bool, err error) bool {
return ok == false && err != nil && err.Error() == "a error for test"
},
},
{
name: "ok after gc",
setupSuite: func() {
// first call GetFreeSpace 1 times in TryFreeSpace, 1 time in GC
diskDriver.EXPECT().GetFreeSpace().Return(100*unit.MB, nil).Times(2)
// then call GetFreeSpace 1 times in TryFreeSpace, get another value
diskDriver.EXPECT().GetFreeSpace().Return(100*unit.GB, nil)
// call Walk 2 times in TryFreeSpace, 1 time in GC
diskDriver.EXPECT().Walk(gomock.Any()).Times(3)
},
fileLength: unit.GB.ToNumber(),
success: func(ok bool, err error) bool {
return ok == true && err == nil
},
},
}

for _, tt := range tests {
suite.Run(tt.name, func() {
tt.setupSuite()
suite.True(tt.success(suite.m.TryFreeSpace(tt.fileLength)))
})
}
}
65 changes: 57 additions & 8 deletions cdnsystem/supervisor/cdn/storage/hybrid/hybrid.go
Expand Up @@ -177,10 +177,7 @@ func (h *hybridStorageMgr) gcTasks(gcTaskIDs []string, isDisk bool) int {
for _, taskID := range gcTaskIDs {
synclock.Lock(taskID, false)
// try to ensure the taskID is not using again
if _, err := h.taskMgr.Get(taskID); err == nil || !cdnerrors.IsDataNotFound(err) {
if err != nil {
logger.GcLogger.With("type", "hybrid").Errorf("gc disk: failed to get taskID(%s): %v", taskID, err)
}
if _, exist := h.taskMgr.Exist(taskID); exist {
synclock.UnLock(taskID, false)
continue
}
Expand Down Expand Up @@ -297,6 +294,58 @@ func (h *hybridStorageMgr) StatDownloadFile(taskID string) (*storedriver.Storage
return h.diskDriver.Stat(storage.GetDownloadRaw(taskID))
}

func (h *hybridStorageMgr) TryFreeSpace(fileLength int64) (bool, error) {
diskFreeSpace, err := h.diskDriver.GetFreeSpace()
if err != nil {
return false, err
}
if diskFreeSpace > 500*unit.GB && diskFreeSpace.ToNumber() > fileLength {
return true, nil
}

remainder := atomic.NewInt64(0)
r := &storedriver.Raw{
WalkFn: func(filePath string, info os.FileInfo, err error) error {
if fileutils.IsRegular(filePath) {
taskID := strings.Split(path.Base(filePath), ".")[0]
task, exist := h.taskMgr.Exist(taskID)
if exist {
var totalLen int64 = 0
if task.CdnFileLength > 0 {
totalLen = task.CdnFileLength
} else {
totalLen = task.SourceFileLength
}
if totalLen > 0 {
remainder.Add(totalLen - info.Size())
}
} else {
logger.Warnf("failed to get task: %s", taskID)
}
}
return nil
},
}
h.diskDriver.Walk(r)

enoughSpace := diskFreeSpace.ToNumber()-remainder.Load() > fileLength
if !enoughSpace {
h.diskDriverCleaner.GC("hybrid", true)
remainder.Store(0)
h.diskDriver.Walk(r)
diskFreeSpace, err = h.diskDriver.GetFreeSpace()
if err != nil {
return false, err
}
enoughSpace = diskFreeSpace.ToNumber()-remainder.Load() > fileLength
}
if !enoughSpace {
return false, nil
}

return true, nil
}

func (h *hybridStorageMgr) deleteDiskFiles(taskID string) error {
return h.deleteTaskFiles(taskID, true, true)
}
Expand Down Expand Up @@ -352,9 +401,9 @@ func (h *hybridStorageMgr) tryShmSpace(url, taskID string, fileLength int64) (st
h.memoryDriver.Walk(&storedriver.Raw{
WalkFn: func(filePath string, info os.FileInfo, err error) error {
if fileutils.IsRegular(filePath) {
taskID := path.Base(filePath)
task, err := h.taskMgr.Get(taskID)
if err == nil {
taskID := strings.Split(path.Base(filePath), ".")[0]
task, exist := h.taskMgr.Exist(taskID)
if exist {
var totalLen int64 = 0
if task.CdnFileLength > 0 {
totalLen = task.CdnFileLength
Expand All @@ -365,7 +414,7 @@ func (h *hybridStorageMgr) tryShmSpace(url, taskID string, fileLength int64) (st
remainder.Add(totalLen - info.Size())
}
} else {
logger.Warnf("failed to get task: %s: %v", taskID, err)
logger.Warnf("failed to get task: %s", taskID)
}
}
return nil
Expand Down
15 changes: 15 additions & 0 deletions cdnsystem/supervisor/cdn/storage/mock/mock_storage_mgr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cdnsystem/supervisor/cdn/storage/storage_gc.go
Expand Up @@ -100,7 +100,7 @@ func (cleaner *Cleaner) GC(storagePattern string, force bool) ([]string, error)
walkTaskIds[taskID] = true

// we should return directly when we success to get info which means it is being used
if cleaner.taskMgr.Exist(taskID) {
if _, exist := cleaner.taskMgr.Exist(taskID); exist {
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions cdnsystem/supervisor/cdn/storage/storage_mgr.go
Expand Up @@ -71,6 +71,9 @@ type Manager interface {

// DeleteTask delete task from storage
DeleteTask(taskID string) error

// TryFreeSpace checks if there is enough space for the file, return true while we are sure that there is enough space.
TryFreeSpace(fileLength int64) (bool, error)
}

// FileMetaData meta data of task
Expand Down
3 changes: 3 additions & 0 deletions cdnsystem/supervisor/cdn_mgr.go
Expand Up @@ -34,4 +34,7 @@ type CDNMgr interface {
// Delete the cdn meta with specified taskID.
// The file on the disk will be deleted when the force is true.
Delete(string) error

// TryFreeSpace checks if the free space of the storage is larger than the fileLength.
TryFreeSpace(fileLength int64) (bool, error)
}

0 comments on commit a084386

Please sign in to comment.