diff --git a/app/controlplane/internal/service/workflowrun.go b/app/controlplane/internal/service/workflowrun.go index 224686d9a..23d43fc45 100644 --- a/app/controlplane/internal/service/workflowrun.go +++ b/app/controlplane/internal/service/workflowrun.go @@ -212,6 +212,10 @@ func pbWorkflowRunStatusToBiz(st pb.RunStatus) (biz.WorkflowRunStatus, error) { } func bizProjectVersionToPb(v *biz.ProjectVersion) *pb.ProjectVersion { + if v == nil { + return nil + } + return &pb.ProjectVersion{ Id: v.ID.String(), Version: v.Version, diff --git a/app/controlplane/pkg/biz/mocks/WorkflowRunRepo.go b/app/controlplane/pkg/biz/mocks/WorkflowRunRepo.go index caa02009b..1914406e0 100644 --- a/app/controlplane/pkg/biz/mocks/WorkflowRunRepo.go +++ b/app/controlplane/pkg/biz/mocks/WorkflowRunRepo.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.43.2. DO NOT EDIT. package mocks @@ -27,6 +27,10 @@ type WorkflowRunRepo struct { func (_m *WorkflowRunRepo) Create(ctx context.Context, opts *biz.WorkflowRunRepoCreateOpts) (*biz.WorkflowRun, error) { ret := _m.Called(ctx, opts) + if len(ret) == 0 { + panic("no return value specified for Create") + } + var r0 *biz.WorkflowRun var r1 error if rf, ok := ret.Get(0).(func(context.Context, *biz.WorkflowRunRepoCreateOpts) (*biz.WorkflowRun, error)); ok { @@ -53,6 +57,10 @@ func (_m *WorkflowRunRepo) Create(ctx context.Context, opts *biz.WorkflowRunRepo func (_m *WorkflowRunRepo) Expire(ctx context.Context, id uuid.UUID) error { ret := _m.Called(ctx, id) + if len(ret) == 0 { + panic("no return value specified for Expire") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID) error); ok { r0 = rf(ctx, id) @@ -67,6 +75,10 @@ func (_m *WorkflowRunRepo) Expire(ctx context.Context, id uuid.UUID) error { func (_m *WorkflowRunRepo) FindByAttestationDigest(ctx context.Context, digest string) (*biz.WorkflowRun, error) { ret := _m.Called(ctx, digest) + if len(ret) == 0 { + panic("no return value specified for FindByAttestationDigest") + } + var r0 *biz.WorkflowRun var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) (*biz.WorkflowRun, error)); ok { @@ -93,6 +105,10 @@ func (_m *WorkflowRunRepo) FindByAttestationDigest(ctx context.Context, digest s func (_m *WorkflowRunRepo) FindByID(ctx context.Context, ID uuid.UUID) (*biz.WorkflowRun, error) { ret := _m.Called(ctx, ID) + if len(ret) == 0 { + panic("no return value specified for FindByID") + } + var r0 *biz.WorkflowRun var r1 error if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID) (*biz.WorkflowRun, error)); ok { @@ -119,6 +135,10 @@ func (_m *WorkflowRunRepo) FindByID(ctx context.Context, ID uuid.UUID) (*biz.Wor func (_m *WorkflowRunRepo) FindByIDInOrg(ctx context.Context, orgID uuid.UUID, ID uuid.UUID) (*biz.WorkflowRun, error) { ret := _m.Called(ctx, orgID, ID) + if len(ret) == 0 { + panic("no return value specified for FindByIDInOrg") + } + var r0 *biz.WorkflowRun var r1 error if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, uuid.UUID) (*biz.WorkflowRun, error)); ok { @@ -145,6 +165,10 @@ func (_m *WorkflowRunRepo) FindByIDInOrg(ctx context.Context, orgID uuid.UUID, I func (_m *WorkflowRunRepo) List(ctx context.Context, orgID uuid.UUID, f *biz.RunListFilters, p *pagination.CursorOptions) ([]*biz.WorkflowRun, string, error) { ret := _m.Called(ctx, orgID, f, p) + if len(ret) == 0 { + panic("no return value specified for List") + } + var r0 []*biz.WorkflowRun var r1 string var r2 error @@ -174,25 +198,29 @@ func (_m *WorkflowRunRepo) List(ctx context.Context, orgID uuid.UUID, f *biz.Run return r0, r1, r2 } -// ListNotFinishedOlderThan provides a mock function with given fields: ctx, olderThan -func (_m *WorkflowRunRepo) ListNotFinishedOlderThan(ctx context.Context, olderThan time.Time) ([]*biz.WorkflowRun, error) { - ret := _m.Called(ctx, olderThan) +// ListNotFinishedOlderThan provides a mock function with given fields: ctx, olderThan, limit +func (_m *WorkflowRunRepo) ListNotFinishedOlderThan(ctx context.Context, olderThan time.Time, limit int) ([]*biz.WorkflowRun, error) { + ret := _m.Called(ctx, olderThan, limit) + + if len(ret) == 0 { + panic("no return value specified for ListNotFinishedOlderThan") + } var r0 []*biz.WorkflowRun var r1 error - if rf, ok := ret.Get(0).(func(context.Context, time.Time) ([]*biz.WorkflowRun, error)); ok { - return rf(ctx, olderThan) + if rf, ok := ret.Get(0).(func(context.Context, time.Time, int) ([]*biz.WorkflowRun, error)); ok { + return rf(ctx, olderThan, limit) } - if rf, ok := ret.Get(0).(func(context.Context, time.Time) []*biz.WorkflowRun); ok { - r0 = rf(ctx, olderThan) + if rf, ok := ret.Get(0).(func(context.Context, time.Time, int) []*biz.WorkflowRun); ok { + r0 = rf(ctx, olderThan, limit) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]*biz.WorkflowRun) } } - if rf, ok := ret.Get(1).(func(context.Context, time.Time) error); ok { - r1 = rf(ctx, olderThan) + if rf, ok := ret.Get(1).(func(context.Context, time.Time, int) error); ok { + r1 = rf(ctx, olderThan, limit) } else { r1 = ret.Error(1) } @@ -204,6 +232,10 @@ func (_m *WorkflowRunRepo) ListNotFinishedOlderThan(ctx context.Context, olderTh func (_m *WorkflowRunRepo) MarkAsFinished(ctx context.Context, ID uuid.UUID, status biz.WorkflowRunStatus, reason string) error { ret := _m.Called(ctx, ID, status, reason) + if len(ret) == 0 { + panic("no return value specified for MarkAsFinished") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, biz.WorkflowRunStatus, string) error); ok { r0 = rf(ctx, ID, status, reason) @@ -218,6 +250,10 @@ func (_m *WorkflowRunRepo) MarkAsFinished(ctx context.Context, ID uuid.UUID, sta func (_m *WorkflowRunRepo) SaveAttestation(ctx context.Context, ID uuid.UUID, att *dsse.Envelope, digest string) error { ret := _m.Called(ctx, ID, att, digest) + if len(ret) == 0 { + panic("no return value specified for SaveAttestation") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, *dsse.Envelope, string) error); ok { r0 = rf(ctx, ID, att, digest) @@ -228,13 +264,12 @@ func (_m *WorkflowRunRepo) SaveAttestation(ctx context.Context, ID uuid.UUID, at return r0 } -type mockConstructorTestingTNewWorkflowRunRepo interface { +// NewWorkflowRunRepo creates a new instance of WorkflowRunRepo. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewWorkflowRunRepo(t interface { mock.TestingT Cleanup(func()) -} - -// NewWorkflowRunRepo creates a new instance of WorkflowRunRepo. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewWorkflowRunRepo(t mockConstructorTestingTNewWorkflowRunRepo) *WorkflowRunRepo { +}) *WorkflowRunRepo { mock := &WorkflowRunRepo{} mock.Mock.Test(t) diff --git a/app/controlplane/pkg/biz/workflowrun.go b/app/controlplane/pkg/biz/workflowrun.go index 6e59103f9..d040eb7f9 100644 --- a/app/controlplane/pkg/biz/workflowrun.go +++ b/app/controlplane/pkg/biz/workflowrun.go @@ -78,7 +78,7 @@ type WorkflowRunRepo interface { SaveAttestation(ctx context.Context, ID uuid.UUID, att *dsse.Envelope, digest string) error List(ctx context.Context, orgID uuid.UUID, f *RunListFilters, p *pagination.CursorOptions) ([]*WorkflowRun, string, error) // List the runs that have not finished and are older than a given time - ListNotFinishedOlderThan(ctx context.Context, olderThan time.Time) ([]*WorkflowRun, error) + ListNotFinishedOlderThan(ctx context.Context, olderThan time.Time, limit int) ([]*WorkflowRun, error) // Set run as expired Expire(ctx context.Context, id uuid.UUID) error } @@ -149,7 +149,8 @@ func (uc *WorkflowRunExpirerUseCase) Run(ctx context.Context, opts *WorkflowRunE func (uc *WorkflowRunExpirerUseCase) ExpirationSweep(ctx context.Context, olderThan time.Time) error { uc.logger.Debugf("expiration sweep - runs older than %s", olderThan.Format(time.RFC822)) - toExpire, err := uc.wfRunRepo.ListNotFinishedOlderThan(ctx, olderThan) + const maxNumberOfRunsToExpire = 100 + toExpire, err := uc.wfRunRepo.ListNotFinishedOlderThan(ctx, olderThan, maxNumberOfRunsToExpire) if err != nil { return err } diff --git a/app/controlplane/pkg/biz/workflowrun_test.go b/app/controlplane/pkg/biz/workflowrun_test.go index 678990e04..cd73a580a 100644 --- a/app/controlplane/pkg/biz/workflowrun_test.go +++ b/app/controlplane/pkg/biz/workflowrun_test.go @@ -1,5 +1,5 @@ // -// Copyright 2023 The Chainloop Authors. +// Copyright 2024 The Chainloop Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -76,7 +76,7 @@ func (s *workflowrunExpirerTestSuite) SetupTest() { func (s *workflowrunExpirerTestSuite) TestSweepListError() { assert := assert.New(s.T()) - s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold).Return(nil, s.err) + s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold, 100).Return(nil, s.err) err := s.useCase.ExpirationSweep(s.ctx, s.threshold) assert.ErrorIs(s.err, err) } @@ -84,7 +84,7 @@ func (s *workflowrunExpirerTestSuite) TestSweepListError() { func (s *workflowrunExpirerTestSuite) TestSweepExpireError() { assert := assert.New(s.T()) - s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold).Return(s.toExpire, nil) + s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold, 100).Return(s.toExpire, nil) s.repo.On("Expire", s.ctx, s.toExpire[0].ID).Return(s.err) err := s.useCase.ExpirationSweep(s.ctx, s.threshold) assert.Error(err) @@ -92,7 +92,7 @@ func (s *workflowrunExpirerTestSuite) TestSweepExpireError() { func (s *workflowrunExpirerTestSuite) TestSweepExpireOK() { assert := assert.New(s.T()) - s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold).Return(s.toExpire, nil) + s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold, 100).Return(s.toExpire, nil) s.repo.On("Expire", s.ctx, s.toExpire[0].ID).Return(nil) s.repo.On("Expire", s.ctx, s.toExpire[1].ID).Return(nil) diff --git a/app/controlplane/pkg/data/ent/migrate/migrations/20241122101039.sql b/app/controlplane/pkg/data/ent/migrate/migrations/20241122101039.sql new file mode 100644 index 000000000..ca6dfa1b5 --- /dev/null +++ b/app/controlplane/pkg/data/ent/migrate/migrations/20241122101039.sql @@ -0,0 +1,2 @@ +-- Create index "workflow_workflow_contract" to table: "workflows" +CREATE INDEX "workflow_workflow_contract" ON "workflows" ("workflow_contract") WHERE (deleted_at IS NULL); diff --git a/app/controlplane/pkg/data/ent/migrate/migrations/atlas.sum b/app/controlplane/pkg/data/ent/migrate/migrations/atlas.sum index bcd8e7b7b..850a6a6f1 100644 --- a/app/controlplane/pkg/data/ent/migrate/migrations/atlas.sum +++ b/app/controlplane/pkg/data/ent/migrate/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:8xTN53Ko/Clh8hYLyMM0srzO/e46I2LqvYem1Pd0zKk= +h1:0W3czzI3+TOY9TZ8+RAb/lBpwmVolPs0x20MYEkaWcE= 20230706165452_init-schema.sql h1:VvqbNFEQnCvUVyj2iDYVQQxDM0+sSXqocpt/5H64k8M= 20230710111950-cas-backend.sql h1:A8iBuSzZIEbdsv9ipBtscZQuaBp3V5/VMw7eZH6GX+g= 20230712094107-cas-backends-workflow-runs.sql h1:a5rzxpVGyd56nLRSsKrmCFc9sebg65RWzLghKHh5xvI= @@ -64,3 +64,4 @@ h1:8xTN53Ko/Clh8hYLyMM0srzO/e46I2LqvYem1Pd0zKk= 20241114164704.sql h1:raGzf5OXqWpILpRLluUBQzOKErcYOSDde3d3FOW9f44= 20241120213143.sql h1:H4rw5GXrUCa75BtilsytT4paAge15JkoRhepfErQOYA= 20241120214218.sql h1:zur/2VsvAWT2xgmqX/3cRt95+GHap0OwF158RiJuNHI= +20241122101039.sql h1:RMk8MDWj/RhbVKX9ERHEI96tFFPuMEwNZMk65FanOvM= diff --git a/app/controlplane/pkg/data/ent/migrate/schema.go b/app/controlplane/pkg/data/ent/migrate/schema.go index d8b9875f1..befe0fb57 100644 --- a/app/controlplane/pkg/data/ent/migrate/schema.go +++ b/app/controlplane/pkg/data/ent/migrate/schema.go @@ -467,6 +467,14 @@ var ( Where: "deleted_at IS NULL", }, }, + { + Name: "workflow_workflow_contract", + Unique: false, + Columns: []*schema.Column{WorkflowsColumns[12]}, + Annotation: &entsql.IndexAnnotation{ + Where: "deleted_at IS NULL", + }, + }, }, } // WorkflowContractsColumns holds the columns for the "workflow_contracts" table. diff --git a/app/controlplane/pkg/data/ent/schema/workflow.go b/app/controlplane/pkg/data/ent/schema/workflow.go index 92fba08d8..6b850c362 100644 --- a/app/controlplane/pkg/data/ent/schema/workflow.go +++ b/app/controlplane/pkg/data/ent/schema/workflow.go @@ -86,5 +86,8 @@ func (Workflow) Indexes() []ent.Index { index.Fields("organization_id", "id").Unique().Annotations( entsql.IndexWhere("deleted_at IS NULL"), ), + index.Edges("contract").Annotations( + entsql.IndexWhere("deleted_at IS NULL"), + ), } } diff --git a/app/controlplane/pkg/data/workflow.go b/app/controlplane/pkg/data/workflow.go index c1dbc1858..7dbbde8e4 100644 --- a/app/controlplane/pkg/data/workflow.go +++ b/app/controlplane/pkg/data/workflow.go @@ -168,6 +168,7 @@ func (r *WorkflowRepo) List(ctx context.Context, orgID uuid.UUID, filter *biz.Wo // Apply pagination options and execute the query workflows, err := wfQuery. WithLatestWorkflowRun(). + WithProject(). Order(ent.Desc(workflow.FieldCreatedAt)). Limit(pagination.Limit()). Offset(pagination.Offset()). @@ -258,7 +259,7 @@ func (r *WorkflowRepo) GetOrgScoped(ctx context.Context, orgID, workflowID uuid. workflow, err := orgScopedQuery(r.data.DB, orgID). QueryWorkflows(). Where(workflow.ID(workflowID), workflow.DeletedAtIsNil()). - WithContract().WithOrganization().WithLatestWorkflowRun(). + WithContract().WithOrganization().WithLatestWorkflowRun().WithProject(). Only(ctx) if err != nil { @@ -282,7 +283,7 @@ func (r *WorkflowRepo) GetOrgScopedByProjectAndName(ctx context.Context, orgID u } wf, err := r.data.DB.Workflow.Query().Where(workflow.ProjectIDEQ(p.ID), workflow.Name(workflowName), workflow.DeletedAtIsNil()). - WithContract().WithOrganization().WithProject().WithLatestWorkflowRun().First(ctx) + WithContract().WithOrganization().WithProject().WithLatestWorkflowRun().WithProject().First(ctx) if err != nil { if ent.IsNotFound(err) { return nil, biz.NewErrNotFound("workflow") @@ -300,7 +301,7 @@ func (r *WorkflowRepo) IncRunsCounter(ctx context.Context, workflowID uuid.UUID) func (r *WorkflowRepo) FindByID(ctx context.Context, id uuid.UUID) (*biz.Workflow, error) { workflow, err := r.data.DB.Workflow.Query(). Where(workflow.DeletedAtIsNil(), workflow.ID(id)). - WithContract().WithOrganization().WithLatestWorkflowRun(). + WithContract().WithOrganization().WithLatestWorkflowRun().WithProject(). Only(ctx) if err != nil { if ent.IsNotFound(err) { @@ -351,22 +352,13 @@ func entWFToBizWF(ctx context.Context, w *ent.Workflow) (*biz.Workflow, error) { Public: w.Public, Description: w.Description, OrgID: w.OrganizationID, + ProjectID: w.ProjectID, } - // Set project either pre-loaded or queried - if project := w.Edges.Project; project != nil { - wf.Project = project.Name - } else { - project, err := w.QueryProject().Only(ctx) - if err != nil { - return nil, err - } - wf.Project = project.Name - wf.ProjectID = project.ID - } - - if wf.Project == "" { - return nil, fmt.Errorf("workflow %s has no project", w.ID) + // Set p either pre-loaded or queried + if p := w.Edges.Project; p != nil { + wf.Project = p.Name + wf.ProjectID = p.ID } if contract := w.Edges.Contract; contract != nil { diff --git a/app/controlplane/pkg/data/workflowcontract.go b/app/controlplane/pkg/data/workflowcontract.go index 0958b0fab..af08313b9 100644 --- a/app/controlplane/pkg/data/workflowcontract.go +++ b/app/controlplane/pkg/data/workflowcontract.go @@ -396,7 +396,7 @@ func getWorkflowReferences(ctx context.Context, schema *ent.WorkflowContract) ([ workflows := schema.Edges.Workflows if workflows == nil { var err error - workflows, err = schema.QueryWorkflows(). + workflows, err = schema.QueryWorkflows().WithProject(). Where(workflow.DeletedAtIsNil()).WithProject(). Select(workflowcontract.FieldID).All(ctx) if err != nil { diff --git a/app/controlplane/pkg/data/workflowrun.go b/app/controlplane/pkg/data/workflowrun.go index 4e174b310..1dab11488 100644 --- a/app/controlplane/pkg/data/workflowrun.go +++ b/app/controlplane/pkg/data/workflowrun.go @@ -228,7 +228,7 @@ func (r *WorkflowRunRepo) List(ctx context.Context, orgID uuid.UUID, filters *bi }) } - workflowRuns, err := wfRunsQuery.All(ctx) + workflowRuns, err := wfRunsQuery.WithVersion().All(ctx) if err != nil { return nil, "", err } @@ -252,14 +252,15 @@ func (r *WorkflowRunRepo) List(ctx context.Context, orgID uuid.UUID, filters *bi return result, cursor, nil } -func (r *WorkflowRunRepo) ListNotFinishedOlderThan(ctx context.Context, olderThan time.Time) ([]*biz.WorkflowRun, error) { +func (r *WorkflowRunRepo) ListNotFinishedOlderThan(ctx context.Context, olderThan time.Time, limit int) ([]*biz.WorkflowRun, error) { + q := r.data.DB.WorkflowRun.Query().WithWorkflow().Where(workflowrun.CreatedAtLTE(olderThan)).Where(workflowrun.StateEQ(biz.WorkflowRunInitialized)) + if limit > 0 { + q = q.Limit(limit) + } + // TODO: Look into adding upper bound on the createdAt column to prevent full table scans // For now this is fine especially because we have a composite index - workflowRuns, err := r.data.DB.WorkflowRun.Query(). - WithWorkflow(). - Where(workflowrun.CreatedAtLTE(olderThan)). - Where(workflowrun.StateEQ(biz.WorkflowRunInitialized)). - All(ctx) + workflowRuns, err := q.All(ctx) if err != nil { return nil, err } @@ -316,17 +317,10 @@ func entWrToBizWr(ctx context.Context, wr *ent.WorkflowRun) (*biz.WorkflowRun, e } // Load version preloaded or otherwise query it - var err error - version := wr.Edges.Version - if version == nil { - version, err = wr.QueryVersion().Only(ctx) - if err != nil { - return nil, fmt.Errorf("failed to query version: %w", err) - } + if wr.Edges.Version != nil { + r.ProjectVersion = entProjectVersionToBiz(wr.Edges.Version) } - r.ProjectVersion = entProjectVersionToBiz(version) - if backends := wr.Edges.CasBackends; backends != nil { for _, b := range backends { r.CASBackends = append(r.CASBackends, entCASBackendToBiz(b))