Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/controlplane/internal/service/workflowrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ func pbWorkflowRunStatusToBiz(st pb.RunStatus) (biz.WorkflowRunStatus, error) {
}

func bizProjectVersionToPb(v *biz.ProjectVersion) *pb.ProjectVersion {
if v == nil {
return nil
}

return &pb.ProjectVersion{
Id: v.ID.String(),
Version: v.Version,
Expand Down
65 changes: 50 additions & 15 deletions app/controlplane/pkg/biz/mocks/WorkflowRunRepo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions app/controlplane/pkg/biz/workflowrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type WorkflowRunRepo interface {
SaveAttestation(ctx context.Context, ID uuid.UUID, att *dsse.Envelope, digest string) error
List(ctx context.Context, orgID uuid.UUID, f *RunListFilters, p *pagination.CursorOptions) ([]*WorkflowRun, string, error)
// List the runs that have not finished and are older than a given time
ListNotFinishedOlderThan(ctx context.Context, olderThan time.Time) ([]*WorkflowRun, error)
ListNotFinishedOlderThan(ctx context.Context, olderThan time.Time, limit int) ([]*WorkflowRun, error)
// Set run as expired
Expire(ctx context.Context, id uuid.UUID) error
}
Expand Down Expand Up @@ -149,7 +149,8 @@ func (uc *WorkflowRunExpirerUseCase) Run(ctx context.Context, opts *WorkflowRunE
func (uc *WorkflowRunExpirerUseCase) ExpirationSweep(ctx context.Context, olderThan time.Time) error {
uc.logger.Debugf("expiration sweep - runs older than %s", olderThan.Format(time.RFC822))

toExpire, err := uc.wfRunRepo.ListNotFinishedOlderThan(ctx, olderThan)
const maxNumberOfRunsToExpire = 100
toExpire, err := uc.wfRunRepo.ListNotFinishedOlderThan(ctx, olderThan, maxNumberOfRunsToExpire)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions app/controlplane/pkg/biz/workflowrun_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2023 The Chainloop Authors.
// Copyright 2024 The Chainloop Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,23 +76,23 @@ func (s *workflowrunExpirerTestSuite) SetupTest() {
func (s *workflowrunExpirerTestSuite) TestSweepListError() {
assert := assert.New(s.T())

s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold).Return(nil, s.err)
s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold, 100).Return(nil, s.err)
err := s.useCase.ExpirationSweep(s.ctx, s.threshold)
assert.ErrorIs(s.err, err)
}

func (s *workflowrunExpirerTestSuite) TestSweepExpireError() {
assert := assert.New(s.T())

s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold).Return(s.toExpire, nil)
s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold, 100).Return(s.toExpire, nil)
s.repo.On("Expire", s.ctx, s.toExpire[0].ID).Return(s.err)
err := s.useCase.ExpirationSweep(s.ctx, s.threshold)
assert.Error(err)
}
func (s *workflowrunExpirerTestSuite) TestSweepExpireOK() {
assert := assert.New(s.T())

s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold).Return(s.toExpire, nil)
s.repo.On("ListNotFinishedOlderThan", s.ctx, s.threshold, 100).Return(s.toExpire, nil)

s.repo.On("Expire", s.ctx, s.toExpire[0].ID).Return(nil)
s.repo.On("Expire", s.ctx, s.toExpire[1].ID).Return(nil)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Create index "workflow_workflow_contract" to table: "workflows"
CREATE INDEX "workflow_workflow_contract" ON "workflows" ("workflow_contract") WHERE (deleted_at IS NULL);
3 changes: 2 additions & 1 deletion app/controlplane/pkg/data/ent/migrate/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:8xTN53Ko/Clh8hYLyMM0srzO/e46I2LqvYem1Pd0zKk=
h1:0W3czzI3+TOY9TZ8+RAb/lBpwmVolPs0x20MYEkaWcE=
20230706165452_init-schema.sql h1:VvqbNFEQnCvUVyj2iDYVQQxDM0+sSXqocpt/5H64k8M=
20230710111950-cas-backend.sql h1:A8iBuSzZIEbdsv9ipBtscZQuaBp3V5/VMw7eZH6GX+g=
20230712094107-cas-backends-workflow-runs.sql h1:a5rzxpVGyd56nLRSsKrmCFc9sebg65RWzLghKHh5xvI=
Expand Down Expand Up @@ -64,3 +64,4 @@ h1:8xTN53Ko/Clh8hYLyMM0srzO/e46I2LqvYem1Pd0zKk=
20241114164704.sql h1:raGzf5OXqWpILpRLluUBQzOKErcYOSDde3d3FOW9f44=
20241120213143.sql h1:H4rw5GXrUCa75BtilsytT4paAge15JkoRhepfErQOYA=
20241120214218.sql h1:zur/2VsvAWT2xgmqX/3cRt95+GHap0OwF158RiJuNHI=
20241122101039.sql h1:RMk8MDWj/RhbVKX9ERHEI96tFFPuMEwNZMk65FanOvM=
8 changes: 8 additions & 0 deletions app/controlplane/pkg/data/ent/migrate/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,14 @@ var (
Where: "deleted_at IS NULL",
},
},
{
Name: "workflow_workflow_contract",
Unique: false,
Columns: []*schema.Column{WorkflowsColumns[12]},
Annotation: &entsql.IndexAnnotation{
Where: "deleted_at IS NULL",
},
},
},
}
// WorkflowContractsColumns holds the columns for the "workflow_contracts" table.
Expand Down
3 changes: 3 additions & 0 deletions app/controlplane/pkg/data/ent/schema/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,8 @@ func (Workflow) Indexes() []ent.Index {
index.Fields("organization_id", "id").Unique().Annotations(
entsql.IndexWhere("deleted_at IS NULL"),
),
index.Edges("contract").Annotations(
entsql.IndexWhere("deleted_at IS NULL"),
),
}
}
26 changes: 9 additions & 17 deletions app/controlplane/pkg/data/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (r *WorkflowRepo) List(ctx context.Context, orgID uuid.UUID, filter *biz.Wo
// Apply pagination options and execute the query
workflows, err := wfQuery.
WithLatestWorkflowRun().
WithProject().
Order(ent.Desc(workflow.FieldCreatedAt)).
Limit(pagination.Limit()).
Offset(pagination.Offset()).
Expand Down Expand Up @@ -258,7 +259,7 @@ func (r *WorkflowRepo) GetOrgScoped(ctx context.Context, orgID, workflowID uuid.
workflow, err := orgScopedQuery(r.data.DB, orgID).
QueryWorkflows().
Where(workflow.ID(workflowID), workflow.DeletedAtIsNil()).
WithContract().WithOrganization().WithLatestWorkflowRun().
WithContract().WithOrganization().WithLatestWorkflowRun().WithProject().
Only(ctx)

if err != nil {
Expand All @@ -282,7 +283,7 @@ func (r *WorkflowRepo) GetOrgScopedByProjectAndName(ctx context.Context, orgID u
}

wf, err := r.data.DB.Workflow.Query().Where(workflow.ProjectIDEQ(p.ID), workflow.Name(workflowName), workflow.DeletedAtIsNil()).
WithContract().WithOrganization().WithProject().WithLatestWorkflowRun().First(ctx)
WithContract().WithOrganization().WithProject().WithLatestWorkflowRun().WithProject().First(ctx)
if err != nil {
if ent.IsNotFound(err) {
return nil, biz.NewErrNotFound("workflow")
Expand All @@ -300,7 +301,7 @@ func (r *WorkflowRepo) IncRunsCounter(ctx context.Context, workflowID uuid.UUID)
func (r *WorkflowRepo) FindByID(ctx context.Context, id uuid.UUID) (*biz.Workflow, error) {
workflow, err := r.data.DB.Workflow.Query().
Where(workflow.DeletedAtIsNil(), workflow.ID(id)).
WithContract().WithOrganization().WithLatestWorkflowRun().
WithContract().WithOrganization().WithLatestWorkflowRun().WithProject().
Only(ctx)
if err != nil {
if ent.IsNotFound(err) {
Expand Down Expand Up @@ -351,22 +352,13 @@ func entWFToBizWF(ctx context.Context, w *ent.Workflow) (*biz.Workflow, error) {
Public: w.Public,
Description: w.Description,
OrgID: w.OrganizationID,
ProjectID: w.ProjectID,
}

// Set project either pre-loaded or queried
if project := w.Edges.Project; project != nil {
wf.Project = project.Name
} else {
project, err := w.QueryProject().Only(ctx)
if err != nil {
return nil, err
}
wf.Project = project.Name
wf.ProjectID = project.ID
}

if wf.Project == "" {
return nil, fmt.Errorf("workflow %s has no project", w.ID)
// Set p either pre-loaded or queried
if p := w.Edges.Project; p != nil {
wf.Project = p.Name
wf.ProjectID = p.ID
}

if contract := w.Edges.Contract; contract != nil {
Expand Down
2 changes: 1 addition & 1 deletion app/controlplane/pkg/data/workflowcontract.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func getWorkflowReferences(ctx context.Context, schema *ent.WorkflowContract) ([
workflows := schema.Edges.Workflows
if workflows == nil {
var err error
workflows, err = schema.QueryWorkflows().
workflows, err = schema.QueryWorkflows().WithProject().
Where(workflow.DeletedAtIsNil()).WithProject().
Select(workflowcontract.FieldID).All(ctx)
if err != nil {
Expand Down
26 changes: 10 additions & 16 deletions app/controlplane/pkg/data/workflowrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (r *WorkflowRunRepo) List(ctx context.Context, orgID uuid.UUID, filters *bi
})
}

workflowRuns, err := wfRunsQuery.All(ctx)
workflowRuns, err := wfRunsQuery.WithVersion().All(ctx)
if err != nil {
return nil, "", err
}
Expand All @@ -252,14 +252,15 @@ func (r *WorkflowRunRepo) List(ctx context.Context, orgID uuid.UUID, filters *bi
return result, cursor, nil
}

func (r *WorkflowRunRepo) ListNotFinishedOlderThan(ctx context.Context, olderThan time.Time) ([]*biz.WorkflowRun, error) {
func (r *WorkflowRunRepo) ListNotFinishedOlderThan(ctx context.Context, olderThan time.Time, limit int) ([]*biz.WorkflowRun, error) {
q := r.data.DB.WorkflowRun.Query().WithWorkflow().Where(workflowrun.CreatedAtLTE(olderThan)).Where(workflowrun.StateEQ(biz.WorkflowRunInitialized))
if limit > 0 {
q = q.Limit(limit)
}

// TODO: Look into adding upper bound on the createdAt column to prevent full table scans
// For now this is fine especially because we have a composite index
workflowRuns, err := r.data.DB.WorkflowRun.Query().
WithWorkflow().
Where(workflowrun.CreatedAtLTE(olderThan)).
Where(workflowrun.StateEQ(biz.WorkflowRunInitialized)).
All(ctx)
workflowRuns, err := q.All(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -316,17 +317,10 @@ func entWrToBizWr(ctx context.Context, wr *ent.WorkflowRun) (*biz.WorkflowRun, e
}

// Load version preloaded or otherwise query it
var err error
version := wr.Edges.Version
if version == nil {
version, err = wr.QueryVersion().Only(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query version: %w", err)
}
if wr.Edges.Version != nil {
r.ProjectVersion = entProjectVersionToBiz(wr.Edges.Version)
}

r.ProjectVersion = entProjectVersionToBiz(version)

if backends := wr.Edges.CasBackends; backends != nil {
for _, b := range backends {
r.CASBackends = append(r.CASBackends, entCASBackendToBiz(b))
Expand Down
Loading