diff --git a/supernode/config/config.go b/supernode/config/config.go index 6c3e3f73b..d4a6ff331 100644 --- a/supernode/config/config.go +++ b/supernode/config/config.go @@ -144,7 +144,7 @@ type BaseProperties struct { // PeerDownLimit is the download limit of a peer. When a peer starts to download a file/image, // it will download file/image in the form of pieces. PeerDownLimit mean that a peer can only // stand starting PeerDownLimit concurrent downloading tasks. - // default: 4 + // default: 5 PeerDownLimit int `yaml:"peerDownLimit"` // When dfget node starts to play a role of peer, it will provide services for other peers diff --git a/supernode/daemon/mgr/mock/mock_progress_mgr.go b/supernode/daemon/mgr/mock/mock_progress_mgr.go index 523de2261..7f69fabcb 100644 --- a/supernode/daemon/mgr/mock/mock_progress_mgr.go +++ b/supernode/daemon/mgr/mock/mock_progress_mgr.go @@ -183,6 +183,21 @@ func (mr *MockProgressMgrMockRecorder) GetBlackInfoByPeerID(ctx, peerID interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlackInfoByPeerID", reflect.TypeOf((*MockProgressMgr)(nil).GetBlackInfoByPeerID), ctx, peerID) } +// UpdateSuperLoad mocks base method +func (m *MockProgressMgr) UpdateSuperLoad(ctx context.Context, taskID string, delta, limit int32) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateSuperLoad", ctx, taskID, delta, limit) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateSuperLoad indicates an expected call of UpdateSuperLoad +func (mr *MockProgressMgrMockRecorder) UpdateSuperLoad(ctx, taskID, delta, limit interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateSuperLoad", reflect.TypeOf((*MockProgressMgr)(nil).UpdateSuperLoad), ctx, taskID, delta, limit) +} + // DeleteTaskID mocks base method func (m *MockProgressMgr) DeleteTaskID(ctx context.Context, taskID string) error { m.ctrl.T.Helper() diff --git a/supernode/daemon/mgr/progress/progress_delete.go b/supernode/daemon/mgr/progress/progress_delete.go index 2769e668d..8cbc1cc8c 100644 --- a/supernode/daemon/mgr/progress/progress_delete.go +++ b/supernode/daemon/mgr/progress/progress_delete.go @@ -24,6 +24,7 @@ import ( // DeleteTaskID deletes the super progress with specified taskID. func (pm *Manager) DeleteTaskID(ctx context.Context, taskID string) (err error) { + pm.superLoad.remove(taskID) return pm.superProgress.remove(taskID) } diff --git a/supernode/daemon/mgr/progress/progress_manager.go b/supernode/daemon/mgr/progress/progress_manager.go index c7cce2e88..324f85dec 100644 --- a/supernode/daemon/mgr/progress/progress_manager.go +++ b/supernode/daemon/mgr/progress/progress_manager.go @@ -51,39 +51,47 @@ var _ mgr.ProgressMgr = &Manager{} // Manager is an implementation of the interface of ProgressMgr. type Manager struct { // superProgress maintains the super progress. - // key:taskID,value:*superState + // key:taskID string, value:superState *superState superProgress *stateSyncMap // clientProgress maintains the client progress. - // key:CID,value:*clientState + // key:CID string, value:clientState *clientState clientProgress *stateSyncMap // peerProgress maintains the peer progress. - // key:PeerID,value:*peerState + // key:PeerID string, value:peerState *peerState peerProgress *stateSyncMap // pieceProgress maintains the information about // which peers the piece currently exists on - // key:pieceNum@taskID,value:*pieceState + // key:pieceNum@taskID string, value:pieceState *pieceState pieceProgress *stateSyncMap // clientBlackInfo maintains the blacklist of the PID. - // key:srcPID,value:map[dstPID]*Atomic + // key:srcPID string, value:dstPIDMap map[dstPID]*Atomic clientBlackInfo *syncmap.SyncMap + // superLoad maintains the load num downloaded from the supernode for each task. + // key:taskID string, value:superLoadState *superLoadState + superLoad *stateSyncMap + cfg *config.Config } // NewManager returns a new Manager. func NewManager(cfg *config.Config) (*Manager, error) { - return &Manager{ + manager := &Manager{ cfg: cfg, superProgress: newStateSyncMap(), clientProgress: newStateSyncMap(), peerProgress: newStateSyncMap(), pieceProgress: newStateSyncMap(), clientBlackInfo: syncmap.NewSyncMap(), - }, nil + superLoad: newStateSyncMap(), + } + + manager.startMonitorSuperLoad() + return manager, nil } // InitProgress inits the correlation information between peers and pieces, etc. diff --git a/supernode/daemon/mgr/progress/progress_state.go b/supernode/daemon/mgr/progress/progress_state.go index 4d3540ff1..2dd5f8be0 100644 --- a/supernode/daemon/mgr/progress/progress_state.go +++ b/supernode/daemon/mgr/progress/progress_state.go @@ -17,6 +17,8 @@ package progress import ( + "time" + "github.com/dragonflyoss/Dragonfly/pkg/atomiccount" "github.com/dragonflyoss/Dragonfly/pkg/syncmap" @@ -61,6 +63,14 @@ type peerState struct { serviceDownTime int64 } +type superLoadState struct { + // superLoad maintains the load num downloaded from the supernode for each task. + loadValue *atomiccount.AtomicInt + + // loadModTime will record the time when the load be modified. + loadModTime time.Time +} + func newSuperState() *superState { return &superState{ pieceBitSet: &bitset.BitSet{}, @@ -81,3 +91,10 @@ func newPeerState() *peerState { serviceErrorCount: atomiccount.NewAtomicInt(0), } } + +func newSuperLoadState() *superLoadState { + return &superLoadState{ + loadValue: atomiccount.NewAtomicInt(0), + loadModTime: time.Now(), + } +} diff --git a/supernode/daemon/mgr/progress/state_sync_map.go b/supernode/daemon/mgr/progress/state_sync_map.go index 80ef7a8f5..cde3a47fe 100644 --- a/supernode/daemon/mgr/progress/state_sync_map.go +++ b/supernode/daemon/mgr/progress/state_sync_map.go @@ -23,7 +23,7 @@ import ( "github.com/pkg/errors" ) -// stateSyncMap is a thread-safe map. +// stateSyncMap is a thread-safe map for progress state. type stateSyncMap struct { *syncmap.SyncMap } @@ -102,6 +102,20 @@ func (mmap *stateSyncMap) getAsPieceState(key string) (*pieceState, error) { return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) } +// getAsSuperLoadState returns result as *superLoadState. +// The ErrConvertFailed error will be returned if the assertion fails. +func (mmap *stateSyncMap) getAsSuperLoadState(key string) (*superLoadState, error) { + v, err := mmap.get(key) + if err != nil { + return nil, errors.Wrapf(err, "key: %s", key) + } + + if value, ok := v.(*superLoadState); ok { + return value, nil + } + return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v) +} + // remove deletes the key-value pair from the mmap. // The ErrEmptyValue error will be returned if the key is empty. // And the ErrDataNotFound error will be returned if the key cannot be found. diff --git a/supernode/daemon/mgr/progress/superload_manager.go b/supernode/daemon/mgr/progress/superload_manager.go new file mode 100644 index 000000000..7f8efdef4 --- /dev/null +++ b/supernode/daemon/mgr/progress/superload_manager.go @@ -0,0 +1,78 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package progress + +import ( + "context" + "time" + + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" +) + +const ( + // renewInterval is the interval time to check the superload. + renewInterval = 2 * time.Second + + // renewDelayTime if the superload has not been changed after renewDelayTime, it should be renewed. + renewDelayTime = 30 * time.Second +) + +// UpdateSuperLoad updates the superload of taskID by adding the delta. +// The updated will be `false` if failed to do update operation. +// +// It's considered as a failure when then superload is greater than limit after adding delta. +func (pm *Manager) UpdateSuperLoad(ctx context.Context, taskID string, delta, limit int32) (updated bool, err error) { + v, _ := pm.superLoad.LoadOrStore(taskID, newSuperLoadState()) + loadState, ok := v.(*superLoadState) + if !ok { + return false, errortypes.ErrConvertFailed + } + + if loadState.loadValue.Add(delta) > limit && limit > 0 { + loadState.loadValue.Add(-delta) + return false, nil + } + loadState.loadModTime = time.Now() + + return true, nil +} + +// startMonitorSuperLoad starts a new goroutine to check the superload periodically and +// reset the superload to zero if there is no update for a long time for one task to +// avoid being occupied when supernode doesn't receive the message from peers that downloading piece from supernode for a variety of reasons. +// +func (pm *Manager) startMonitorSuperLoad() { + go func() { + ticker := time.NewTicker(renewInterval) + for range ticker.C { + pm.renewSuperLoad() + } + }() +} + +func (pm *Manager) renewSuperLoad() { + rangeFunc := func(key, value interface{}) bool { + if v, ok := value.(*superLoadState); ok { + if time.Since(v.loadModTime) > renewDelayTime { + v.loadValue.Set(0) + } + } + return true + } + + pm.superLoad.Range(rangeFunc) +} diff --git a/supernode/daemon/mgr/progress_mgr.go b/supernode/daemon/mgr/progress_mgr.go index e9f93382c..6e7761402 100644 --- a/supernode/daemon/mgr/progress_mgr.go +++ b/supernode/daemon/mgr/progress_mgr.go @@ -70,10 +70,10 @@ type ProgressMgr interface { // GetPeerStateByPeerID gets peer state with specified peerID. GetPeerStateByPeerID(ctx context.Context, peerID string) (peerState *PeerState, err error) - // UpdatePeerServiceDown do update operation when a peer server offline. + // UpdateSuperLoad updates the superload of taskID by adding the delta. + // The updated will be `false` if failed to do update operation. // - // This function will update the service down time for the peerID. - // And the supernode will not dispatch tasks to this peer. + // It's considered as a failure when then superload is greater than limit after adding delta. UpdatePeerServiceDown(ctx context.Context, peerID string) (err error) // GetPeersByTaskID gets all peers info with specified taskID. @@ -82,6 +82,11 @@ type ProgressMgr interface { // GetBlackInfoByPeerID gets black info with specified peerID. GetBlackInfoByPeerID(ctx context.Context, peerID string) (dstPIDMap *syncmap.SyncMap, err error) + // UpdateSuperLoad update the superLoad with delta. + // + // The value will be rolled back if it exceeds the limit after updated and returns false. + UpdateSuperLoad(ctx context.Context, taskID string, delta, limit int32) (updated bool, err error) + // DeleteTaskID deletes the super progress with specified taskID. DeleteTaskID(ctx context.Context, taskID string) (err error) diff --git a/supernode/daemon/mgr/scheduler/manager.go b/supernode/daemon/mgr/scheduler/manager.go index 2e7112f8c..58a64b8b4 100644 --- a/supernode/daemon/mgr/scheduler/manager.go +++ b/supernode/daemon/mgr/scheduler/manager.go @@ -172,6 +172,18 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, clientID, peerID continue } + // We limit the number of simultaneous connections that supernode can accept for each task. + if sm.cfg.IsSuperPID(dstPID) { + updated, err := sm.progressMgr.UpdateSuperLoad(ctx, taskID, 1, int32(sm.cfg.PeerDownLimit)) + if err != nil { + logrus.Warnf("failed to update super load taskID(%s) clientID(%s): %v", taskID, clientID, err) + continue + } + if !updated { + continue + } + } + if err := sm.progressMgr.UpdateClientProgress(ctx, taskID, clientID, dstPID, pieceNums[i], config.PieceRUNNING); err != nil { logrus.Warnf("scheduler: failed to update client progress running for pieceNum(%d) taskID(%s) clientID(%s) dstPID(%s)", pieceNums[i], taskID, clientID, dstPID) continue diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index 859ca9646..9866fd087 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -272,6 +272,15 @@ func (tm *Manager) UpdatePieceStatus(ctx context.Context, taskID, pieceRange str pieceRange, taskID, pieceUpdateRequest.ClientID) } + // when a peer success to download a piece from supernode, + // and the load of supernode for the taskID should be decremented by one. + if tm.cfg.IsSuperPID(pieceUpdateRequest.DstPID) { + _, err := tm.progressMgr.UpdateSuperLoad(ctx, taskID, -1, -1) + if err != nil { + logrus.Warnf("failed to update superLoad taskID(%s) clientID(%s): %v", taskID, pieceUpdateRequest.ClientID, err) + } + } + // get dfgetTask according to the CID srcDfgetTask, err := tm.dfgetTaskMgr.Get(ctx, pieceUpdateRequest.ClientID, taskID) if err != nil { diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index cfb000bea..b509fc583 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -345,6 +345,9 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(peer.IP.String()).Observe(timeutils.SinceInMilliseconds(startTime)) logrus.Debugf("get scheduler result length(%d) with taskID(%s) and clientID(%s)", len(pieceResult), task.ID, clientID) + if len(pieceResult) == 0 { + return false, nil, errortypes.ErrPeerWait + } var pieceInfos []*types.PieceInfo for _, v := range pieceResult { logrus.Debugf("get scheduler result item: %+v with taskID(%s) and clientID(%s)", v, task.ID, clientID)