Skip to content

Commit

Permalink
Rescheduler parent/children later when no candidates and add schedule…
Browse files Browse the repository at this point in the history
… log (#497)

Signed-off-by: santong <weipeng.swp@alibaba-inc.com>
  • Loading branch information
244372610 committed Jul 27, 2021
1 parent 0e4f057 commit 733962f
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 49 deletions.
8 changes: 7 additions & 1 deletion client/config/peerhost_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package config
import (
"net"

"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"golang.org/x/time/rate"

"d7y.io/dragonfly/v2/client/clientutil"
Expand All @@ -42,7 +43,12 @@ var peerHostConfig = DaemonOption{
GCInterval: clientutil.Duration{Duration: DefaultGCInterval},
KeepStorage: false,
Scheduler: SchedulerOption{
NetAddrs: nil,
NetAddrs: []dfnet.NetAddr{
{
Type: dfnet.TCP,
Addr: "127.0.0.1:8002",
},
},
ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout},
},
Host: HostOption{
Expand Down
1 change: 1 addition & 0 deletions client/daemon/peer/peertask_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ loop:
break loop
}

logger.Debugf("receive peerPacket %v for peer %s", peerPacket, pt.peerID)
if peerPacket.Code != dfcodes.Success {
pt.Errorf("receive peer packet with error: %d", peerPacket.Code)
if pt.isExitPeerPacketCode(peerPacket) {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewDefaultSchedulerConfig() *SchedulerConfig {
Scheduler: "basic",
CDNLoad: 100,
ClientLoad: 10,
OpenMonitor: false,
OpenMonitor: true,
GC: NewDefaultGCConfig(),
}
}
Expand Down
6 changes: 6 additions & 0 deletions scheduler/core/evaluator/basic/basic_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ func (eval *baseEvaluator) NeedAdjustParent(peer *types.Peer) bool {
logger.Debugf("peer %s need adjust parent because it current parent is bad", peer.PeerID)
return true
}

if peer.GetParent() != nil && peer.GetParent().IsLeave() {
logger.Debugf("peer %s need adjust parent because it current parent is status is leave", peer.PeerID)
return true
}

costHistory := peer.GetCostHistory()
if len(costHistory) < 4 {
return false
Expand Down
143 changes: 109 additions & 34 deletions scheduler/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package core

import (
"context"
"time"

"d7y.io/dragonfly/v2/internal/dfcodes"
logger "d7y.io/dragonfly/v2/internal/dflog"
Expand All @@ -27,6 +28,7 @@ import (
"d7y.io/dragonfly/v2/scheduler/core/scheduler"
"d7y.io/dragonfly/v2/scheduler/daemon"
"d7y.io/dragonfly/v2/scheduler/types"
"k8s.io/client-go/util/workqueue"
)

type event interface {
Expand All @@ -35,39 +37,81 @@ type event interface {
}

type state struct {
sched scheduler.Scheduler
peerManager daemon.PeerMgr
cdnManager daemon.CDNMgr
sched scheduler.Scheduler
peerManager daemon.PeerMgr
cdnManager daemon.CDNMgr
waitScheduleParentPeerQueue workqueue.DelayingInterface
}

func newState(sched scheduler.Scheduler, peerManager daemon.PeerMgr, cdnManager daemon.CDNMgr) *state {
return &state{
sched: sched,
peerManager: peerManager,
cdnManager: cdnManager,
sched: sched,
peerManager: peerManager,
cdnManager: cdnManager,
waitScheduleParentPeerQueue: workqueue.NewNamedDelayingQueue("wait reSchedule parent"),
}
}

type peerScheduleParentEvent struct {
func (s *state) start() {
for {
v, shutdown := s.waitScheduleParentPeerQueue.Get()
if shutdown {
break
}
peer := v.(*types.Peer)
if peer.IsDone() || peer.IsLeave() {
logger.WithTaskAndPeerID(peer.Task.TaskID,
peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because peer is done or leave, "+
"peer status is %s, "+
"isLeave %t", peer.GetStatus(), peer.IsLeave())
s.waitScheduleParentPeerQueue.Done(v)
continue
}
parent, candidates, hashParent := s.sched.ScheduleParent(peer)
if !hashParent && !peer.Host.CDN {
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Warnf("waitScheduleParentPeerQueue: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.Done(v)
s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second)
continue
}
if peer.PacketChan == nil {
logger.Errorf("waitScheduleParentPeerQueue: there is no packet chan associated with peer %s", peer.PeerID)
return
}
peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates)
logger.WithTaskAndPeerID(peer.Task.TaskID,
peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because it has scheduled new parent %v", parent)
s.waitScheduleParentPeerQueue.Done(v)
}
}

func (s *state) stop() {
if !s.waitScheduleParentPeerQueue.ShuttingDown() {
s.waitScheduleParentPeerQueue.ShutDown()
}
}

type startReportPieceResultEvent struct {
peer *types.Peer
}

var _ event = peerScheduleParentEvent{}
var _ event = startReportPieceResultEvent{}

func (e peerScheduleParentEvent) apply(s *state) {
func (e startReportPieceResultEvent) apply(s *state) {
parent, candidates, hasParent := s.sched.ScheduleParent(e.peer)
if e.peer.PacketChan == nil {
logger.Errorf("report piece result: there is no packet chan associated with peer %s", e.peer.PeerID)
if !hasParent {
logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("peerScheduleParentEvent: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return
}
if !hasParent {
e.peer.PacketChan <- constructFailPeerPacket(e.peer, dfcodes.SchedWithoutParentPeer)
if e.peer.PacketChan == nil {
logger.Errorf("start report piece result: there is no packet chan associated with peer %s", e.peer.PeerID)
return
}
e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parent, candidates)
}

func (e peerScheduleParentEvent) hashKey() string {
func (e startReportPieceResultEvent) hashKey() string {
return e.peer.PeerID
}

Expand All @@ -83,16 +127,24 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) {
oldParent := e.peer.GetParent()
var candidates []*types.Peer
parentPeer, ok := s.peerManager.Get(e.pr.DstPid)
if !ok {
parentPeer, candidates, _ = s.sched.ScheduleParent(e.peer)
if !ok || parentPeer.IsLeave() {
var hasParent bool
parentPeer, candidates, hasParent = s.sched.ScheduleParent(e.peer)
if !hasParent {
logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("peerDownloadPieceSuccessEvent: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return
}
}
parentPeer.Touch()
if oldParent != nil {
candidates = append(candidates, oldParent)
}
if e.peer.PacketChan == nil {
logger.Errorf("peerDownloadPieceSuccessEvent: there is no packet chan with peer %s", e.peer.PeerID)
return
}
// TODO if parentPeer is equal with oldParent, need schedule again ?
e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parentPeer, candidates)
return
}
Expand Down Expand Up @@ -170,6 +222,7 @@ var _ event = peerDownloadSuccessEvent{}

func (e peerDownloadSuccessEvent) apply(s *state) {
e.peer.SetStatus(types.PeerStatusSuccess)
removePeerFromCurrentTree(e.peer, s)
children := s.sched.ScheduleChildren(e.peer)
for _, child := range children {
if child.PacketChan == nil {
Expand All @@ -193,17 +246,19 @@ var _ event = peerDownloadFailEvent{}

func (e peerDownloadFailEvent) apply(s *state) {
e.peer.SetStatus(types.PeerStatusFail)
removePeerFromCurrentTree(e.peer, s)
for _, child := range e.peer.GetChildren() {
parent, candidates, hasParent := s.sched.ScheduleParent(child)
if child.PacketChan == nil {
logger.Warnf("reportPeerDownloadResult: there is no packet chan associated with peer %s", e.peer.PeerID)
logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID)
continue
}
if hasParent {
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
} else {
child.PacketChan <- constructFailPeerPacket(child, dfcodes.SchedWithoutParentPeer)
if !hasParent {
logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("peerDownloadFailEvent: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return
}
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
}
s.peerManager.Delete(e.peer.PeerID)
}
Expand Down Expand Up @@ -240,14 +295,16 @@ func constructSuccessPeerPacket(peer *types.Peer, parent *types.Peer, candidates
PeerId: candidate.PeerID,
})
}
return &schedulerRPC.PeerPacket{
peerPacket := &schedulerRPC.PeerPacket{
TaskId: peer.Task.TaskID,
SrcPid: peer.PeerID,
ParallelCount: 0,
ParallelCount: 1,
MainPeer: mainPeer,
StealPeers: stealPeers,
Code: dfcodes.Success,
}
logger.Debugf("send peerPacket %+v to peer %s", peerPacket, peer.PeerID)
return peerPacket
}

func constructFailPeerPacket(peer *types.Peer, errCode base.Code) *schedulerRPC.PeerPacket {
Expand All @@ -260,31 +317,33 @@ func constructFailPeerPacket(peer *types.Peer, errCode base.Code) *schedulerRPC.

func handlePeerLeave(peer *types.Peer, s *state) {
peer.MarkLeave()
peer.ReplaceParent(nil)
removePeerFromCurrentTree(peer, s)
for _, child := range peer.GetChildren() {
parent, candidates, hasParent := s.sched.ScheduleParent(child)
if !hasParent {
logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("handlePeerLeave: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.AddAfter(child, time.Second)
continue
}
if child.PacketChan == nil {
logger.Debugf("handlePeerLeave: there is no packet chan with peer %s", child.PeerID)
continue
}
if hasParent {
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
} else {
child.PacketChan <- constructFailPeerPacket(child, dfcodes.SchedWithoutParentPeer)
}
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
}
s.peerManager.Delete(peer.PeerID)
}

func handleReplaceParent(peer *types.Peer, s *state) {
parent, candidates, hasParent := s.sched.ScheduleParent(peer)
if peer.PacketChan == nil {
logger.Errorf("handleReplaceParent: there is no packet chan with peer %s", peer.PeerID)
return
}
if !hasParent {
logger.Errorf("handleReplaceParent: failed to schedule parent to peer %s", peer.PeerID)
peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.SchedWithoutParentPeer)
//peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.SchedWithoutParentPeer)
s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second)
return
}
if peer.PacketChan == nil {
logger.Errorf("handleReplaceParent: there is no packet chan with peer %s", peer.PeerID)
return
}
peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates)
Expand All @@ -303,3 +362,19 @@ func handleSeedTaskFail(task *types.Task) {
})
}
}

func removePeerFromCurrentTree(peer *types.Peer, s *state) {
parent := peer.GetParent()
peer.ReplaceParent(nil)
// parent frees up upload resources
if parent != nil {
children := s.sched.ScheduleChildren(parent)
for _, child := range children {
if child.PacketChan == nil {
logger.Debugf("removePeerFromCurrentTree: there is no packet chan with peer %s", peer.PeerID)
continue
}
child.PacketChan <- constructSuccessPeerPacket(child, peer, nil)
}
}
}
10 changes: 7 additions & 3 deletions scheduler/core/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,22 @@ func (m *monitor) printDebugInfo() string {

buffer := bytes.NewBuffer([]byte{})
table := tablewriter.NewWriter(buffer)
table.SetHeader([]string{"PeerID", "Finished Piece Num", "Finished", "Free Load"})
table.SetHeader([]string{"PeerID", "URL", "parent node", "status", "start time", "Finished Piece Num", "Finished", "Free Load"})

m.peerManager.ListPeers().Range(func(key interface{}, value interface{}) (ok bool) {
ok = true
peer := value.(*types.Peer)
if peer == nil {
return
}
parentNode := ""
if peer.GetParent() == nil {
roots = append(roots, peer)
} else {
parentNode = peer.GetParent().PeerID
}
table.Append([]string{peer.PeerID, strconv.Itoa(int(peer.GetFinishedNum())),

table.Append([]string{peer.PeerID, peer.Task.URL, parentNode, peer.GetStatus().String(), peer.CreateTime.String(), strconv.Itoa(int(peer.GetFinishedNum())),
strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())})
return
})
Expand All @@ -96,7 +100,7 @@ func (m *monitor) printDebugInfo() string {
return
}
nPath := append(path, fmt.Sprintf("%s(%d)", node.PeerID, node.GetWholeTreeNode()))
if len(path) > 1 {
if len(path) >= 1 {
msgs = append(msgs, node.PeerID+" || "+strings.Join(nPath, "-"))
}
for _, child := range node.GetChildren() {
Expand Down
12 changes: 7 additions & 5 deletions scheduler/core/scheduler/basic/basic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func (s *Scheduler) ScheduleChildren(peer *types.Peer) (children []*types.Peer)
func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer, bool) {
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start scheduler parent flow")
if !s.evaluator.NeedAdjustParent(peer) {
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace the parent node, current parent is %v", peer.GetParent())
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace the parent node, peer is %v and current parent is %v",
peer, peer.GetParent())
if peer.GetParent() == nil {
return nil, nil, false
}
Expand Down Expand Up @@ -137,6 +138,7 @@ func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer
}
return primary, candidateParents, true
}
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID)
return nil, nil, false
}

Expand All @@ -147,7 +149,7 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [
return false
}
if candidateNode.IsDone() || candidateNode.IsLeave() || candidateNode == peer {
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("*candidate child peer %s is not selected because it is %v",
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("*candidate child peer %s is not selected because it is %+v",
candidateNode.PeerID, candidateNode)
return false
}
Expand All @@ -166,7 +168,7 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [
peer.PeerID).Debugf("candidate child peer %s is selected because it has parent and parent status is not health", candidateNode.PeerID)
return true
}
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("=candidate child peer %s is not selected because it is %v",
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("=candidate child peer %s is not selected because it is %+v",
candidateNode.PeerID, candidateNode)
return false
})
Expand All @@ -180,11 +182,11 @@ func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list []
}
if s.evaluator.IsBadNode(candidateNode) || candidateNode.IsLeave() || candidateNode == peer || candidateNode.Host.
GetFreeUploadLoad() <= 0 {
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is not selected because it is %v",
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is not selected because it is %+v",
candidateNode.PeerID, candidateNode)
return false
}
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is selected because it is %v",
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is selected because it is %+v",
candidateNode.PeerID, candidateNode)
return true
})
Expand Down

0 comments on commit 733962f

Please sign in to comment.