Merge branch 'main' of github.com:dragonflyoss/Dragonfly2 into feature/lock-task

Signed-off-by: Gaius <gaius.qi@gmail.com>
gaius-qi committed Jan 17, 2022
2 parents 3b91071 + 97f94cf commit 47fc8c6
Showing 5 changed files with 38 additions and 32 deletions.
9 changes: 5 additions & 4 deletions client/daemon/peer/peertask_conductor.go
@@ -160,7 +160,8 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
  defer cancel()
  regCtx, regSpan := tracer.Start(regCtx, config.SpanRegisterTask)
  logger.Infof("step 1: peer %s start to register", request.PeerId)
- result, err := ptm.schedulerClient.RegisterPeerTask(regCtx, request)
+ schedulerClient := ptm.schedulerClient
+ result, err := schedulerClient.RegisterPeerTask(regCtx, request)
  regSpan.RecordError(err)
  regSpan.End()

@@ -178,7 +179,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
  }
  needBackSource = true
  // can not detect source or scheduler error, create a new dummy scheduler client
- ptm.schedulerClient = &dummySchedulerClient{}
+ schedulerClient = &dummySchedulerClient{}
  result = &scheduler.RegisterResult{TaskId: idgen.TaskID(request.Url, request.UrlMeta)}
  logger.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, request.PeerId)
  }
@@ -243,7 +244,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
  }
  }

- peerPacketStream, err := ptm.schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
+ peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
  log.Infof("step 2: start report piece result")
  if err != nil {
  defer span.End()
@@ -280,7 +281,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
  pieceParallelCount: atomic.NewInt32(0),
  totalPiece: -1,
  schedulerOption: ptm.schedulerOption,
- schedulerClient: ptm.schedulerClient,
+ schedulerClient: schedulerClient,
  limiter: rate.NewLimiter(limit, int(limit)),
  completedLength: atomic.NewInt64(0),
  usedTraffic: atomic.NewUint64(0),
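
For readers skimming the diff above: the conductor now copies ptm.schedulerClient into a local schedulerClient variable, so switching a failed registration over to dummySchedulerClient affects only the task being created and never rewrites the manager's shared field. Below is a minimal, self-contained Go sketch of that pattern; the type and function names are illustrative stand-ins, not Dragonfly's actual API, and the sketch is not part of the commit.

package main

import "fmt"

// schedulerClient stands in for the scheduler client interface used by peer tasks.
type schedulerClient interface {
	RegisterPeerTask(url string) (string, error)
}

type realScheduler struct{}

func (realScheduler) RegisterPeerTask(url string) (string, error) {
	return "", fmt.Errorf("scheduler unreachable")
}

// dummyScheduler plays the role of dummySchedulerClient: it lets a single task
// back-source without touching the shared client.
type dummyScheduler struct{}

func (dummyScheduler) RegisterPeerTask(url string) (string, error) {
	return "local-task-id", nil
}

type manager struct {
	client schedulerClient // shared by all peer tasks
}

func (m *manager) newTask(url string) (string, error) {
	client := m.client // work on a local copy, as the diff does
	taskID, err := client.RegisterPeerTask(url)
	if err != nil {
		// Fall back for this task only; m.client stays untouched, so
		// concurrent tasks keep using the real scheduler.
		client = dummyScheduler{}
		taskID, err = client.RegisterPeerTask(url)
	}
	return taskID, err
}

func main() {
	m := &manager{client: realScheduler{}}
	id, _ := m.newTask("http://example.com/blob")
	fmt.Println(id) // m.client is still the real scheduler here
}
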
1 change: 1 addition & 0 deletions client/daemon/peer/peertask_dummy.go
@@ -28,6 +28,7 @@ import (
  schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
  )

+ // when scheduler is not available, use dummySchedulerClient to back source
  type dummySchedulerClient struct {
  }

53 changes: 32 additions & 21 deletions client/daemon/peer/peertask_manager_test.go
@@ -453,6 +453,8 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {

  for _, _tc := range testCases {
  t.Run(_tc.name, func(t *testing.T) {
+ assert := testifyassert.New(t)
+ require := testifyrequire.New(t)
  for _, typ := range taskTypes {
  // dup a new test case with the task type
  tc := _tc
@@ -492,20 +494,23 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
  sourceClient = tc.mockHTTPSourceClient(ctrl, tc.taskData, tc.url)
  }

- mm := setupMockManager(
- ctrl, &tc,
- componentsOption{
- taskID: taskID,
- contentLength: int64(mockContentLength),
- pieceSize: uint32(tc.pieceSize),
- pieceParallelCount: tc.pieceParallelCount,
- pieceDownloader: downloader,
- sourceClient: sourceClient,
- content: tc.taskData,
- scope: tc.sizeScope,
- peerPacketDelay: tc.peerPacketDelay,
- backSource: tc.backSource,
- })
+ option := componentsOption{
+ taskID: taskID,
+ contentLength: int64(mockContentLength),
+ pieceSize: uint32(tc.pieceSize),
+ pieceParallelCount: tc.pieceParallelCount,
+ pieceDownloader: downloader,
+ sourceClient: sourceClient,
+ content: tc.taskData,
+ scope: tc.sizeScope,
+ peerPacketDelay: tc.peerPacketDelay,
+ backSource: tc.backSource,
+ }
+ // keep peer task running in enough time to check "getOrCreatePeerTaskConductor" always return same
+ if tc.taskType == taskTypeConductor {
+ option.peerPacketDelay = []time.Duration{time.Second}
+ }
+ mm := setupMockManager(ctrl, &tc, option)
  defer mm.CleanUp()

  tc.run(assert, require, mm, urlMeta)
@@ -652,8 +657,8 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
  }

  var (
- runningTaskCount int
- success bool
+ noRunningTask = true
+ success bool
  )
  select {
  case <-ptc.successCh:
@@ -663,11 +668,17 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
  }
  assert.True(success, "task should success")

- ptm.runningPeerTasks.Range(func(key, value interface{}) bool {
- runningTaskCount++
- return true
- })
- assert.Equal(0, runningTaskCount, "no running tasks")
+ for i := 0; i < 3; i++ {
+ ptm.runningPeerTasks.Range(func(key, value interface{}) bool {
+ noRunningTask = false
+ return false
+ })
+ if noRunningTask {
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ assert.True(noRunningTask, "no running tasks")

  // test reuse stream task
  rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(), request)
4 changes: 0 additions & 4 deletions scheduler/resource/task.go
@@ -92,9 +92,6 @@ type Task struct {
  // Peer sync map
  Peers *sync.Map

- // Task mutex
- MU *sync.Mutex
-
  // CreateAt is task create time
  CreateAt *atomic.Time

@@ -119,7 +116,6 @@ func NewTask(id, url string, backToSourceLimit int, meta *base.UrlMeta) *Task {
  Peers: &sync.Map{},
  CreateAt: atomic.NewTime(time.Now()),
  UpdateAt: atomic.NewTime(time.Now()),
- MU: &sync.Mutex{},
  Log: logger.WithTaskIDAndURL(id, url),
  }

3 changes: 0 additions & 3 deletions scheduler/service/service.go
@@ -87,9 +87,6 @@ func (s *service) CDN() resource.CDN {

  func (s *service) RegisterTask(ctx context.Context, req *rpcscheduler.PeerTaskRequest) (*resource.Task, error) {
  task := resource.NewTask(idgen.TaskID(req.Url, req.UrlMeta), req.Url, s.config.Scheduler.BackSourceCount, req.UrlMeta)
-
- task.MU.Lock()
- defer task.MU.Unlock()
  task, ok := s.resource.TaskManager().LoadOrStore(task)
  if ok && (task.FSM.Is(resource.TaskStateRunning) || task.FSM.Is(resource.TaskStateSucceeded)) {
  // Task is healthy and can be reused
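
The last two files drop the per-task mutex: RegisterTask relies on the task manager's LoadOrStore, which already resolves concurrent registrations atomically, so only one of several racing callers ends up storing a new Task. A rough Go sketch of that idea using a plain sync.Map follows; the names are illustrative, not the scheduler's real types, and the sketch is not part of the commit.

package main

import (
	"fmt"
	"sync"
)

type task struct {
	ID string
}

// taskManager stands in for the scheduler's task manager.
type taskManager struct {
	tasks sync.Map // task ID -> *task
}

// loadOrStore returns the already stored task and true, or stores t and returns
// false, in one atomic step, so no extra per-task mutex is needed.
func (m *taskManager) loadOrStore(t *task) (*task, bool) {
	actual, loaded := m.tasks.LoadOrStore(t.ID, t)
	return actual.(*task), loaded
}

func main() {
	m := &taskManager{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Every goroutine builds its own candidate, but all of them end up
			// sharing the single stored instance.
			got, loaded := m.loadOrStore(&task{ID: "task-1"})
			fmt.Println(got.ID, loaded) // exactly one goroutine sees loaded == false
		}()
	}
	wg.Wait()
}
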
