Commit 97f94cf

fix: when scheduler is not available, replace the scheduler client (#999)

* fix: when the scheduler is not available, just replace the scheduler client in the single peer task

Signed-off-by: Jim Ma <majinjing3@gmail.com>
jim3ma committed Jan 17, 2022
1 parent 55350fe commit 97f94cf
Showing 3 changed files with 38 additions and 25 deletions.
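
Before the diffs, the shape of the bug: a failed registration used to overwrite the manager-wide scheduler client with a dummy, so one outage downgraded every later peer task on the daemon, even after the scheduler recovered. The fix threads a per-task local copy of the client through registration, piece reporting, and the conductor. Below is a minimal sketch of that pattern using hypothetical, simplified types (the real client is schedulerclient.SchedulerClient and the calls take gRPC requests):

package main

import (
	"errors"
	"fmt"
)

// schedulerClient is a stand-in for the real scheduler client interface.
type schedulerClient interface {
	RegisterPeerTask(url string) (taskID string, err error)
}

// unreachableScheduler simulates a scheduler that cannot be reached.
type unreachableScheduler struct{}

func (unreachableScheduler) RegisterPeerTask(url string) (string, error) {
	return "", errors.New("scheduler unavailable")
}

// dummyScheduler is the per-task fallback; the task backs to source instead.
type dummyScheduler struct{}

func (dummyScheduler) RegisterPeerTask(url string) (string, error) {
	return "locally-generated-task-id", nil
}

type peerTaskManager struct {
	schedulerClient schedulerClient // shared by every peer task on the daemon
}

func (ptm *peerTaskManager) newPeerTask(url string) {
	// Copy the shared client into a local variable. Before this commit the
	// fallback below overwrote ptm.schedulerClient itself, silently
	// downgrading all future peer tasks after a single failure.
	client := ptm.schedulerClient
	taskID, err := client.RegisterPeerTask(url)
	if err != nil {
		client = dummyScheduler{} // replace the client for this task only
		taskID = "locally-generated-task-id"
	}
	// Later steps (piece reporting, the conductor struct) keep using the
	// local client, while ptm.schedulerClient stays untouched.
	fmt.Printf("task %s registered via %T\n", taskID, client)
}

func main() {
	ptm := &peerTaskManager{schedulerClient: unreachableScheduler{}}
	ptm.newPeerTask("http://example.com/blob")
	fmt.Printf("shared client is still %T\n", ptm.schedulerClient)
}
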
client/daemon/peer/peertask_conductor.go (5 additions, 4 deletions)

@@ -160,7 +160,8 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
     defer cancel()
     regCtx, regSpan := tracer.Start(regCtx, config.SpanRegisterTask)
     logger.Infof("step 1: peer %s start to register", request.PeerId)
-    result, err := ptm.schedulerClient.RegisterPeerTask(regCtx, request)
+    schedulerClient := ptm.schedulerClient
+    result, err := schedulerClient.RegisterPeerTask(regCtx, request)
     regSpan.RecordError(err)
     regSpan.End()

@@ -178,7 +179,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
         }
         needBackSource = true
         // can not detect source or scheduler error, create a new dummy scheduler client
-        ptm.schedulerClient = &dummySchedulerClient{}
+        schedulerClient = &dummySchedulerClient{}
         result = &scheduler.RegisterResult{TaskId: idgen.TaskID(request.Url, request.UrlMeta)}
         logger.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, request.PeerId)
     }

@@ -243,7 +244,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
         }
     }

-    peerPacketStream, err := ptm.schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
+    peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
     log.Infof("step 2: start report piece result")
     if err != nil {
         defer span.End()

@@ -280,7 +281,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
         pieceParallelCount: atomic.NewInt32(0),
         totalPiece:         -1,
         schedulerOption:    ptm.schedulerOption,
-        schedulerClient:    ptm.schedulerClient,
+        schedulerClient:    schedulerClient,
         limiter:            rate.NewLimiter(limit, int(limit)),
         completedLength:    atomic.NewInt64(0),
         usedTraffic:        atomic.NewUint64(0),
client/daemon/peer/peertask_dummy.go (1 addition, 0 deletions)

@@ -28,6 +28,7 @@ import (
     schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
 )

+// when scheduler is not available, use dummySchedulerClient to back source
 type dummySchedulerClient struct {
 }

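The new comment marks dummySchedulerClient as a null object: when the scheduler is unreachable, the peer task holds a client that still satisfies the interface but drives the task to back-source, so call sites never branch on a nil client. A rough sketch of the idea, using a hypothetical reduced interface (the real schedulerclient.SchedulerClient has more methods and gRPC signatures):

package main

import (
	"errors"
	"fmt"
)

var errSchedulerUnavailable = errors.New("scheduler unavailable")

// SchedulerClient is a hypothetical, reduced version of the real interface.
type SchedulerClient interface {
	RegisterPeerTask(url string) (taskID string, err error)
	ReportPieceResult(taskID string) error
}

// dummySchedulerClient satisfies SchedulerClient without any network calls,
// so code that holds a client never has to check "scheduler == nil".
type dummySchedulerClient struct{}

func (dummySchedulerClient) RegisterPeerTask(url string) (string, error) {
	return "", errSchedulerUnavailable
}

func (dummySchedulerClient) ReportPieceResult(taskID string) error {
	// No scheduler to report to; a back-source task tracks progress locally.
	return nil
}

func main() {
	var c SchedulerClient = dummySchedulerClient{}
	if _, err := c.RegisterPeerTask("http://example.com/blob"); err != nil {
		fmt.Println("falling back to source:", err)
	}
}
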
client/daemon/peer/peertask_manager_test.go (32 additions, 21 deletions)

@@ -453,6 +453,8 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {

     for _, _tc := range testCases {
         t.Run(_tc.name, func(t *testing.T) {
+            assert := testifyassert.New(t)
+            require := testifyrequire.New(t)
             for _, typ := range taskTypes {
                 // dup a new test case with the task type
                 tc := _tc

@@ -492,20 +494,23 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
                     sourceClient = tc.mockHTTPSourceClient(ctrl, tc.taskData, tc.url)
                 }

-                mm := setupMockManager(
-                    ctrl, &tc,
-                    componentsOption{
-                        taskID:             taskID,
-                        contentLength:      int64(mockContentLength),
-                        pieceSize:          uint32(tc.pieceSize),
-                        pieceParallelCount: tc.pieceParallelCount,
-                        pieceDownloader:    downloader,
-                        sourceClient:       sourceClient,
-                        content:            tc.taskData,
-                        scope:              tc.sizeScope,
-                        peerPacketDelay:    tc.peerPacketDelay,
-                        backSource:         tc.backSource,
-                    })
+                option := componentsOption{
+                    taskID:             taskID,
+                    contentLength:      int64(mockContentLength),
+                    pieceSize:          uint32(tc.pieceSize),
+                    pieceParallelCount: tc.pieceParallelCount,
+                    pieceDownloader:    downloader,
+                    sourceClient:       sourceClient,
+                    content:            tc.taskData,
+                    scope:              tc.sizeScope,
+                    peerPacketDelay:    tc.peerPacketDelay,
+                    backSource:         tc.backSource,
+                }
+                // keep peer task running in enough time to check "getOrCreatePeerTaskConductor" always return same
+                if tc.taskType == taskTypeConductor {
+                    option.peerPacketDelay = []time.Duration{time.Second}
+                }
+                mm := setupMockManager(ctrl, &tc, option)
                 defer mm.CleanUp()

                 tc.run(assert, require, mm, urlMeta)

@@ -652,8 +657,8 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
     }

     var (
-        runningTaskCount int
-        success          bool
+        noRunningTask = true
+        success       bool
     )
     select {
     case <-ptc.successCh:

@@ -663,11 +668,17 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
     }
     assert.True(success, "task should success")

-    ptm.runningPeerTasks.Range(func(key, value interface{}) bool {
-        runningTaskCount++
-        return true
-    })
-    assert.Equal(0, runningTaskCount, "no running tasks")
+    for i := 0; i < 3; i++ {
+        ptm.runningPeerTasks.Range(func(key, value interface{}) bool {
+            noRunningTask = false
+            return false
+        })
+        if noRunningTask {
+            break
+        }
+        time.Sleep(100 * time.Millisecond)
+    }
+    assert.True(noRunningTask, "no running tasks")

     // test reuse stream task
     rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(), request)

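The last hunk turns a one-shot scan of runningPeerTasks into a bounded poll, since conductor cleanup is asynchronous and can lag the success signal. A generic version of the pattern is sketched below; eventuallyEmpty is a hypothetical helper, not project code. Note that it resets the flag at the top of each attempt, so a map that drains between attempts is still observed as empty:

package main

import (
	"fmt"
	"sync"
	"time"
)

// eventuallyEmpty polls a sync.Map a bounded number of times, returning true
// as soon as one attempt observes no entries. (Hypothetical test helper.)
func eventuallyEmpty(m *sync.Map, attempts int, interval time.Duration) bool {
	for i := 0; i < attempts; i++ {
		empty := true // reset every attempt, so later attempts can succeed
		m.Range(func(key, value interface{}) bool {
			empty = false
			return false // one entry is enough proof; stop iterating
		})
		if empty {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	var running sync.Map
	running.Store("peer-1", struct{}{})

	// Simulate asynchronous cleanup finishing shortly after success.
	go func() {
		time.Sleep(150 * time.Millisecond)
		running.Delete("peer-1")
	}()

	fmt.Println("drained:", eventuallyEmpty(&running, 3, 100*time.Millisecond))
}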
