Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rescheduler parent/children later when no candidates and add schedule log #497

Merged
merged 12 commits into from
Jul 27, 2021
8 changes: 7 additions & 1 deletion client/config/peerhost_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package config
import (
"net"

"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"golang.org/x/time/rate"

"d7y.io/dragonfly/v2/client/clientutil"
Expand All @@ -42,7 +43,12 @@ var peerHostConfig = DaemonOption{
GCInterval: clientutil.Duration{Duration: DefaultGCInterval},
KeepStorage: false,
Scheduler: SchedulerOption{
NetAddrs: nil,
NetAddrs: []dfnet.NetAddr{
{
Type: dfnet.TCP,
Addr: "127.0.0.1:8002",
},
},
ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout},
},
Host: HostOption{
Expand Down
1 change: 1 addition & 0 deletions client/daemon/peer/peertask_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ loop:
break loop
}

logger.Debugf("receive peerPacket %v for peer %s", peerPacket, pt.peerID)
if peerPacket.Code != dfcodes.Success {
pt.Errorf("receive peer packet with error: %d", peerPacket.Code)
if pt.isExitPeerPacketCode(peerPacket) {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewDefaultSchedulerConfig() *SchedulerConfig {
Scheduler: "basic",
CDNLoad: 100,
ClientLoad: 10,
OpenMonitor: false,
OpenMonitor: true,
GC: NewDefaultGCConfig(),
}
}
Expand Down
6 changes: 6 additions & 0 deletions scheduler/core/evaluator/basic/basic_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ func (eval *baseEvaluator) NeedAdjustParent(peer *types.Peer) bool {
logger.Debugf("peer %s need adjust parent because it current parent is bad", peer.PeerID)
return true
}

if peer.GetParent() != nil && peer.GetParent().IsLeave() {
logger.Debugf("peer %s need adjust parent because it current parent is status is leave", peer.PeerID)
return true
}

costHistory := peer.GetCostHistory()
if len(costHistory) < 4 {
return false
Expand Down
143 changes: 109 additions & 34 deletions scheduler/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package core

import (
"context"
"time"

"d7y.io/dragonfly/v2/internal/dfcodes"
logger "d7y.io/dragonfly/v2/internal/dflog"
Expand All @@ -27,6 +28,7 @@ import (
"d7y.io/dragonfly/v2/scheduler/core/scheduler"
"d7y.io/dragonfly/v2/scheduler/daemon"
"d7y.io/dragonfly/v2/scheduler/types"
"k8s.io/client-go/util/workqueue"
)

type event interface {
Expand All @@ -35,39 +37,81 @@ type event interface {
}

type state struct {
sched scheduler.Scheduler
peerManager daemon.PeerMgr
cdnManager daemon.CDNMgr
sched scheduler.Scheduler
peerManager daemon.PeerMgr
cdnManager daemon.CDNMgr
waitScheduleParentPeerQueue workqueue.DelayingInterface
}

func newState(sched scheduler.Scheduler, peerManager daemon.PeerMgr, cdnManager daemon.CDNMgr) *state {
return &state{
sched: sched,
peerManager: peerManager,
cdnManager: cdnManager,
sched: sched,
peerManager: peerManager,
cdnManager: cdnManager,
waitScheduleParentPeerQueue: workqueue.NewNamedDelayingQueue("wait reSchedule parent"),
}
}

type peerScheduleParentEvent struct {
func (s *state) start() {
for {
v, shutdown := s.waitScheduleParentPeerQueue.Get()
if shutdown {
break
}
peer := v.(*types.Peer)
if peer.IsDone() || peer.IsLeave() {
logger.WithTaskAndPeerID(peer.Task.TaskID,
peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because peer is done or leave, "+
"peer status is %s, "+
"isLeave %t", peer.GetStatus(), peer.IsLeave())
s.waitScheduleParentPeerQueue.Done(v)
continue
}
parent, candidates, hashParent := s.sched.ScheduleParent(peer)
if !hashParent && !peer.Host.CDN {
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Warnf("waitScheduleParentPeerQueue: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.Done(v)
s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second)
continue
}
if peer.PacketChan == nil {
logger.Errorf("waitScheduleParentPeerQueue: there is no packet chan associated with peer %s", peer.PeerID)
return
}
peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates)
logger.WithTaskAndPeerID(peer.Task.TaskID,
peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because it has scheduled new parent %v", parent)
s.waitScheduleParentPeerQueue.Done(v)
}
}

func (s *state) stop() {
if !s.waitScheduleParentPeerQueue.ShuttingDown() {
s.waitScheduleParentPeerQueue.ShutDown()
}
}

type startReportPieceResultEvent struct {
peer *types.Peer
}

var _ event = peerScheduleParentEvent{}
var _ event = startReportPieceResultEvent{}

func (e peerScheduleParentEvent) apply(s *state) {
func (e startReportPieceResultEvent) apply(s *state) {
parent, candidates, hasParent := s.sched.ScheduleParent(e.peer)
if e.peer.PacketChan == nil {
logger.Errorf("report piece result: there is no packet chan associated with peer %s", e.peer.PeerID)
if !hasParent {
logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("peerScheduleParentEvent: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return
}
if !hasParent {
e.peer.PacketChan <- constructFailPeerPacket(e.peer, dfcodes.SchedWithoutParentPeer)
if e.peer.PacketChan == nil {
logger.Errorf("start report piece result: there is no packet chan associated with peer %s", e.peer.PeerID)
return
}
e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parent, candidates)
}

func (e peerScheduleParentEvent) hashKey() string {
func (e startReportPieceResultEvent) hashKey() string {
return e.peer.PeerID
}

Expand All @@ -83,16 +127,24 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) {
oldParent := e.peer.GetParent()
var candidates []*types.Peer
parentPeer, ok := s.peerManager.Get(e.pr.DstPid)
if !ok {
parentPeer, candidates, _ = s.sched.ScheduleParent(e.peer)
if !ok || parentPeer.IsLeave() {
var hasParent bool
parentPeer, candidates, hasParent = s.sched.ScheduleParent(e.peer)
if !hasParent {
logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("peerDownloadPieceSuccessEvent: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return
}
}
parentPeer.Touch()
if oldParent != nil {
candidates = append(candidates, oldParent)
}
if e.peer.PacketChan == nil {
logger.Errorf("peerDownloadPieceSuccessEvent: there is no packet chan with peer %s", e.peer.PeerID)
return
}
// TODO if parentPeer is equal with oldParent, need schedule again ?
e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parentPeer, candidates)
return
}
Expand Down Expand Up @@ -170,6 +222,7 @@ var _ event = peerDownloadSuccessEvent{}

func (e peerDownloadSuccessEvent) apply(s *state) {
e.peer.SetStatus(types.PeerStatusSuccess)
removePeerFromCurrentTree(e.peer, s)
children := s.sched.ScheduleChildren(e.peer)
for _, child := range children {
if child.PacketChan == nil {
Expand All @@ -193,17 +246,19 @@ var _ event = peerDownloadFailEvent{}

func (e peerDownloadFailEvent) apply(s *state) {
e.peer.SetStatus(types.PeerStatusFail)
removePeerFromCurrentTree(e.peer, s)
for _, child := range e.peer.GetChildren() {
parent, candidates, hasParent := s.sched.ScheduleParent(child)
if child.PacketChan == nil {
logger.Warnf("reportPeerDownloadResult: there is no packet chan associated with peer %s", e.peer.PeerID)
logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID)
continue
}
if hasParent {
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
} else {
child.PacketChan <- constructFailPeerPacket(child, dfcodes.SchedWithoutParentPeer)
if !hasParent {
logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("peerDownloadFailEvent: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return
}
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
}
s.peerManager.Delete(e.peer.PeerID)
}
Expand Down Expand Up @@ -240,14 +295,16 @@ func constructSuccessPeerPacket(peer *types.Peer, parent *types.Peer, candidates
PeerId: candidate.PeerID,
})
}
return &schedulerRPC.PeerPacket{
peerPacket := &schedulerRPC.PeerPacket{
TaskId: peer.Task.TaskID,
SrcPid: peer.PeerID,
ParallelCount: 0,
ParallelCount: 1,
MainPeer: mainPeer,
StealPeers: stealPeers,
Code: dfcodes.Success,
}
logger.Debugf("send peerPacket %+v to peer %s", peerPacket, peer.PeerID)
return peerPacket
}

func constructFailPeerPacket(peer *types.Peer, errCode base.Code) *schedulerRPC.PeerPacket {
Expand All @@ -260,31 +317,33 @@ func constructFailPeerPacket(peer *types.Peer, errCode base.Code) *schedulerRPC.

func handlePeerLeave(peer *types.Peer, s *state) {
peer.MarkLeave()
peer.ReplaceParent(nil)
removePeerFromCurrentTree(peer, s)
for _, child := range peer.GetChildren() {
parent, candidates, hasParent := s.sched.ScheduleParent(child)
if !hasParent {
logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("handlePeerLeave: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.AddAfter(child, time.Second)
continue
}
if child.PacketChan == nil {
logger.Debugf("handlePeerLeave: there is no packet chan with peer %s", child.PeerID)
continue
}
if hasParent {
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
} else {
child.PacketChan <- constructFailPeerPacket(child, dfcodes.SchedWithoutParentPeer)
}
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
}
s.peerManager.Delete(peer.PeerID)
}

func handleReplaceParent(peer *types.Peer, s *state) {
parent, candidates, hasParent := s.sched.ScheduleParent(peer)
if peer.PacketChan == nil {
logger.Errorf("handleReplaceParent: there is no packet chan with peer %s", peer.PeerID)
return
}
if !hasParent {
logger.Errorf("handleReplaceParent: failed to schedule parent to peer %s", peer.PeerID)
peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.SchedWithoutParentPeer)
//peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.SchedWithoutParentPeer)
s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second)
return
}
if peer.PacketChan == nil {
logger.Errorf("handleReplaceParent: there is no packet chan with peer %s", peer.PeerID)
return
}
peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates)
Expand All @@ -303,3 +362,19 @@ func handleSeedTaskFail(task *types.Task) {
})
}
}

func removePeerFromCurrentTree(peer *types.Peer, s *state) {
parent := peer.GetParent()
peer.ReplaceParent(nil)
// parent frees up upload resources
if parent != nil {
children := s.sched.ScheduleChildren(parent)
for _, child := range children {
if child.PacketChan == nil {
logger.Debugf("removePeerFromCurrentTree: there is no packet chan with peer %s", peer.PeerID)
continue
}
child.PacketChan <- constructSuccessPeerPacket(child, peer, nil)
}
}
}
10 changes: 7 additions & 3 deletions scheduler/core/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,22 @@ func (m *monitor) printDebugInfo() string {

buffer := bytes.NewBuffer([]byte{})
table := tablewriter.NewWriter(buffer)
table.SetHeader([]string{"PeerID", "Finished Piece Num", "Finished", "Free Load"})
table.SetHeader([]string{"PeerID", "URL", "parent node", "status", "start time", "Finished Piece Num", "Finished", "Free Load"})

m.peerManager.ListPeers().Range(func(key interface{}, value interface{}) (ok bool) {
ok = true
peer := value.(*types.Peer)
if peer == nil {
return
}
parentNode := ""
if peer.GetParent() == nil {
roots = append(roots, peer)
} else {
parentNode = peer.GetParent().PeerID
}
table.Append([]string{peer.PeerID, strconv.Itoa(int(peer.GetFinishedNum())),

table.Append([]string{peer.PeerID, peer.Task.URL, parentNode, peer.GetStatus().String(), peer.CreateTime.String(), strconv.Itoa(int(peer.GetFinishedNum())),
strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())})
return
})
Expand All @@ -96,7 +100,7 @@ func (m *monitor) printDebugInfo() string {
return
}
nPath := append(path, fmt.Sprintf("%s(%d)", node.PeerID, node.GetWholeTreeNode()))
if len(path) > 1 {
if len(path) >= 1 {
msgs = append(msgs, node.PeerID+" || "+strings.Join(nPath, "-"))
}
for _, child := range node.GetChildren() {
Expand Down
12 changes: 7 additions & 5 deletions scheduler/core/scheduler/basic/basic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func (s *Scheduler) ScheduleChildren(peer *types.Peer) (children []*types.Peer)
func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer, bool) {
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start scheduler parent flow")
if !s.evaluator.NeedAdjustParent(peer) {
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace the parent node, current parent is %v", peer.GetParent())
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace the parent node, peer is %v and current parent is %v",
peer, peer.GetParent())
if peer.GetParent() == nil {
return nil, nil, false
}
Expand Down Expand Up @@ -137,6 +138,7 @@ func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer
}
return primary, candidateParents, true
}
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID)
return nil, nil, false
}

Expand All @@ -147,7 +149,7 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [
return false
}
if candidateNode.IsDone() || candidateNode.IsLeave() || candidateNode == peer {
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("*candidate child peer %s is not selected because it is %v",
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("*candidate child peer %s is not selected because it is %+v",
candidateNode.PeerID, candidateNode)
return false
}
Expand All @@ -166,7 +168,7 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [
peer.PeerID).Debugf("candidate child peer %s is selected because it has parent and parent status is not health", candidateNode.PeerID)
return true
}
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("=candidate child peer %s is not selected because it is %v",
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("=candidate child peer %s is not selected because it is %+v",
candidateNode.PeerID, candidateNode)
return false
})
Expand All @@ -180,11 +182,11 @@ func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list []
}
if s.evaluator.IsBadNode(candidateNode) || candidateNode.IsLeave() || candidateNode == peer || candidateNode.Host.
GetFreeUploadLoad() <= 0 {
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is not selected because it is %v",
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is not selected because it is %+v",
candidateNode.PeerID, candidateNode)
return false
}
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is selected because it is %v",
logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("candidate parent peer %s is selected because it is %+v",
candidateNode.PeerID, candidateNode)
return true
})
Expand Down
Loading