This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 62
/
execution_repo.go
122 lines (110 loc) · 3.79 KB
/
execution_repo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package gormimpl
import (
"context"
"errors"
"fmt"
"github.com/flyteorg/flyteadmin/pkg/common"
adminErrors "github.com/flyteorg/flyteadmin/pkg/repositories/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/promutils"
"gorm.io/gorm"
)
// ExecutionRepo is the GORM-backed implementation of
// interfaces.ExecutionRepoInterface (see NewExecutionRepo).
type ExecutionRepo struct {
	// db is the GORM handle every query in this repo is issued against.
	db *gorm.DB
	// errorTransformer converts raw database errors into flyteadmin errors
	// before they are returned to callers.
	errorTransformer adminErrors.ErrorTransformer
	// metrics holds the per-operation duration timers (create/get/update/list).
	metrics gormMetrics
}
// Create inserts input as a new execution row.
//
// The "id" column is omitted from the INSERT (presumably so the database
// assigns it). ctx is attached to the GORM session so that cancellation and
// deadlines set by the caller reach the underlying query. Any database error
// is passed through the repo's error transformer before being returned.
func (r *ExecutionRepo) Create(ctx context.Context, input models.Execution) error {
	timer := r.metrics.CreateDuration.Start()
	tx := r.db.WithContext(ctx).Omit("id").Create(&input)
	timer.Stop()
	if tx.Error != nil {
		return r.errorTransformer.ToFlyteAdminError(tx.Error)
	}
	return nil
}
// Get fetches the execution matching the project, domain and name in input.
//
// ctx is attached to the GORM session so that cancellation and deadlines set
// by the caller reach the underlying query.
//
// Returns:
//   - the matching execution and a nil error on success;
//   - a zero-value execution and a missing-entity error when no row matches
//     (gorm.ErrRecordNotFound);
//   - a zero-value execution and the transformed database error otherwise.
func (r *ExecutionRepo) Get(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
	var execution models.Execution
	timer := r.metrics.GetDuration.Start()
	tx := r.db.WithContext(ctx).Where(&models.Execution{
		ExecutionKey: models.ExecutionKey{
			Project: input.Project,
			Domain:  input.Domain,
			Name:    input.Name,
		},
	}).Take(&execution)
	timer.Stop()

	switch {
	case tx.Error == nil:
		return execution, nil
	case errors.Is(tx.Error, gorm.ErrRecordNotFound):
		// Surface "not found" as a dedicated missing-entity error rather than
		// a generic database failure.
		return models.Execution{}, adminErrors.GetMissingEntityError("execution", &core.Identifier{
			Project: input.Project,
			Domain:  input.Domain,
			Name:    input.Name,
		})
	default:
		return models.Execution{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
	}
}
// Update writes the fields of execution back to its existing row, using the
// model itself to identify the row.
//
// ctx is attached to the GORM session so that cancellation and deadlines set
// by the caller reach the underlying query. Any database error is passed
// through the repo's error transformer before being returned.
//
// NOTE(review): GORM's Updates with a struct argument skips zero-value
// fields — confirm no caller needs to clear a column through this path.
func (r *ExecutionRepo) Update(ctx context.Context, execution models.Execution) error {
	timer := r.metrics.UpdateDuration.Start()
	tx := r.db.WithContext(ctx).Model(&execution).Updates(execution)
	timer.Stop()
	if err := tx.Error; err != nil {
		return r.errorTransformer.ToFlyteAdminError(err)
	}
	return nil
}
// List returns the executions selected by input.
//
// The input is first checked with ValidateListInput. The query is then built
// from input.Limit and input.Offset, optional INNER JOINs against the launch
// plan, workflow and task tables (enabled per entity through
// input.JoinTableEntities), the inline and map filters, and the optional sort
// parameter. ctx is attached to the GORM session so that cancellation and
// deadlines set by the caller reach the underlying query.
//
// Validation and filter errors are returned unchanged; database errors are
// passed through the repo's error transformer. On any error the returned
// collection is the zero value.
func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceInput) (
	interfaces.ExecutionCollectionOutput, error) {
	var err error
	// First validate input.
	if err = ValidateListInput(input); err != nil {
		return interfaces.ExecutionCollectionOutput{}, err
	}

	var executions []models.Execution
	tx := r.db.WithContext(ctx).Limit(input.Limit).Offset(input.Offset)

	// Add the join conditions required by user-specified filters, which may
	// reference attributes of the joined tables.
	if input.JoinTableEntities[common.LaunchPlan] {
		tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.launch_plan_id = %s.id",
			launchPlanTableName, executionTableName, launchPlanTableName))
	}
	if input.JoinTableEntities[common.Workflow] {
		tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.workflow_id = %s.id",
			workflowTableName, executionTableName, workflowTableName))
	}
	if input.JoinTableEntities[common.Task] {
		tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.task_id = %s.id",
			taskTableName, executionTableName, taskTableName))
	}

	// Apply filters.
	tx, err = applyScopedFilters(tx, input.InlineFilters, input.MapFilters)
	if err != nil {
		return interfaces.ExecutionCollectionOutput{}, err
	}

	// Apply sort ordering.
	if input.SortParameter != nil {
		tx = tx.Order(input.SortParameter.GetGormOrderExpr())
	}

	// Only the Find call itself is timed; query construction is excluded.
	timer := r.metrics.ListDuration.Start()
	tx = tx.Find(&executions)
	timer.Stop()
	if tx.Error != nil {
		return interfaces.ExecutionCollectionOutput{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
	}
	return interfaces.ExecutionCollectionOutput{
		Executions: executions,
	}, nil
}
// NewExecutionRepo builds an ExecutionRepo around the given database handle
// and error transformer, creating its metrics from scope, and returns it as an
// interfaces.ExecutionRepoInterface.
func NewExecutionRepo(
	db *gorm.DB, errorTransformer adminErrors.ErrorTransformer, scope promutils.Scope) interfaces.ExecutionRepoInterface {
	repo := ExecutionRepo{
		db:               db,
		errorTransformer: errorTransformer,
		metrics:          newMetrics(scope),
	}
	return &repo
}