Skip to content

Commit

Permalink
Introduce system artifact manager cleanup job
Browse files Browse the repository at this point in the history
Signed-off-by: prahaladdarkin <prahaladd@vmware.com>
  • Loading branch information
prahaladdarkin committed Jun 2, 2022
1 parent 2852a76 commit e217d79
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 3 deletions.
26 changes: 26 additions & 0 deletions src/controller/systemartifact/callback.go
@@ -0,0 +1,26 @@
package systemartifact

import (
"context"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/task"
)

const (
SystemArtifactCleanupCallback = "SYSTEM_ARTIFACT_CLEANUP"
)

func init() {
if err := scheduler.RegisterCallbackFunc(SystemArtifactCleanupCallback, cleanupCallBack); err != nil {
log.Fatalf("failed to register the callback for the system artifact cleanup schedule, error %v", err)
}
}

func cleanupCallBack(ctx context.Context, param string) error {

err := Ctl.Start(ctx, true, task.ExecutionTriggerSchedule)
logger.Errorf("System artifact cleanup job encountered errors: %v", err)
return err
}
152 changes: 152 additions & 0 deletions src/controller/systemartifact/execution.go
@@ -0,0 +1,152 @@
package systemartifact

import (
"context"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/lib/retry"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/systemartifact"
"github.com/goharbor/harbor/src/pkg/task"
"time"
)

const (
VendorTypeSystemArtifactCleanup = "SYSTEM_ARTIFACT_CLEANUP"
cronTypeDaily = "Daily"
cronSpec = "0 0 0 * * *"
)

func init() {
task.SetExecutionSweeperCount(VendorTypeSystemArtifactCleanup, 50)
}

var Ctl = NewController()

type Controller interface {
Start(ctx context.Context, async bool, trigger string) error
}

func NewController() Controller {
return &controller{
execMgr: task.ExecMgr,
taskMgr: task.Mgr,
systemArtifactMgr: systemartifact.Mgr,
makeCtx: orm.Context,
}
}

type controller struct {
execMgr task.ExecutionManager
taskMgr task.Manager
systemArtifactMgr systemartifact.Manager
makeCtx func() context.Context
}

func (c *controller) Start(ctx context.Context, async bool, trigger string) error {
execId, err := c.execMgr.Create(ctx, VendorTypeSystemArtifactCleanup, 0, trigger)
if err != nil {
return err
}
// cleanup job would always be scheduled in async mode in production
// allowing for sync mode execution only for test mode purposes
// if there are any trigger settings then pass them to the cleanup manager first
jobParams := job.Parameters{}

if !async {
err := c.createCleanupTask(ctx, jobParams, execId)
if err != nil {
log.Errorf("failed to create system artifact clean-up task: %v", err)
return err
}

logger.Info("Created job for scan data export successfully")
return nil
}
go func(ctx context.Context) {
err := retry.Retry(func() error {
_, err := c.execMgr.Get(ctx, execId)
return err
})
if err != nil {
log.Errorf("failed to get the execution %d for the export data cleanup job", execId)
return
}
err = c.createCleanupTask(ctx, jobParams, execId)
if err != nil {
logger.Errorf("Encountered error in scan data artifact cleanup : %v", err)
return
}
}(c.makeCtx())

return nil
}

func (c *controller) createCleanupTask(ctx context.Context, jobParams job.Parameters, execId int64) error {
j := &task.Job{
Name: job.SystemArtifactCleanup,
Metadata: &job.Metadata{
JobKind: job.KindGeneric,
},
Parameters: jobParams,
}

_, err := c.taskMgr.Create(ctx, execId, j)

if err != nil {
logger.Errorf("Unable to create a scan data export job in clean-up mode : %v", err)
c.markError(ctx, execId, err)
return err
}
return nil
}

func (c *controller) markError(ctx context.Context, executionID int64, err error) {

// try to stop the execution first in case that some tasks are already created
if err := c.execMgr.StopAndWait(ctx, executionID, 10*time.Second); err != nil {
logger.Errorf("failed to stop the execution %d: %v", executionID, err)
}
if err := c.execMgr.MarkError(ctx, executionID, err.Error()); err != nil {
logger.Errorf("failed to mark error for the execution %d: %v", executionID, err)
}
}

// ScheduleCleanupTask schedules a system artifact cleanup task
func ScheduleCleanupTask(ctx context.Context) {
scheduleSystemArtifactCleanJob(ctx)
}

func scheduleSystemArtifactCleanJob(ctx context.Context) {
schedule, err := getSystemArtifactCleanupSchedule(ctx)
if err != nil {
return
}
if schedule != nil {
logger.Debugf(" Export data cleanup job already scheduled with ID : %v.", schedule.ID)
return
}
scheduleId, err := scheduler.Sched.Schedule(ctx, VendorTypeSystemArtifactCleanup, 0, cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, nil)
if err != nil {
log.Errorf("Encountered error when scheduling scan data export cleanup job : %v", err)
return
}
log.Infof("Scheduled scan data export cleanup job with ID : %v", scheduleId)
}

func getSystemArtifactCleanupSchedule(ctx context.Context) (*scheduler.Schedule, error) {
query := q.New(map[string]interface{}{"vendor_type": VendorTypeSystemArtifactCleanup})
schedules, err := scheduler.Sched.ListSchedules(ctx, query)
if err != nil {
logger.Errorf("Unable to check if export data cleanup job is already scheduled : %v", err)
return nil, err
}
if len(schedules) > 0 {
logger.Infof("Found export data cleanup job with schedule id : %v", schedules[0].ID)
return schedules[0], nil
}
return nil, nil
}
128 changes: 128 additions & 0 deletions src/controller/systemartifact/execution_test.go
@@ -0,0 +1,128 @@
package systemartifact

import (
"context"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/task"
ormtesting "github.com/goharbor/harbor/src/testing/lib/orm"
"github.com/goharbor/harbor/src/testing/mock"
"github.com/goharbor/harbor/src/testing/pkg/systemartifact"
testingTask "github.com/goharbor/harbor/src/testing/pkg/task"
"github.com/pkg/errors"
testifymock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"testing"
)

type SystemArtifactCleanupTestSuite struct {
suite.Suite
execMgr *testingTask.ExecutionManager
taskMgr *testingTask.Manager
cleanupMgr *systemartifact.Manager
ctl *controller
}

func (suite *SystemArtifactCleanupTestSuite) SetupSuite() {
}

func (suite *SystemArtifactCleanupTestSuite) TestStartCleanup() {
suite.taskMgr = &testingTask.Manager{}
suite.execMgr = &testingTask.ExecutionManager{}
suite.cleanupMgr = &systemartifact.Manager{}
suite.ctl = &controller{
execMgr: suite.execMgr,
taskMgr: suite.taskMgr,
systemArtifactMgr: suite.cleanupMgr,
makeCtx: func() context.Context { return orm.NewContext(nil, &ormtesting.FakeOrmer{}) },
}

{

ctx := context.TODO()

executionID := int64(1)
taskId := int64(1)

suite.execMgr.On("Create", ctx, "SYSTEM_ARTIFACT_CLEANUP", int64(0), "SCHEDULE").Return(executionID, nil).Once()

suite.taskMgr.On("Create", ctx, executionID, mock.Anything).Return(taskId, nil).Once()

suite.execMgr.On("MarkDone", ctx, executionID, mock.Anything).Return(nil).Once()

err := suite.ctl.Start(ctx, false, "SCHEDULE")
suite.NoError(err)
jobMatcher := testifymock.MatchedBy(func(j *task.Job) bool {
return "SYSTEM_ARTIFACT_CLEANUP" == j.Name
})
suite.taskMgr.AssertCalled(suite.T(), "Create", ctx, executionID, jobMatcher)
}
}

func (suite *SystemArtifactCleanupTestSuite) TestStartCleanupErrorDuringCreate() {
suite.taskMgr = &testingTask.Manager{}
suite.execMgr = &testingTask.ExecutionManager{}
suite.cleanupMgr = &systemartifact.Manager{}
suite.ctl = &controller{
execMgr: suite.execMgr,
taskMgr: suite.taskMgr,
systemArtifactMgr: suite.cleanupMgr,
makeCtx: func() context.Context { return orm.NewContext(nil, &ormtesting.FakeOrmer{}) },
}

{

ctx := context.TODO()

executionID := int64(1)

suite.execMgr.On(
"Create", ctx, "SYSTEM_ARTIFACT_CLEANUP", int64(0), "SCHEDULE",
).Return(int64(0), errors.New("test error")).Once()

suite.execMgr.On("MarkDone", ctx, executionID, mock.Anything).Return(nil).Once()

err := suite.ctl.Start(ctx, false, "SCHEDULE")
suite.Error(err)
}
}

func (suite *SystemArtifactCleanupTestSuite) TestStartCleanupErrorDuringTaskCreate() {
suite.taskMgr = &testingTask.Manager{}
suite.execMgr = &testingTask.ExecutionManager{}
suite.cleanupMgr = &systemartifact.Manager{}
suite.ctl = &controller{
execMgr: suite.execMgr,
taskMgr: suite.taskMgr,
systemArtifactMgr: suite.cleanupMgr,
makeCtx: func() context.Context { return orm.NewContext(nil, &ormtesting.FakeOrmer{}) },
}

{

ctx := context.TODO()

executionID := int64(1)
taskId := int64(0)

suite.execMgr.On(
"Create", ctx, "SYSTEM_ARTIFACT_CLEANUP", int64(0), "SCHEDULE",
).Return(executionID, nil).Once()

suite.taskMgr.On("Create", ctx, executionID, mock.Anything).Return(taskId, errors.New("test error")).Once()

suite.execMgr.On("MarkError", ctx, executionID, mock.Anything).Return(nil).Once()
suite.execMgr.On("StopAndWait", ctx, executionID, mock.Anything).Return(nil).Once()

err := suite.ctl.Start(ctx, false, "SCHEDULE")
suite.Error(err)
}
}

func (suite *SystemArtifactCleanupTestSuite) TearDownSuite() {
suite.execMgr = nil
suite.taskMgr = nil
}

func TestScanDataExportExecutionTestSuite(t *testing.T) {
suite.Run(t, &SystemArtifactCleanupTestSuite{})
}
3 changes: 3 additions & 0 deletions src/core/main.go
Expand Up @@ -25,6 +25,8 @@ import (
"syscall"
"time"

"github.com/goharbor/harbor/src/controller/systemartifact"

"github.com/beego/beego"
"github.com/goharbor/harbor/src/core/session"

Expand Down Expand Up @@ -224,6 +226,7 @@ func main() {

log.Info("Fix empty subiss for meta info data.")
oidc.FixEmptySubIss(orm.Context())
systemartifact.ScheduleCleanupTask(ctx)
beego.RunWithMiddleWares("", middlewares.MiddleWares()...)
}

Expand Down
45 changes: 45 additions & 0 deletions src/jobservice/job/impl/systemartifact/cleanup.go
@@ -0,0 +1,45 @@
package systemartifact

import (
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/systemartifact"
)

type Cleanup struct {
sysArtifactManager systemartifact.Manager
}

func (c *Cleanup) MaxFails() uint {
return 1
}

func (c *Cleanup) MaxCurrency() uint {
return 1
}

func (c *Cleanup) ShouldRetry() bool {
return true
}

func (c *Cleanup) Validate(params job.Parameters) error {
return nil
}

func (c *Cleanup) Run(ctx job.Context, params job.Parameters) error {
logger := ctx.GetLogger()
logger.Infof("Running system data artifact cleanup job...")
c.init()
numRecordsDeleted, totalSizeReclaimed, err := c.sysArtifactManager.Cleanup(ctx.SystemContext())
if err != nil {
logger.Errorf("Error when executing system artifact cleanup job: %v", err)
return err
}
logger.Infof("Num System artifacts cleaned up: %d, Total space reclaimed: %d.", numRecordsDeleted, totalSizeReclaimed)
return nil
}

func (c *Cleanup) init() {
if c.sysArtifactManager == nil {
c.sysArtifactManager = systemartifact.NewManager()
}
}

0 comments on commit e217d79

Please sign in to comment.