Skip to content

Commit

Permalink
feat: overwrite task url and url meta (#1740)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Oct 10, 2022
1 parent e9aae40 commit acb903b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 36 deletions.
22 changes: 16 additions & 6 deletions scheduler/service/service.go
Expand Up @@ -538,12 +538,22 @@ func (s *Service) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) er

// registerTask creates a new task or reuses a previous task.
func (s *Service) registerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*resource.Task, bool, error) {
task := resource.NewTask(req.TaskId, req.Url, commonv1.TaskType_Normal, req.UrlMeta, resource.WithBackToSourceLimit(int32(s.config.Scheduler.BackSourceCount)))
task, loaded := s.resource.TaskManager().LoadOrStore(task)
if loaded && !task.FSM.Is(resource.TaskStateFailed) &&
!task.FSM.Is(resource.TaskStateLeave) && task.HasAvailablePeer() {
task.Log.Infof("task state is %s", task.FSM.Current())
return task, false, nil
task, loaded := s.resource.TaskManager().Load(req.TaskId)
if loaded {
// Task is the pointer, if the task already exists, the next request will
// update the task's Url and UrlMeta in task manager.
task.URL = req.Url
task.URLMeta = req.UrlMeta

if !task.FSM.Is(resource.TaskStateFailed) &&
!task.FSM.Is(resource.TaskStateLeave) && task.HasAvailablePeer() {
task.Log.Infof("task state is %s", task.FSM.Current())
return task, false, nil
}
} else {
// Create a task for the first time.
task = resource.NewTask(req.TaskId, req.Url, commonv1.TaskType_Normal, req.UrlMeta, resource.WithBackToSourceLimit(int32(s.config.Scheduler.BackSourceCount)))
s.resource.TaskManager().Store(task)
}

// Trigger task.
Expand Down
69 changes: 39 additions & 30 deletions scheduler/service/service_test.go
Expand Up @@ -154,7 +154,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.Task.FSM.SetState(resource.TaskStateRunning)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, false).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer, result *schedulerv1.RegisterResult, err error) {
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.Task.StorePeer(mockSeedPeer)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.FSM.SetState(resource.PeerStateFailed)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.Task.ContentLength.Store(-1)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.Task.ContentLength.Store(0)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.Task.DirectPiece = []byte{1}
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.FSM.SetState(resource.PeerStateFailed)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -401,7 +401,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.FSM.SetState(resource.PeerStateFailed)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.Task.TotalPieceCount.Store(1)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -480,7 +480,7 @@ func TestService_RegisterPeerTask(t *testing.T) {

gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -523,7 +523,7 @@ func TestService_RegisterPeerTask(t *testing.T) {

gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -565,7 +565,7 @@ func TestService_RegisterPeerTask(t *testing.T) {

gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -605,7 +605,7 @@ func TestService_RegisterPeerTask(t *testing.T) {

gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -645,7 +645,7 @@ func TestService_RegisterPeerTask(t *testing.T) {

gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -683,7 +683,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.FSM.SetState(resource.PeerStateFailed)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -718,7 +718,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
mockPeer.Task.TotalPieceCount.Store(2)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mt.Load(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
Expand Down Expand Up @@ -1770,7 +1770,7 @@ func TestService_registerTask(t *testing.T) {
mockPeer.FSM.SetState(resource.PeerStateRunning)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
)

task, needBackToSource, err := svc.registerTask(context.Background(), req)
Expand Down Expand Up @@ -1798,7 +1798,7 @@ func TestService_registerTask(t *testing.T) {
mockPeer.FSM.SetState(resource.PeerStateRunning)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
)

task, needBackToSource, err := svc.registerTask(context.Background(), req)
Expand Down Expand Up @@ -1826,7 +1826,7 @@ func TestService_registerTask(t *testing.T) {
mockPeer.FSM.SetState(resource.PeerStateRunning)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
)

task, needBackToSource, err := svc.registerTask(context.Background(), req)
Expand Down Expand Up @@ -1859,7 +1859,9 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStatePending)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, false).Times(1),
mt.Load(gomock.Any()).Return(mockTask, false).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Store(gomock.Any()).Return().Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Any()).Return(nil, false).Times(1),
mr.SeedPeer().Do(func() { wg.Done() }).Return(seedPeer).Times(1),
Expand All @@ -1870,7 +1872,8 @@ func TestService_registerTask(t *testing.T) {
assert := assert.New(t)
assert.NoError(err)
assert.False(needBackToSource)
assert.EqualValues(mockTask, task)
assert.EqualValues(mockTaskURL, task.URL)
assert.EqualValues(mockTaskURLMeta, task.URLMeta)
},
},
{
Expand All @@ -1896,7 +1899,7 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStateFailed)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Any()).Return(nil, false),
mr.SeedPeer().Do(func() { wg.Done() }).Return(seedPeer).Times(1),
Expand Down Expand Up @@ -1933,7 +1936,7 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStateLeave)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Any()).Return(nil, false),
mr.SeedPeer().Do(func() { wg.Done() }).Return(seedPeer).Times(1),
Expand Down Expand Up @@ -1967,7 +1970,7 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStateFailed)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Any()).Return(mockHost, true).Times(1),
)
Expand Down Expand Up @@ -2002,7 +2005,9 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStatePending)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, false).Times(1),
mt.Load(gomock.Any()).Return(mockTask, false).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Store(gomock.Any()).Return().Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Any()).Return(nil, false).Times(1),
mr.SeedPeer().Do(func() { wg.Done() }).Return(seedPeer).Times(1),
Expand All @@ -2013,7 +2018,8 @@ func TestService_registerTask(t *testing.T) {
assert := assert.New(t)
assert.NoError(err)
assert.False(needBackToSource)
assert.EqualValues(mockTask, task)
assert.EqualValues(mockTaskURL, task.URL)
assert.EqualValues(mockTaskURLMeta, task.URLMeta)
},
},
{
Expand All @@ -2039,7 +2045,7 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStateFailed)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Any()).Return(nil, false).Times(1),
mr.SeedPeer().Do(func() { wg.Done() }).Return(seedPeer).Times(1),
Expand Down Expand Up @@ -2072,7 +2078,9 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStatePending)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, false).Times(1),
mt.Load(gomock.Any()).Return(mockTask, false).Times(1),
mr.TaskManager().Return(taskManager).Times(1),
mt.Store(gomock.Any()).Return().Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Any()).Return(nil, false).Times(1),
)
Expand All @@ -2081,7 +2089,8 @@ func TestService_registerTask(t *testing.T) {
assert := assert.New(t)
assert.NoError(err)
assert.True(needBackToSource)
assert.EqualValues(mockTask, task)
assert.EqualValues(mockTaskURL, task.URL)
assert.EqualValues(mockTaskURLMeta, task.URLMeta)
},
},
{
Expand All @@ -2103,7 +2112,7 @@ func TestService_registerTask(t *testing.T) {
mockTask.FSM.SetState(resource.TaskStateFailed)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Any()).Return(nil, false).Times(1),
)
Expand Down

0 comments on commit acb903b

Please sign in to comment.