From 43df3bf8a41f4db090f0ab077060eff313628434 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Fri, 12 Mar 2021 19:29:25 +0800 Subject: [PATCH] Add upgrade sql file introduced in 2.1.4 1. Add upgrade sql file introduced in 2.1.4 2. Minor improvement for task/execution to cover corner cases Signed-off-by: Wenkai Yin --- .../postgresql/0041_2.1.4_schema.up.sql | 9 ++++++++ src/pkg/task/execution.go | 20 +++++++++++------- src/pkg/task/execution_test.go | 21 ++++++++++++++----- src/pkg/task/task.go | 4 +++- src/pkg/task/task_test.go | 2 ++ 5 files changed, 42 insertions(+), 14 deletions(-) create mode 100644 make/migrations/postgresql/0041_2.1.4_schema.up.sql diff --git a/make/migrations/postgresql/0041_2.1.4_schema.up.sql b/make/migrations/postgresql/0041_2.1.4_schema.up.sql new file mode 100644 index 00000000000..2c671e17c4e --- /dev/null +++ b/make/migrations/postgresql/0041_2.1.4_schema.up.sql @@ -0,0 +1,9 @@ +/* +When upgrading from 2.0 to 2.1, the status and revision of retention schedule execution isn't migrated, correct them here. +As v2.2.0 isn't usable because of several serious bugs, we won't support upgrade from 2.1.4 to 2.2.0 anymore. After we add the +sql file here, users will get error when upgrading from 2.1.4 to 2.2.0 because of this sql file doesn't exist on 2.2.0 +*/ +UPDATE execution +SET revision=0, status=task.status +FROM task +WHERE execution.id=task.execution_id AND execution.vendor_type='SCHEDULER' AND execution.revision IS NULL; \ No newline at end of file diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go index 11f236fa5e5..24a6a7df77a 100644 --- a/src/pkg/task/execution.go +++ b/src/pkg/task/execution.go @@ -222,7 +222,7 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error { } // when an execution is in final status, if it contains task that is a periodic or retrying job it will - // run again in the near future, so we must operate the stop action + // run again in the near future, so we must operate the stop action no matter the status is final or not tasks, err := e.taskDAO.List(ctx, &q.Query{ Keywords: map[string]interface{}{ "ExecutionID": id, @@ -231,15 +231,23 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error { if err != nil { return err } - // contains no task and the status isn't final, update the status to stop directly - if len(tasks) == 0 && !job.Status(execution.Status).Final() { + if len(tasks) == 0 { + // in final status, return directly + if job.Status(execution.Status).Final() { + return nil + } + // isn't in final status, update directly. + // as this is used for the corner case(the case that the execution exists but all tasks are disappeared. In normal + // cases, if the execution contains no tasks, it is already set as "success" by the upper level caller directly), + // no need to handle concurrency now := time.Now() return e.executionDAO.Update(ctx, &dao.Execution{ ID: id, Status: job.StoppedStatus.String(), + Revision: execution.Revision + 1, UpdateTime: now, EndTime: now, - }, "Status", "UpdateTime", "EndTime") + }, "Status", "Revision", "UpdateTime", "EndTime") } for _, task := range tasks { @@ -248,10 +256,6 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error { continue } } - - // refresh the status explicitly in case that the execution status - // isn't refreshed by task status change hook - _, _, err = e.executionDAO.RefreshStatus(ctx, id) return err } diff --git a/src/pkg/task/execution_test.go b/src/pkg/task/execution_test.go index a3b47b9acd3..7c0b596a261 100644 --- a/src/pkg/task/execution_test.go +++ b/src/pkg/task/execution_test.go @@ -95,14 +95,28 @@ func (e *executionManagerTestSuite) TestMarkError() { } func (e *executionManagerTestSuite) TestStop() { + // the execution contains no tasks and the status is final + e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{ + ID: 1, + Status: job.SuccessStatus.String(), + }, nil) + e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil) + err := e.execMgr.Stop(nil, 1) + e.Require().Nil(err) + e.taskDAO.AssertExpectations(e.T()) + e.execDAO.AssertExpectations(e.T()) + + // reset the mocks + e.SetupTest() + // the execution contains no tasks and the status isn't final e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{ ID: 1, Status: job.RunningStatus.String(), }, nil) e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil) - e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - err := e.execMgr.Stop(nil, 1) + e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + err = e.execMgr.Stop(nil, 1) e.Require().Nil(err) e.taskDAO.AssertExpectations(e.T()) e.execDAO.AssertExpectations(e.T()) @@ -122,7 +136,6 @@ func (e *executionManagerTestSuite) TestStop() { }, }, nil) e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) - e.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(false, "", nil) err = e.execMgr.Stop(nil, 1) e.Require().Nil(err) e.taskDAO.AssertExpectations(e.T()) @@ -143,7 +156,6 @@ func (e *executionManagerTestSuite) TestStopAndWait() { }, }, nil) e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) - e.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(false, "", nil) err := e.execMgr.StopAndWait(nil, 1, 1*time.Second) e.Require().NotNil(err) e.taskDAO.AssertExpectations(e.T()) @@ -165,7 +177,6 @@ func (e *executionManagerTestSuite) TestStopAndWait() { }, }, nil) e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) - e.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(false, "", nil) err = e.execMgr.StopAndWait(nil, 1, 1*time.Second) e.Require().Nil(err) e.taskDAO.AssertExpectations(e.T()) diff --git a/src/pkg/task/task.go b/src/pkg/task/task.go index bca88eb938f..945f734da91 100644 --- a/src/pkg/task/task.go +++ b/src/pkg/task/task.go @@ -184,7 +184,9 @@ func (m *manager) Stop(ctx context.Context, id int64) error { return err } log.Debugf("got job not found error for task %d, update it's status to stop directly", task.ID) - return nil + // as in this case no status hook will be sent, here refresh the execution status directly + _, _, err = m.execDAO.RefreshStatus(ctx, task.ExecutionID) + return err } return err } diff --git a/src/pkg/task/task_test.go b/src/pkg/task/task_test.go index 0d52f09ef2b..0f22aed64a7 100644 --- a/src/pkg/task/task_test.go +++ b/src/pkg/task/task_test.go @@ -94,10 +94,12 @@ func (t *taskManagerTestSuite) TestStop() { t.jsClient.On("PostAction", mock.Anything, mock.Anything).Return(cjob.ErrJobNotFound) t.dao.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + t.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(true, "", nil) err := t.mgr.Stop(nil, 1) t.Require().Nil(err) t.dao.AssertExpectations(t.T()) t.jsClient.AssertExpectations(t.T()) + t.execDAO.AssertExpectations(t.T()) // reset mock t.SetupTest()