This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
common.go
114 lines (99 loc) · 4.18 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package gormimpl
import (
"fmt"
"github.com/flyteorg/flyteadmin/pkg/common"
adminErrors "github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
"google.golang.org/grpc/codes"
"gorm.io/gorm"
)
// Field names shared by the repositories in this package. Project, Domain and
// Name together form identifierGroupBy.
const (
	// Project is the "project" field name.
	Project = "project"
	// Domain is the "domain" field name.
	Domain = "domain"
	// Name is the "name" field name.
	Name = "name"
	// Description is the "description" field name.
	Description = "description"
	// ResourceType is the "resource_type" field name.
	ResourceType = "resource_type"
	// State is the "state" field name.
	State = "state"
	// ID is the "id" field name.
	ID = "id"
)

// Table names interpolated into the hand-written join clauses in this file.
const (
	executionTableName           = "executions"
	namedEntityMetadataTableName = "named_entity_metadata"
	nodeExecutionTableName       = "node_executions"
	taskExecutionTableName       = "task_executions"
	taskTableName                = "tasks"

	// nodeExecutionEventTableName used to read "node_event_executions", which
	// disagreed with the "node_execution_events" entry that entityToTableName
	// holds for common.NodeExecutionEvent. It now carries the same name as that
	// mapping so joins and scoped filters address one table.
	// NOTE(review): confirm against the migrated schema.
	nodeExecutionEventTableName = "node_execution_events"
)

// Names of the list-input parameters that ValidateListInput reports as invalid.
const (
	limit   = "limit"
	filters = "filters"
)
var identifierGroupBy = fmt.Sprintf("%s, %s, %s", Project, Domain, Name)
// entityToTableName resolves the entity named in a filter to the table name
// that applyScopedFilters hands to GetGormJoinTableQueryExpr. An entity that is
// missing from this map is rejected there as unrecognized.
//
// Entries reuse the package's table-name constants where one with the same
// value exists; the remaining table names are spelled out as literals.
var entityToTableName = map[common.Entity]string{
	common.Execution:           executionTableName,
	common.LaunchPlan:          "launch_plans",
	common.NodeExecution:       nodeExecutionTableName,
	common.NodeExecutionEvent:  "node_execution_events",
	common.Task:                taskTableName,
	common.TaskExecution:       taskExecutionTableName,
	common.Workflow:            "workflows",
	common.NamedEntity:         "entities",
	common.NamedEntityMetadata: namedEntityMetadataTableName,
}
// innerJoinNodeExecToNodeEvents is an INNER JOIN clause that brings in the node
// execution table, matching its id column against the node_execution_id column
// of the node execution event table.
//
// Verb indexes: [1] = node execution table, [2] = node execution event table.
var innerJoinNodeExecToNodeEvents = fmt.Sprintf(
	"INNER JOIN %[1]s ON %[2]s.node_execution_id = %[1]s.id",
	nodeExecutionTableName, nodeExecutionEventTableName)
// innerJoinExecToNodeExec is an INNER JOIN clause that brings in the execution
// table, matching it to the node execution table on the execution_project,
// execution_domain and execution_name columns.
//
// Verb indexes: [1] = execution table, [2] = node execution table.
var innerJoinExecToNodeExec = fmt.Sprintf(
	"INNER JOIN %[1]s ON %[2]s.execution_project = %[1]s.execution_project AND "+
		"%[2]s.execution_domain = %[1]s.execution_domain AND "+
		"%[2]s.execution_name = %[1]s.execution_name",
	executionTableName, nodeExecutionTableName)
// innerJoinNodeExecToTaskExec is an INNER JOIN clause that brings in the node
// execution table, matching it to the task execution table on node_id plus the
// execution_project, execution_domain and execution_name columns.
//
// Verb indexes: [1] = node execution table, [2] = task execution table.
var innerJoinNodeExecToTaskExec = fmt.Sprintf(
	"INNER JOIN %[1]s ON %[2]s.node_id = %[1]s.node_id AND "+
		"%[2]s.execution_project = %[1]s.execution_project AND "+
		"%[2]s.execution_domain = %[1]s.execution_domain AND "+
		"%[2]s.execution_name = %[1]s.execution_name",
	nodeExecutionTableName, taskExecutionTableName)
// leftJoinTaskToTaskExec is a LEFT JOIN clause that brings in the task table,
// matching it to the task execution table on project, domain, name and version.
//
// Because dynamic tasks do NOT necessarily register static task definitions, a
// left join is used so that dynamic tasks are not excluded from list queries.
//
// Verb indexes: [1] = task table, [2] = task execution table.
var leftJoinTaskToTaskExec = fmt.Sprintf(
	"LEFT JOIN %[1]s ON %[2]s.project = %[1]s.project AND "+
		"%[2]s.domain = %[1]s.domain AND "+
		"%[2]s.name = %[1]s.name AND "+
		"%[2]s.version = %[1]s.version",
	taskTableName, taskExecutionTableName)
// ValidateListInput checks that a ListResourceInput carries the parameters a
// list query requires: a non-zero Limit and at least one inline filter.
//
// The checks run in that order. The first one that fails produces an
// invalid-input error built from the offending parameter name ("limit" or
// "filters"). nil is returned when both are present.
func ValidateListInput(input interfaces.ListResourceInput) adminErrors.FlyteAdminError {
	switch {
	case input.Limit == 0:
		return errors.GetInvalidInputError(limit)
	case len(input.InlineFilters) == 0:
		return errors.GetInvalidInputError(filters)
	default:
		return nil
	}
}
// applyFilters adds one Where condition to tx per inline filter, followed by
// one per map filter, and returns the resulting query.
//
// Each inline filter is turned into a query expression with GetGormQueryExpr.
// If that conversion fails, a nil query is returned together with an
// invalid-input error built from the failure's message. Map filters are passed
// to Where as returned by GetFilter.
func applyFilters(tx *gorm.DB, inlineFilters []common.InlineFilter, mapFilters []common.MapFilter) (*gorm.DB, error) {
	for i := range inlineFilters {
		expr, err := inlineFilters[i].GetGormQueryExpr()
		if err != nil {
			return nil, errors.GetInvalidInputError(err.Error())
		}
		tx = tx.Where(expr.Query, expr.Args)
	}

	for i := range mapFilters {
		tx = tx.Where(mapFilters[i].GetFilter())
	}

	return tx, nil
}
// applyScopedFilters behaves like applyFilters, except that every inline filter
// is first resolved to a table name through entityToTableName and its query
// expression is built with GetGormJoinTableQueryExpr for that table.
//
// A filter whose entity has no entry in entityToTableName yields a nil query
// and an InvalidArgument error. An error from GetGormJoinTableQueryExpr is
// returned as-is, also with a nil query. Map filters are passed to Where as
// returned by GetFilter.
func applyScopedFilters(tx *gorm.DB, inlineFilters []common.InlineFilter, mapFilters []common.MapFilter) (*gorm.DB, error) {
	for _, scoped := range inlineFilters {
		table, known := entityToTableName[scoped.GetEntity()]
		if !known {
			return nil, adminErrors.NewFlyteAdminErrorf(codes.InvalidArgument,
				"unrecognized entity in filter expression: %v", scoped.GetEntity())
		}

		expr, err := scoped.GetGormJoinTableQueryExpr(table)
		if err != nil {
			return nil, err
		}
		tx = tx.Where(expr.Query, expr.Args)
	}

	for _, byMap := range mapFilters {
		tx = tx.Where(byMap.GetFilter())
	}

	return tx, nil
}