From a084386c72974d7bbf5702953efc58520c606f1b Mon Sep 17 00:00:00 2001 From: zzy987 <67889264+zzy987@users.noreply.github.com> Date: Wed, 1 Sep 2021 14:17:26 +0800 Subject: [PATCH] Check free space when registering task (#585) Signed-off-by: zzy987 <67889264+zzy987@users.noreply.github.com> --- cdnsystem/cdn.go | 3 - cdnsystem/errors/errors.go | 7 ++ cdnsystem/rpcserver/rpcserver.go | 5 + cdnsystem/supervisor/cdn/manager.go | 4 + cdnsystem/supervisor/cdn/storage/disk/disk.go | 58 ++++++++- .../supervisor/cdn/storage/disk/disk_test.go | 115 ++++++++++++++++++ .../supervisor/cdn/storage/hybrid/hybrid.go | 65 ++++++++-- .../cdn/storage/mock/mock_storage_mgr.go | 15 +++ .../supervisor/cdn/storage/storage_gc.go | 2 +- .../supervisor/cdn/storage/storage_mgr.go | 3 + cdnsystem/supervisor/cdn_mgr.go | 3 + cdnsystem/supervisor/mock/mock_cdn_mgr.go | 15 +++ cdnsystem/supervisor/mock/mock_task_mgr.go | 23 +--- cdnsystem/supervisor/task/manager.go | 16 +-- cdnsystem/supervisor/task/manager_util.go | 8 ++ cdnsystem/supervisor/task_mgr.go | 6 +- 16 files changed, 299 insertions(+), 49 deletions(-) create mode 100644 cdnsystem/supervisor/cdn/storage/disk/disk_test.go diff --git a/cdnsystem/cdn.go b/cdnsystem/cdn.go index ef7e80b31a2..eebb3639111 100644 --- a/cdnsystem/cdn.go +++ b/cdnsystem/cdn.go @@ -84,9 +84,6 @@ func New(cfg *config.Config) (*Server, error) { // Initialize storage manager storageMgr.Initialize(taskMgr) - if err != nil { - return nil, errors.Wrapf(err, "create storage manager") - } // Initialize storage manager cdnSeedServer, err := rpcserver.NewCdnSeedServer(cfg, taskMgr) diff --git a/cdnsystem/errors/errors.go b/cdnsystem/errors/errors.go index 0de97f0e5f0..d35bae1ea6c 100644 --- a/cdnsystem/errors/errors.go +++ b/cdnsystem/errors/errors.go @@ -92,6 +92,9 @@ var ( // ErrConvertFailed represents failed to convert. ErrConvertFailed = errors.New("convert failed") + + // ErrResourcesLacked represents a lack of resources, for example, the disk does not have enough space. + ErrResourcesLacked = errors.New("resources lacked") ) // IsSystemError checks the error is a system error or not. @@ -152,3 +155,7 @@ func IsFileNotExist(err error) bool { _, ok := err.(ErrFileNotExist) return ok } + +func IsResourcesLacked(err error) bool { + return errors.Cause(err) == ErrResourcesLacked +} diff --git a/cdnsystem/rpcserver/rpcserver.go b/cdnsystem/rpcserver/rpcserver.go index 6bf9aa77a4a..33c4188d124 100644 --- a/cdnsystem/rpcserver/rpcserver.go +++ b/cdnsystem/rpcserver/rpcserver.go @@ -143,6 +143,11 @@ func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRe // register task pieceChan, err := css.taskMgr.Register(ctx, registerRequest) if err != nil { + if cdnerrors.IsResourcesLacked(err) { + err = dferrors.Newf(dfcodes.ResourceLacked, "resources lacked for task(%s): %v", req.TaskId, err) + span.RecordError(err) + return err + } err = dferrors.Newf(dfcodes.CdnTaskRegistryFail, "failed to register seed task(%s): %v", req.TaskId, err) span.RecordError(err) return err diff --git a/cdnsystem/supervisor/cdn/manager.go b/cdnsystem/supervisor/cdn/manager.go index e5fc5787bfe..3cb61cf2001 100644 --- a/cdnsystem/supervisor/cdn/manager.go +++ b/cdnsystem/supervisor/cdn/manager.go @@ -174,6 +174,10 @@ func (cm *Manager) Delete(taskID string) error { return nil } +func (cm *Manager) TryFreeSpace(fileLength int64) (bool, error) { + return cm.cacheStore.TryFreeSpace(fileLength) +} + func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, downloadMetadata *downloadMetadata) (bool, error) { logger.WithTaskID(task.TaskID).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata) var isSuccess = true diff --git a/cdnsystem/supervisor/cdn/storage/disk/disk.go b/cdnsystem/supervisor/cdn/storage/disk/disk.go index f058c5c8bf9..5645bd72451 100644 --- a/cdnsystem/supervisor/cdn/storage/disk/disk.go +++ b/cdnsystem/supervisor/cdn/storage/disk/disk.go @@ -20,9 +20,13 @@ import ( "encoding/json" "fmt" "io" + "os" + "path" "strings" "time" + "go.uber.org/atomic" + cdnerrors "d7y.io/dragonfly/v2/cdnsystem/errors" "d7y.io/dragonfly/v2/cdnsystem/storedriver" "d7y.io/dragonfly/v2/cdnsystem/storedriver/local" @@ -131,7 +135,7 @@ func (s *diskStorageMgr) GC() error { for _, taskID := range gcTaskIDs { synclock.Lock(taskID, false) // try to ensure the taskID is not using again - if s.taskMgr.Exist(taskID) { + if _, exist := s.taskMgr.Exist(taskID); exist { synclock.UnLock(taskID, false) continue } @@ -230,3 +234,55 @@ func (s *diskStorageMgr) DeleteTask(taskID string) error { func (s *diskStorageMgr) ResetRepo(task *types.SeedTask) error { return s.DeleteTask(task.TaskID) } + +func (s *diskStorageMgr) TryFreeSpace(fileLength int64) (bool, error) { + freeSpace, err := s.diskDriver.GetFreeSpace() + if err != nil { + return false, err + } + if freeSpace > 500*unit.GB && freeSpace.ToNumber() > fileLength { + return true, nil + } + + remainder := atomic.NewInt64(0) + r := &storedriver.Raw{ + WalkFn: func(filePath string, info os.FileInfo, err error) error { + if fileutils.IsRegular(filePath) { + taskID := strings.Split(path.Base(filePath), ".")[0] + task, exist := s.taskMgr.Exist(taskID) + if exist { + var totalLen int64 = 0 + if task.CdnFileLength > 0 { + totalLen = task.CdnFileLength + } else { + totalLen = task.SourceFileLength + } + if totalLen > 0 { + remainder.Add(totalLen - info.Size()) + } + } else { + logger.Warnf("failed to get task: %s", taskID) + } + } + return nil + }, + } + s.diskDriver.Walk(r) + + enoughSpace := freeSpace.ToNumber()-remainder.Load() > fileLength + if !enoughSpace { + s.cleaner.GC("disk", true) + remainder.Store(0) + s.diskDriver.Walk(r) + freeSpace, err = s.diskDriver.GetFreeSpace() + if err != nil { + return false, err + } + enoughSpace = freeSpace.ToNumber()-remainder.Load() > fileLength + } + if !enoughSpace { + return false, nil + } + + return true, nil +} diff --git a/cdnsystem/supervisor/cdn/storage/disk/disk_test.go b/cdnsystem/supervisor/cdn/storage/disk/disk_test.go new file mode 100644 index 00000000000..b9048efe430 --- /dev/null +++ b/cdnsystem/supervisor/cdn/storage/disk/disk_test.go @@ -0,0 +1,115 @@ +package disk + +import ( + "fmt" + "testing" + + "d7y.io/dragonfly/v2/cdnsystem/storedriver" + "d7y.io/dragonfly/v2/cdnsystem/supervisor/cdn/storage" + "d7y.io/dragonfly/v2/cdnsystem/supervisor/mock" + "github.com/golang/mock/gomock" + + "d7y.io/dragonfly/v2/pkg/unit" + + "github.com/stretchr/testify/suite" +) + +func TestDiskStorageMgrSuite(t *testing.T) { + suite.Run(t, new(DiskStorageMgrSuite)) +} + +type DiskStorageMgrSuite struct { + m *diskStorageMgr + suite.Suite +} + +func (suite *DiskStorageMgrSuite) TestTryFreeSpace() { + ctrl := gomock.NewController(suite.T()) + diskDriver := storedriver.NewMockDriver(ctrl) + taskMgr := mock.NewMockSeedTaskMgr(ctrl) + suite.m = &diskStorageMgr{ + diskDriver: diskDriver, + taskMgr: taskMgr, + } + diskDriver.EXPECT().GetTotalSpace().Return(100*unit.GB, nil) + cleaner, _ := storage.NewStorageCleaner(suite.m.getDefaultGcConfig(), diskDriver, suite.m, taskMgr) + suite.m.cleaner = cleaner + + tests := []struct { + name string + setupSuite func() + fileLength int64 + success func(bool, error) bool + }{ + { + name: "very large free space", + setupSuite: func() { + // call GetFreeSpace 1 time in TryFreeSpace and return + diskDriver.EXPECT().GetFreeSpace().Return(unit.TB, nil) + }, + fileLength: unit.MB.ToNumber(), + success: func(ok bool, err error) bool { + return ok == true && err == nil + }, + }, + { + name: "try a small file", + setupSuite: func() { + // call GetFreeSpace 1 time in TryFreeSpace + diskDriver.EXPECT().GetFreeSpace().Return(100*unit.GB, nil) + // call Walk 1 time in TryFreeSpace + diskDriver.EXPECT().Walk(gomock.Any()) + }, + fileLength: unit.KB.ToNumber(), + success: func(ok bool, err error) bool { + return ok == true && err == nil + }, + }, + { + name: "try a very large file", + setupSuite: func() { + // call GetFreeSpace 2 times in TryFreeSpace, 1 time in GC + diskDriver.EXPECT().GetFreeSpace().Return(100*unit.GB, nil).Times(3) + // call Walk 2 times in TryFreeSpace, 1 time in GC + diskDriver.EXPECT().Walk(gomock.Any()).Times(3) + }, + fileLength: unit.TB.ToNumber(), + success: func(ok bool, err error) bool { + return ok == false && err == nil + }, + }, + { + name: "if get free space meets error", + setupSuite: func() { + // call GetFreeSpace 1 times in TryFreeSpace and return + diskDriver.EXPECT().GetFreeSpace().Return(unit.ToBytes(0), fmt.Errorf("a error for test")) + }, + fileLength: unit.MB.ToNumber(), + success: func(ok bool, err error) bool { + return ok == false && err != nil && err.Error() == "a error for test" + }, + }, + { + name: "ok after gc", + setupSuite: func() { + // first call GetFreeSpace 1 times in TryFreeSpace, 1 time in GC + diskDriver.EXPECT().GetFreeSpace().Return(100*unit.MB, nil).Times(2) + // then call GetFreeSpace 1 times in TryFreeSpace, get another value + diskDriver.EXPECT().GetFreeSpace().Return(100*unit.GB, nil) + // call Walk 2 times in TryFreeSpace, 1 time in GC + diskDriver.EXPECT().Walk(gomock.Any()).Times(3) + }, + fileLength: unit.GB.ToNumber(), + success: func(ok bool, err error) bool { + return ok == true && err == nil + }, + }, + } + + for _, tt := range tests { + suite.Run(tt.name, func() { + tt.setupSuite() + suite.True(tt.success(suite.m.TryFreeSpace(tt.fileLength))) + }) + } +} diff --git a/cdnsystem/supervisor/cdn/storage/hybrid/hybrid.go b/cdnsystem/supervisor/cdn/storage/hybrid/hybrid.go index 5aca00ab8b4..11329a08dbb 100644 --- a/cdnsystem/supervisor/cdn/storage/hybrid/hybrid.go +++ b/cdnsystem/supervisor/cdn/storage/hybrid/hybrid.go @@ -177,10 +177,7 @@ func (h *hybridStorageMgr) gcTasks(gcTaskIDs []string, isDisk bool) int { for _, taskID := range gcTaskIDs { synclock.Lock(taskID, false) // try to ensure the taskID is not using again - if _, err := h.taskMgr.Get(taskID); err == nil || !cdnerrors.IsDataNotFound(err) { - if err != nil { - logger.GcLogger.With("type", "hybrid").Errorf("gc disk: failed to get taskID(%s): %v", taskID, err) - } + if _, exist := h.taskMgr.Exist(taskID); exist { synclock.UnLock(taskID, false) continue } @@ -297,6 +294,58 @@ func (h *hybridStorageMgr) StatDownloadFile(taskID string) (*storedriver.Storage return h.diskDriver.Stat(storage.GetDownloadRaw(taskID)) } +func (h *hybridStorageMgr) TryFreeSpace(fileLength int64) (bool, error) { + diskFreeSpace, err := h.diskDriver.GetFreeSpace() + if err != nil { + return false, err + } + if diskFreeSpace > 500*unit.GB && diskFreeSpace.ToNumber() > fileLength { + return true, nil + } + + remainder := atomic.NewInt64(0) + r := &storedriver.Raw{ + WalkFn: func(filePath string, info os.FileInfo, err error) error { + if fileutils.IsRegular(filePath) { + taskID := strings.Split(path.Base(filePath), ".")[0] + task, exist := h.taskMgr.Exist(taskID) + if exist { + var totalLen int64 = 0 + if task.CdnFileLength > 0 { + totalLen = task.CdnFileLength + } else { + totalLen = task.SourceFileLength + } + if totalLen > 0 { + remainder.Add(totalLen - info.Size()) + } + } else { + logger.Warnf("failed to get task: %s", taskID) + } + } + return nil + }, + } + h.diskDriver.Walk(r) + + enoughSpace := diskFreeSpace.ToNumber()-remainder.Load() > fileLength + if !enoughSpace { + h.diskDriverCleaner.GC("hybrid", true) + remainder.Store(0) + h.diskDriver.Walk(r) + diskFreeSpace, err = h.diskDriver.GetFreeSpace() + if err != nil { + return false, err + } + enoughSpace = diskFreeSpace.ToNumber()-remainder.Load() > fileLength + } + if !enoughSpace { + return false, nil + } + + return true, nil +} + func (h *hybridStorageMgr) deleteDiskFiles(taskID string) error { return h.deleteTaskFiles(taskID, true, true) } @@ -352,9 +401,9 @@ func (h *hybridStorageMgr) tryShmSpace(url, taskID string, fileLength int64) (st h.memoryDriver.Walk(&storedriver.Raw{ WalkFn: func(filePath string, info os.FileInfo, err error) error { if fileutils.IsRegular(filePath) { - taskID := path.Base(filePath) - task, err := h.taskMgr.Get(taskID) - if err == nil { + taskID := strings.Split(path.Base(filePath), ".")[0] + task, exist := h.taskMgr.Exist(taskID) + if exist { var totalLen int64 = 0 if task.CdnFileLength > 0 { totalLen = task.CdnFileLength @@ -365,7 +414,7 @@ func (h *hybridStorageMgr) tryShmSpace(url, taskID string, fileLength int64) (st remainder.Add(totalLen - info.Size()) } } else { - logger.Warnf("failed to get task: %s: %v", taskID, err) + logger.Warnf("failed to get task: %s", taskID) } } return nil diff --git a/cdnsystem/supervisor/cdn/storage/mock/mock_storage_mgr.go b/cdnsystem/supervisor/cdn/storage/mock/mock_storage_mgr.go index 0e7720c5bc0..eafa7a3c77a 100644 --- a/cdnsystem/supervisor/cdn/storage/mock/mock_storage_mgr.go +++ b/cdnsystem/supervisor/cdn/storage/mock/mock_storage_mgr.go @@ -166,6 +166,21 @@ func (mr *MockManagerMockRecorder) StatDownloadFile(arg0 interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatDownloadFile", reflect.TypeOf((*MockManager)(nil).StatDownloadFile), arg0) } +// TryFreeSpace mocks base method. +func (m *MockManager) TryFreeSpace(arg0 int64) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryFreeSpace", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TryFreeSpace indicates an expected call of TryFreeSpace. +func (mr *MockManagerMockRecorder) TryFreeSpace(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryFreeSpace", reflect.TypeOf((*MockManager)(nil).TryFreeSpace), arg0) +} + // WriteDownloadFile mocks base method. func (m *MockManager) WriteDownloadFile(arg0 string, arg1, arg2 int64, arg3 io.Reader) error { m.ctrl.T.Helper() diff --git a/cdnsystem/supervisor/cdn/storage/storage_gc.go b/cdnsystem/supervisor/cdn/storage/storage_gc.go index 6938f3ad431..053e382d7bf 100644 --- a/cdnsystem/supervisor/cdn/storage/storage_gc.go +++ b/cdnsystem/supervisor/cdn/storage/storage_gc.go @@ -100,7 +100,7 @@ func (cleaner *Cleaner) GC(storagePattern string, force bool) ([]string, error) walkTaskIds[taskID] = true // we should return directly when we success to get info which means it is being used - if cleaner.taskMgr.Exist(taskID) { + if _, exist := cleaner.taskMgr.Exist(taskID); exist { return nil } diff --git a/cdnsystem/supervisor/cdn/storage/storage_mgr.go b/cdnsystem/supervisor/cdn/storage/storage_mgr.go index 3d63aad8eca..03b1f790227 100644 --- a/cdnsystem/supervisor/cdn/storage/storage_mgr.go +++ b/cdnsystem/supervisor/cdn/storage/storage_mgr.go @@ -71,6 +71,9 @@ type Manager interface { // DeleteTask delete task from storage DeleteTask(taskID string) error + + // TryFreeSpace checks if there is enough space for the file, return true while we are sure that there is enough space. + TryFreeSpace(fileLength int64) (bool, error) } // FileMetaData meta data of task diff --git a/cdnsystem/supervisor/cdn_mgr.go b/cdnsystem/supervisor/cdn_mgr.go index 2e4b22bf5e1..fa0367302e2 100644 --- a/cdnsystem/supervisor/cdn_mgr.go +++ b/cdnsystem/supervisor/cdn_mgr.go @@ -34,4 +34,7 @@ type CDNMgr interface { // Delete the cdn meta with specified taskID. // The file on the disk will be deleted when the force is true. Delete(string) error + + // TryFreeSpace checks if the free space of the storage is larger than the fileLength. + TryFreeSpace(fileLength int64) (bool, error) } diff --git a/cdnsystem/supervisor/mock/mock_cdn_mgr.go b/cdnsystem/supervisor/mock/mock_cdn_mgr.go index 058ae937099..31a7bcf3dd8 100644 --- a/cdnsystem/supervisor/mock/mock_cdn_mgr.go +++ b/cdnsystem/supervisor/mock/mock_cdn_mgr.go @@ -63,3 +63,18 @@ func (mr *MockCDNMgrMockRecorder) TriggerCDN(arg0, arg1 interface{}) *gomock.Cal mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerCDN", reflect.TypeOf((*MockCDNMgr)(nil).TriggerCDN), arg0, arg1) } + +// TryFreeSpace mocks base method. +func (m *MockCDNMgr) TryFreeSpace(arg0 int64) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryFreeSpace", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TryFreeSpace indicates an expected call of TryFreeSpace. +func (mr *MockCDNMgrMockRecorder) TryFreeSpace(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryFreeSpace", reflect.TypeOf((*MockCDNMgr)(nil).TryFreeSpace), arg0) +} diff --git a/cdnsystem/supervisor/mock/mock_task_mgr.go b/cdnsystem/supervisor/mock/mock_task_mgr.go index ab4517797e0..f4d3dc40919 100644 --- a/cdnsystem/supervisor/mock/mock_task_mgr.go +++ b/cdnsystem/supervisor/mock/mock_task_mgr.go @@ -9,7 +9,6 @@ import ( reflect "reflect" types "d7y.io/dragonfly/v2/cdnsystem/types" - syncmap "d7y.io/dragonfly/v2/pkg/structure/syncmap" gomock "github.com/golang/mock/gomock" ) @@ -51,11 +50,12 @@ func (mr *MockSeedTaskMgrMockRecorder) Delete(arg0 interface{}) *gomock.Call { } // Exist mocks base method. -func (m *MockSeedTaskMgr) Exist(arg0 string) bool { +func (m *MockSeedTaskMgr) Exist(arg0 string) (*types.SeedTask, bool) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Exist", arg0) - ret0, _ := ret[0].(bool) - return ret0 + ret0, _ := ret[0].(*types.SeedTask) + ret1, _ := ret[1].(bool) + return ret0, ret1 } // Exist indicates an expected call of Exist. @@ -79,21 +79,6 @@ func (mr *MockSeedTaskMgrMockRecorder) Get(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockSeedTaskMgr)(nil).Get), arg0) } -// GetAccessTime mocks base method. -func (m *MockSeedTaskMgr) GetAccessTime() (*syncmap.SyncMap, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetAccessTime") - ret0, _ := ret[0].(*syncmap.SyncMap) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetAccessTime indicates an expected call of GetAccessTime. -func (mr *MockSeedTaskMgrMockRecorder) GetAccessTime() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAccessTime", reflect.TypeOf((*MockSeedTaskMgr)(nil).GetAccessTime)) -} - // GetPieces mocks base method. func (m *MockSeedTaskMgr) GetPieces(arg0 context.Context, arg1 string) ([]*types.SeedPiece, error) { m.ctrl.T.Helper() diff --git a/cdnsystem/supervisor/task/manager.go b/cdnsystem/supervisor/task/manager.go index d9be8db2f65..85b1b789454 100644 --- a/cdnsystem/supervisor/task/manager.go +++ b/cdnsystem/supervisor/task/manager.go @@ -19,7 +19,6 @@ package task import ( "context" "encoding/json" - "fmt" "time" "d7y.io/dragonfly/v2/cdnsystem/config" @@ -181,13 +180,9 @@ func (tm Manager) Get(taskID string) (*types.SeedTask, error) { return task, err } -func (tm Manager) Exist(taskID string) bool { - _, err := tm.getTask(taskID) - return err == nil || !cdnerrors.IsDataNotFound(err) -} - -func (tm Manager) GetAccessTime() (*syncmap.SyncMap, error) { - return tm.accessTimeMap, nil +func (tm Manager) Exist(taskID string) (*types.SeedTask, bool) { + task, err := tm.getTask(taskID) + return task, err == nil } func (tm Manager) Delete(taskID string) error { @@ -215,10 +210,7 @@ func (tm *Manager) GC() error { var removedTaskCount int startTime := time.Now() // get all taskIDs and the corresponding accessTime - taskAccessMap, err := tm.GetAccessTime() - if err != nil { - return fmt.Errorf("gc tasks: failed to get task accessTime map for GC: %v", err) - } + taskAccessMap := tm.accessTimeMap // range all tasks and determine whether they are expired taskIDs := taskAccessMap.ListKeyAsStringSlice() diff --git a/cdnsystem/supervisor/task/manager_util.go b/cdnsystem/supervisor/task/manager_util.go index 78948968067..824726eb34a 100644 --- a/cdnsystem/supervisor/task/manager_util.go +++ b/cdnsystem/supervisor/task/manager_util.go @@ -111,6 +111,14 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis // if not support file length header request ,return -1 task.SourceFileLength = sourceFileLength logger.WithTaskID(taskID).Debugf("get file content length: %d", sourceFileLength) + if task.SourceFileLength > 0 { + ok, err := tm.cdnMgr.TryFreeSpace(task.SourceFileLength) + if err != nil { + logger.Errorf("failed to try free space: %v", err) + } else if !ok { + return nil, cdnerrors.ErrResourcesLacked + } + } // if success to get the information successfully with the req.Header then update the task.Header to req.Header. if request.Header != nil { diff --git a/cdnsystem/supervisor/task_mgr.go b/cdnsystem/supervisor/task_mgr.go index 1b383ec313b..41d57a04cba 100644 --- a/cdnsystem/supervisor/task_mgr.go +++ b/cdnsystem/supervisor/task_mgr.go @@ -21,7 +21,6 @@ import ( "context" "d7y.io/dragonfly/v2/cdnsystem/types" - "d7y.io/dragonfly/v2/pkg/structure/syncmap" ) // SeedTaskMgr as an interface defines all operations against SeedTask. @@ -36,10 +35,7 @@ type SeedTaskMgr interface { Get(string) (*types.SeedTask, error) // Exist check task existence with specified taskId. - Exist(string) bool - - // GetAccessTime get all tasks accessTime. - GetAccessTime() (*syncmap.SyncMap, error) + Exist(string) (*types.SeedTask, bool) // Delete delete a task. Delete(string) error