Skip to content

Commit

Permalink
Add upgrade sql file introduced in 2.1.4
Browse files Browse the repository at this point in the history
1. Add upgrade sql file introduced in 2.1.4
2. Minor improvement for task/execution to cover corner cases

Signed-off-by: Wenkai Yin <yinw@vmware.com>
  • Loading branch information
ywk253100 committed Mar 15, 2021
1 parent 85f9a49 commit 43df3bf
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 14 deletions.
9 changes: 9 additions & 0 deletions make/migrations/postgresql/0041_2.1.4_schema.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
When upgrading from 2.0 to 2.1, the status and revision of retention schedule execution isn't migrated, correct them here.
As v2.2.0 isn't usable because of several serious bugs, we won't support upgrade from 2.1.4 to 2.2.0 anymore. After we add the
sql file here, users will get error when upgrading from 2.1.4 to 2.2.0 because of this sql file doesn't exist on 2.2.0
*/
UPDATE execution
SET revision=0, status=task.status
FROM task
WHERE execution.id=task.execution_id AND execution.vendor_type='SCHEDULER' AND execution.revision IS NULL;
20 changes: 12 additions & 8 deletions src/pkg/task/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error {
}

// when an execution is in final status, if it contains task that is a periodic or retrying job it will
// run again in the near future, so we must operate the stop action
// run again in the near future, so we must operate the stop action no matter the status is final or not
tasks, err := e.taskDAO.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"ExecutionID": id,
Expand All @@ -231,15 +231,23 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error {
if err != nil {
return err
}
// contains no task and the status isn't final, update the status to stop directly
if len(tasks) == 0 && !job.Status(execution.Status).Final() {
if len(tasks) == 0 {
// in final status, return directly
if job.Status(execution.Status).Final() {
return nil
}
// isn't in final status, update directly.
// as this is used for the corner case(the case that the execution exists but all tasks are disappeared. In normal
// cases, if the execution contains no tasks, it is already set as "success" by the upper level caller directly),
// no need to handle concurrency
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{
ID: id,
Status: job.StoppedStatus.String(),
Revision: execution.Revision + 1,
UpdateTime: now,
EndTime: now,
}, "Status", "UpdateTime", "EndTime")
}, "Status", "Revision", "UpdateTime", "EndTime")
}

for _, task := range tasks {
Expand All @@ -248,10 +256,6 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error {
continue
}
}

// refresh the status explicitly in case that the execution status
// isn't refreshed by task status change hook
_, _, err = e.executionDAO.RefreshStatus(ctx, id)
return err
}

Expand Down
21 changes: 16 additions & 5 deletions src/pkg/task/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,28 @@ func (e *executionManagerTestSuite) TestMarkError() {
}

func (e *executionManagerTestSuite) TestStop() {
// the execution contains no tasks and the status is final
e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{
ID: 1,
Status: job.SuccessStatus.String(),
}, nil)
e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil)
err := e.execMgr.Stop(nil, 1)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())
e.execDAO.AssertExpectations(e.T())

// reset the mocks
e.SetupTest()

// the execution contains no tasks and the status isn't final
e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{
ID: 1,
Status: job.RunningStatus.String(),
}, nil)
e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil)
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.Stop(nil, 1)
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err = e.execMgr.Stop(nil, 1)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())
e.execDAO.AssertExpectations(e.T())
Expand All @@ -122,7 +136,6 @@ func (e *executionManagerTestSuite) TestStop() {
},
}, nil)
e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
e.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(false, "", nil)
err = e.execMgr.Stop(nil, 1)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())
Expand All @@ -143,7 +156,6 @@ func (e *executionManagerTestSuite) TestStopAndWait() {
},
}, nil)
e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
e.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(false, "", nil)
err := e.execMgr.StopAndWait(nil, 1, 1*time.Second)
e.Require().NotNil(err)
e.taskDAO.AssertExpectations(e.T())
Expand All @@ -165,7 +177,6 @@ func (e *executionManagerTestSuite) TestStopAndWait() {
},
}, nil)
e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
e.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(false, "", nil)
err = e.execMgr.StopAndWait(nil, 1, 1*time.Second)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())
Expand Down
4 changes: 3 additions & 1 deletion src/pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ func (m *manager) Stop(ctx context.Context, id int64) error {
return err
}
log.Debugf("got job not found error for task %d, update it's status to stop directly", task.ID)
return nil
// as in this case no status hook will be sent, here refresh the execution status directly
_, _, err = m.execDAO.RefreshStatus(ctx, task.ExecutionID)
return err
}
return err
}
Expand Down
2 changes: 2 additions & 0 deletions src/pkg/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ func (t *taskManagerTestSuite) TestStop() {
t.jsClient.On("PostAction", mock.Anything, mock.Anything).Return(cjob.ErrJobNotFound)
t.dao.On("Update", mock.Anything, mock.Anything,
mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
t.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(true, "", nil)
err := t.mgr.Stop(nil, 1)
t.Require().Nil(err)
t.dao.AssertExpectations(t.T())
t.jsClient.AssertExpectations(t.T())
t.execDAO.AssertExpectations(t.T())

// reset mock
t.SetupTest()
Expand Down

0 comments on commit 43df3bf

Please sign in to comment.