Skip to content

Commit

Permalink
feat: scheduler handles seed peer failed (#1325)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed May 24, 2022
1 parent 3ea1175 commit 21ca12a
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 151 deletions.
6 changes: 6 additions & 0 deletions scheduler/resource/seed_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package resource

import (
"context"
"time"

"github.com/pkg/errors"

Expand All @@ -34,6 +35,11 @@ const (
SeedBizTag = "d7y/seed"
)

const (
// Default value of seed peer failed timeout.
SeedPeerFailedTimeout = 30 * time.Minute
)

type SeedPeer interface {
// TriggerTask triggers the seed peer to download the task.
TriggerTask(context.Context, *Task) (*Peer, *rpcscheduler.PeerResult, error)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/resource/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (t *Task) LoadSeedPeer() (*Peer, bool) {
// IsSeedPeerFailed returns whether the seed peer in the task failed.
func (t *Task) IsSeedPeerFailed() bool {
seedPeer, ok := t.LoadSeedPeer()
return ok && seedPeer.FSM.Is(PeerStateFailed)
return ok && seedPeer.FSM.Is(PeerStateFailed) && time.Since(seedPeer.CreateAt.Load()) < SeedPeerFailedTimeout
}

// LoadPiece return piece for a key.
Expand Down
12 changes: 12 additions & 0 deletions scheduler/resource/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,18 @@ func TestTask_IsSeedPeerFailed(t *testing.T) {
task.StorePeer(mockSeedPeer)
mockSeedPeer.FSM.SetState(PeerStateSucceeded)

assert.False(task.IsSeedPeerFailed())
},
},
{
name: "seed peer failed timeout",
expect: func(t *testing.T, task *Task, mockPeer *Peer, mockSeedPeer *Peer) {
assert := assert.New(t)
task.StorePeer(mockPeer)
task.StorePeer(mockSeedPeer)
mockSeedPeer.CreateAt.Store(time.Now().Add(-SeedPeerFailedTimeout))
mockSeedPeer.FSM.SetState(PeerStateFailed)

assert.False(task.IsSeedPeerFailed())
},
},
Expand Down
9 changes: 4 additions & 5 deletions scheduler/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,19 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
default:
}

// If the scheduling exceeds the RetryBackSourceLimit or the latest seed peer state is PeerStateFailed,
// If the scheduling exceeds the RetryBackSourceLimit or peer needs back-to-source,
// peer will download the task back-to-source.
isSeedPeerFailed := peer.Task.IsSeedPeerFailed()
needBackToSource := peer.NeedBackToSource.Load()
if (n >= s.config.RetryBackSourceLimit || isSeedPeerFailed || needBackToSource) &&
if (n >= s.config.RetryBackSourceLimit || needBackToSource) &&
peer.Task.CanBackToSource() {
stream, ok := peer.LoadStream()
if !ok {
peer.Log.Error("load stream failed")
return
}

peer.Log.Infof("peer downloads back-to-source, scheduling %d times, seed peer is failed %t, peer need back-to-source %t",
n, isSeedPeerFailed, needBackToSource)
peer.Log.Infof("peer downloads back-to-source, scheduling %d times, peer need back-to-source %t",
n, needBackToSource)

// Notify peer back-to-source.
if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource}); err != nil {
Expand Down
153 changes: 8 additions & 145 deletions scheduler/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,62 +142,38 @@ func TestScheduler_ScheduleParent(t *testing.T) {
},
},
{
name: "seed peer state is PeerStateFailed and peer stream load failed",
name: "peer needs back-to-source and peer stream load failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
task.StorePeer(seedPeer)
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
seedPeer.FSM.SetState(resource.PeerStateFailed)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed",
name: "peer needs back-to-source and send Code_SchedNeedBackSource code failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
task.StorePeer(seedPeer)
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
seedPeer.FSM.SetState(resource.PeerStateFailed)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)

mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(errors.New("foo")).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.True(ok)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
task.StorePeer(seedPeer)
seedPeer.FSM.SetState(resource.PeerStateFailed)
peer.FSM.SetState(resource.PeerStateRunning)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)

mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "peer need back-to-source and send Code_SchedNeedBackSource code success",
name: "peer needs back-to-source and send Code_SchedNeedBackSource code success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
Expand All @@ -216,37 +192,13 @@ func TestScheduler_ScheduleParent(t *testing.T) {
},
},
{
name: "seed peer state is PeerStateFailed and task state is PeerStateFailed",
name: "peer needs back-to-source and task state is TaskStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
task.StorePeer(seedPeer)
seedPeer.FSM.SetState(resource.PeerStateFailed)
peer.FSM.SetState(resource.PeerStateRunning)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)

mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
task.StorePeer(seedPeer)
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
seedPeer.FSM.SetState(resource.PeerStateFailed)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)

mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1)
Expand All @@ -272,95 +224,6 @@ func TestScheduler_ScheduleParent(t *testing.T) {
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)

gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1),
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(errors.New("foo")).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.True(ok)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and send Code_SchedNeedBackSource code success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)

gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1),
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
},
},
{
name: "seed peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)

gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1),
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStateRunning))
},
},
{
name: "seed peer state is PeerStateFailed and task state is PeerStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreParent(seedPeer)
peer.StoreStream(stream)

gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, false).Times(1),
mr.Send(gomock.Eq(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource})).Return(nil).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
_, ok := peer.LoadParent()
assert.False(ok)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStateRunning))
},
},
{
name: "schedule exceeds RetryLimit and peer stream load failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, mr *rpcschedulermocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
Expand Down
4 changes: 4 additions & 0 deletions scheduler/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,10 @@ func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRe

// Start trigger seed peer task.
if s.config.SeedPeer.Enable {
if task.IsSeedPeerFailed() {
return task, true, nil
}

go s.triggerSeedPeerTask(ctx, task)
return task, false, nil
}
Expand Down

0 comments on commit 21ca12a

Please sign in to comment.