Skip to content

Commit

Permalink
fix: scheduler pick candidate and associate child encounter dead lock (
Browse files Browse the repository at this point in the history
…#500)

* fix dead lock

Signed-off-by: santong <weipeng.swp@alibaba-inc.com>
  • Loading branch information
244372610 committed Jul 27, 2021
1 parent 733962f commit fbbfcd3
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 55 deletions.
4 changes: 1 addition & 3 deletions scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,12 @@ func NewDefaultSchedulerConfig() *SchedulerConfig {
AScheduler: "",
BScheduler: "",
WorkerNum: runtime.GOMAXPROCS(0),
Monitor: false,
AccessWindow: 3 * time.Minute,
CandidateParentCount: 10,
Scheduler: "basic",
CDNLoad: 100,
ClientLoad: 10,
OpenMonitor: true,
OpenMonitor: false,
GC: NewDefaultGCConfig(),
}
}
Expand Down Expand Up @@ -186,7 +185,6 @@ type SchedulerConfig struct {
AScheduler string `yaml:"ascheduler" mapstructure:"ascheduler"`
BScheduler string `yaml:"bscheduler" mapstructure:"bscheduler"`
WorkerNum int `yaml:"workerNum" mapstructure:"workerNum"`
Monitor bool `yaml:"monitor" mapstructure:"monitor"`
// AccessWindow should less than CDN task expireTime
AccessWindow time.Duration `yaml:"accessWindow" mapstructure:"accessWindow"`
CandidateParentCount int `yaml:"candidateParentCount" mapstructure:"candidateParentCount"`
Expand Down
26 changes: 15 additions & 11 deletions scheduler/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,19 +247,21 @@ var _ event = peerDownloadFailEvent{}
func (e peerDownloadFailEvent) apply(s *state) {
e.peer.SetStatus(types.PeerStatusFail)
removePeerFromCurrentTree(e.peer, s)
for _, child := range e.peer.GetChildren() {
e.peer.GetChildren().Range(func(key, value interface{}) bool {
child := (value).(*types.Peer)
parent, candidates, hasParent := s.sched.ScheduleParent(child)
if child.PacketChan == nil {
logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID)
continue
}
if !hasParent {
logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("peerDownloadFailEvent: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second)
return
return true
}
if child.PacketChan == nil {
logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID)
return true
}
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
}
return true
})
s.peerManager.Delete(e.peer.PeerID)
}

Expand Down Expand Up @@ -318,19 +320,21 @@ func constructFailPeerPacket(peer *types.Peer, errCode base.Code) *schedulerRPC.
func handlePeerLeave(peer *types.Peer, s *state) {
peer.MarkLeave()
removePeerFromCurrentTree(peer, s)
for _, child := range peer.GetChildren() {
peer.GetChildren().Range(func(key, value interface{}) bool {
child := value.(*types.Peer)
parent, candidates, hasParent := s.sched.ScheduleParent(child)
if !hasParent {
logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("handlePeerLeave: there is no available parent,reschedule it in one second")
s.waitScheduleParentPeerQueue.AddAfter(child, time.Second)
continue
return true
}
if child.PacketChan == nil {
logger.Debugf("handlePeerLeave: there is no packet chan with peer %s", child.PeerID)
continue
return true
}
child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates)
}
return true
})
s.peerManager.Delete(peer.PeerID)
}

Expand Down
8 changes: 5 additions & 3 deletions scheduler/core/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ func (m *monitor) printDebugInfo() string {
if len(path) >= 1 {
msgs = append(msgs, node.PeerID+" || "+strings.Join(nPath, "-"))
}
for _, child := range node.GetChildren() {
node.GetChildren().Range(func(key, value interface{}) bool {
child := (value).(*types.Peer)
if child == nil {
continue
return true
}
printTree(child, nPath)
}
return true
})
}

for _, root := range roots {
Expand Down
60 changes: 22 additions & 38 deletions scheduler/types/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"go.uber.org/atomic"
)

type PeerStatus uint8
Expand Down Expand Up @@ -63,13 +64,13 @@ type Peer struct {
// createTime
CreateTime time.Time
// finishedNum specifies downloaded finished piece number
finishedNum int32
finishedNum atomic.Int32
lastAccessTime time.Time
parent *Peer
children map[string]*Peer
children sync.Map
status PeerStatus
costHistory []int
leave bool
leave atomic.Bool
}

func NewPeer(peerID string, task *Task, host *PeerHost) *Peer {
Expand All @@ -79,7 +80,6 @@ func NewPeer(peerID string, task *Task, host *PeerHost) *Peer {
Host: host,
CreateTime: time.Now(),
lastAccessTime: time.Now(),
children: make(map[string]*Peer),
status: PeerStatusWaiting,
}
}
Expand All @@ -89,9 +89,11 @@ func (peer *Peer) GetWholeTreeNode() int {
peer.lock.RLock()
defer peer.lock.RUnlock()
count := 1
for _, peerNode := range peer.children {
peer.children.Range(func(key, value interface{}) bool {
peerNode := value.(*Peer)
count += peerNode.GetWholeTreeNode()
}
return true
})
return count
}

Expand All @@ -105,24 +107,20 @@ func (peer *Peer) Touch() {
peer.lock.Lock()
defer peer.lock.Unlock()
peer.lastAccessTime = time.Now()
if peer.status == PeerStatusZombie && !peer.leave {
if peer.status == PeerStatusZombie && !peer.leave.Load() {
peer.status = PeerStatusRunning
}
peer.Task.Touch()
}

func (peer *Peer) associateChild(child *Peer) {
peer.lock.Lock()
defer peer.lock.Unlock()
peer.children[child.PeerID] = child
peer.children.Store(child.PeerID, child)
peer.Host.IncUploadLoad()
peer.Task.peers.Update(peer)
}

func (peer *Peer) disassociateChild(child *Peer) {
peer.lock.Lock()
defer peer.lock.Unlock()
delete(peer.children, child.PeerID)
peer.children.Delete(child.PeerID)
peer.Host.DecUploadLoad()
peer.Task.peers.Update(peer)
}
Expand Down Expand Up @@ -162,8 +160,8 @@ func (peer *Peer) GetCost() int {
func (peer *Peer) AddPieceInfo(finishedCount int32, cost int) {
peer.lock.Lock()
defer peer.lock.Unlock()
if finishedCount > peer.finishedNum {
peer.finishedNum = finishedCount
if finishedCount > peer.finishedNum.Load() {
peer.finishedNum.Store(finishedCount)
peer.costHistory = append(peer.costHistory, cost)
if len(peer.costHistory) > 20 {
peer.costHistory = peer.costHistory[len(peer.costHistory)-20:]
Expand Down Expand Up @@ -221,11 +219,11 @@ func (peer *Peer) IsWaiting() bool {
if peer.parent == nil {
return false
}
return peer.finishedNum >= peer.parent.finishedNum
return peer.finishedNum.Load() >= peer.parent.finishedNum.Load()
}

func (peer *Peer) GetSortKeys() (key1, key2 int) {
key1 = int(peer.finishedNum)
key1 = int(peer.finishedNum.Load())
key2 = peer.getFreeLoad()
return
}
Expand All @@ -238,13 +236,11 @@ func (peer *Peer) getFreeLoad() int {
}

func (peer *Peer) GetFinishNum() int32 {
peer.lock.RLock()
defer peer.lock.RUnlock()
return peer.finishedNum
return peer.finishedNum.Load()
}

func GetDiffPieceNum(src *Peer, dst *Peer) int32 {
diff := src.finishedNum - dst.finishedNum
diff := src.finishedNum.Load() - dst.finishedNum.Load()
if diff > 0 {
return diff
}
Expand All @@ -257,10 +253,10 @@ func (peer *Peer) GetParent() *Peer {
return peer.parent
}

func (peer *Peer) GetChildren() map[string]*Peer {
func (peer *Peer) GetChildren() *sync.Map {
peer.lock.RLock()
defer peer.lock.RUnlock()
return peer.children
return &peer.children
}

func (peer *Peer) SetStatus(status PeerStatus) {
Expand All @@ -287,12 +283,6 @@ func (peer *Peer) IsSuccess() bool {
return peer.status == PeerStatusSuccess
}

func (peer *Peer) IncFinishNum() {
peer.lock.Lock()
defer peer.lock.Unlock()
peer.finishedNum++
}

func (peer *Peer) IsDone() bool {
return peer.status == PeerStatusSuccess || peer.status == PeerStatusFail
}
Expand All @@ -302,9 +292,7 @@ func (peer *Peer) IsBad() bool {
}

func (peer *Peer) GetFinishedNum() int32 {
peer.lock.RLock()
defer peer.lock.RUnlock()
return peer.finishedNum
return peer.finishedNum.Load()
}

func (peer *Peer) GetStatus() PeerStatus {
Expand All @@ -314,13 +302,9 @@ func (peer *Peer) GetStatus() PeerStatus {
}

func (peer *Peer) MarkLeave() {
peer.lock.Lock()
defer peer.lock.Unlock()
peer.leave = true
peer.leave.Store(true)
}

func (peer *Peer) IsLeave() bool {
peer.lock.RLock()
defer peer.lock.RUnlock()
return peer.leave
return peer.leave.Load()
}

0 comments on commit fbbfcd3

Please sign in to comment.