Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
feature: update scheduler manager GetSuperPID
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <starnop@163.com>
  • Loading branch information
starnop committed May 27, 2019
1 parent 4e80d77 commit 950ee65
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 17 deletions.
2 changes: 1 addition & 1 deletion supernode/daemon/mgr/peer/manager.go
@@ -1,4 +1,4 @@
package mgr
package peer

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion supernode/daemon/mgr/peer/manager_test.go
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package mgr
package peer

import (
"context"
Expand Down
20 changes: 10 additions & 10 deletions supernode/daemon/mgr/scheduler/manager.go
Expand Up @@ -23,12 +23,14 @@ var _ mgr.SchedulerMgr = &Manager{}

// Manager is an implement of the interface of SchedulerMgr.
type Manager struct {
cfg *config.Config
progressMgr mgr.ProgressMgr
}

// NewManager returns a new Manager.
func NewManager(progressMgr mgr.ProgressMgr) (*Manager, error) {
func NewManager(cfg *config.Config, progressMgr mgr.ProgressMgr) (*Manager, error) {
return &Manager{
cfg: cfg,
progressMgr: progressMgr,
}, nil
}
Expand All @@ -43,14 +45,16 @@ func (sm *Manager) Schedule(ctx context.Context, taskID, clientID, peerID string
if len(pieceAvailable) == 0 {
return nil, errors.Wrapf(errorType.ErrPeerWait, "taskID: %s", taskID)
}
logrus.Debugf("scheduler get available pieces %v for taskID(%s)", pieceAvailable, taskID)

// get runnning pieces
pieceRunning, err := sm.progressMgr.GetPieceProgressByCID(ctx, taskID, clientID, "running")
if err != nil {
return nil, err
}
logrus.Debugf("scheduler get running pieces %v for taskID(%s)", pieceRunning, taskID)
runningCount := len(pieceRunning)
if runningCount > config.PeerDownLimit {
if runningCount >= config.PeerDownLimit {
return nil, errors.Wrapf(errorType.PeerContinue, "taskID: %s,clientID: %s", taskID, clientID)
}

Expand All @@ -59,6 +63,7 @@ func (sm *Manager) Schedule(ctx context.Context, taskID, clientID, peerID string
if err != nil {
return nil, err
}
logrus.Debugf("scheduler get pieces %v with prioritize for taskID(%s)", pieceNums, taskID)

return sm.getPieceResults(ctx, taskID, peerID, pieceNums, runningCount)
}
Expand Down Expand Up @@ -127,15 +132,15 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, peerID string, p
return nil, err
}
if srcPeerState.ClientErrorCount > config.FailCountLimit {
logrus.Warnf("peerID: %s got errors for %d times which reaches error limit: %d", peerID, srcPeerState.ClientErrorCount, config.FailCountLimit)
logrus.Warnf("peerID: %s got errors for %d times which reaches error limit: %d for taskID(%s)", peerID, srcPeerState.ClientErrorCount, config.FailCountLimit, taskID)
useSupernode = true
}

pieceResults := make([]*mgr.PieceResult, 0)
for i := 0; i < len(pieceNums); i++ {
var dstPID string
if useSupernode {
dstPID = getSupernodePID()
dstPID = sm.cfg.GetSuperPID()
} else {
// get peerIDs by pieceNum
peerIDs, err := sm.progressMgr.GetPeerIDsByPieceNum(ctx, taskID, pieceNums[i])
Expand Down Expand Up @@ -168,7 +173,7 @@ func (sm *Manager) getPieceResults(ctx context.Context, taskID, peerID string, p
func (sm *Manager) tryGetPID(ctx context.Context, taskID string, pieceNum int, peerIDs []string) (dstPID string) {
defer func() {
if dstPID == "" {
dstPID = getSupernodePID()
dstPID = sm.cfg.GetSuperPID()
}
}()

Expand Down Expand Up @@ -237,11 +242,6 @@ func getCenterNum(runningPieces []int) int {
return totalDistance / (len(runningPieces))
}

// TODO: return supernode peerID
func getSupernodePID() string {
return ""
}

func abs(i int) int {
if i < 0 {
return -i
Expand Down
6 changes: 5 additions & 1 deletion supernode/daemon/mgr/scheduler/manager_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"testing"

cutil "github.com/dragonflyoss/Dragonfly/common/util"
"github.com/dragonflyoss/Dragonfly/supernode/config"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/mock"

"github.com/go-check/check"
Expand All @@ -48,7 +49,10 @@ func (s *SchedulerMgrTestSuite) SetUpSuite(c *check.C) {
s.mockCtl = gomock.NewController(c)
s.mockProgressMgr = mock.NewMockProgressMgr(s.mockCtl)
s.mockProgressMgr.EXPECT().GetPeerIDsByPieceNum(gomock.Any(), gomock.Any(), gomock.Any()).Return([]string{"peerID"}, nil).AnyTimes()
s.manager, _ = NewManager(s.mockProgressMgr)

cfg := config.NewConfig()
cfg.SetSuperPID("fooPid")
s.manager, _ = NewManager(cfg, s.mockProgressMgr)
}

func (s *SchedulerMgrTestSuite) TearDownSuite(c *check.C) {
Expand Down
7 changes: 3 additions & 4 deletions supernode/daemon/mgr/scheduler_mgr.go
Expand Up @@ -6,10 +6,9 @@ import (

// PieceResult contains the information about which piece to download from which node.
type PieceResult struct {
TaskID string
PieceNum int
PieceSize int32
DstPID string
TaskID string
PieceNum int
DstPID string
}

// SchedulerMgr is responsible for calculating scheduling results according to certain rules.
Expand Down

0 comments on commit 950ee65

Please sign in to comment.