This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
named_entity_manager.go
178 lines (156 loc) · 5.98 KB
/
named_entity_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package impl
import (
"context"
"strconv"
"strings"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteadmin/pkg/errors"
"google.golang.org/grpc/codes"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/util"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/validation"
"github.com/flyteorg/flyteadmin/pkg/manager/interfaces"
repoInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
)
const state = "state"
// System-generated workflows are meant to be hidden from the user by default. Therefore we always only show
// workflow-type named entities that have been user generated only.
var nonSystemGeneratedWorkflowsFilter, _ = common.NewSingleValueFilter(
common.NamedEntityMetadata, common.NotEqual, state, admin.NamedEntityState_SYSTEM_GENERATED)
var defaultWorkflowsFilter, _ = common.NewWithDefaultValueFilter(
strconv.Itoa(int(admin.NamedEntityState_NAMED_ENTITY_ACTIVE)), nonSystemGeneratedWorkflowsFilter)
type NamedEntityMetrics struct {
Scope promutils.Scope
}
type NamedEntityManager struct {
db repoInterfaces.Repository
config runtimeInterfaces.Configuration
metrics NamedEntityMetrics
}
func (m *NamedEntityManager) UpdateNamedEntity(ctx context.Context, request admin.NamedEntityUpdateRequest) (
*admin.NamedEntityUpdateResponse, error) {
if err := validation.ValidateNamedEntityUpdateRequest(request); err != nil {
logger.Debugf(ctx, "invalid request [%+v]: %v", request, err)
return nil, err
}
ctx = contextutils.WithProjectDomain(ctx, request.Id.Project, request.Id.Domain)
// Ensure entity exists before trying to update it
_, err := util.GetNamedEntity(ctx, m.db, request.ResourceType, *request.Id)
if err != nil {
return nil, err
}
metadataModel := transformers.CreateNamedEntityModel(&request)
err = m.db.NamedEntityRepo().Update(ctx, metadataModel)
if err != nil {
logger.Debugf(ctx, "Failed to update named_entity for [%+v] with err %v", request.Id, err)
return nil, err
}
return &admin.NamedEntityUpdateResponse{}, nil
}
func (m *NamedEntityManager) GetNamedEntity(ctx context.Context, request admin.NamedEntityGetRequest) (
*admin.NamedEntity, error) {
if err := validation.ValidateNamedEntityGetRequest(request); err != nil {
logger.Debugf(ctx, "invalid request [%+v]: %v", request, err)
return nil, err
}
ctx = contextutils.WithProjectDomain(ctx, request.Id.Project, request.Id.Domain)
return util.GetNamedEntity(ctx, m.db, request.ResourceType, *request.Id)
}
func (m *NamedEntityManager) getQueryFilters(referenceEntity core.ResourceType, requestFilters string) ([]common.InlineFilter, error) {
filters := make([]common.InlineFilter, 0)
if referenceEntity == core.ResourceType_WORKFLOW {
filters = append(filters, defaultWorkflowsFilter)
}
if len(requestFilters) == 0 {
return filters, nil
}
additionalFilters, err := util.ParseFilters(requestFilters, common.NamedEntity)
if err != nil {
return nil, err
}
for _, filter := range additionalFilters {
if strings.Contains(filter.GetField(), state) {
filterWithDefaultValue, err := common.NewWithDefaultValueFilter(
strconv.Itoa(int(admin.NamedEntityState_NAMED_ENTITY_ACTIVE)), filter)
if err != nil {
return nil, err
}
filters = append(filters, filterWithDefaultValue)
} else {
filters = append(filters, filter)
}
}
return filters, nil
}
func (m *NamedEntityManager) ListNamedEntities(ctx context.Context, request admin.NamedEntityListRequest) (
*admin.NamedEntityList, error) {
if err := validation.ValidateNamedEntityListRequest(request); err != nil {
logger.Debugf(ctx, "invalid request [%+v]: %v", request, err)
return nil, err
}
ctx = contextutils.WithProjectDomain(ctx, request.Project, request.Domain)
// HACK: In order to filter by state (if requested) - we need to amend the filter to use COALESCE
// e.g. eq(state, 1) becomes 'WHERE (COALESCE(state, 0) = '1')' since not every NamedEntity necessarily
// has an entry, and therefore the default state value '0' (active), should be assumed.
filters, err := m.getQueryFilters(request.ResourceType, request.Filters)
if err != nil {
return nil, err
}
var sortParameter common.SortParameter
if request.SortBy != nil {
sortParameter, err = common.NewSortParameter(*request.SortBy)
if err != nil {
return nil, err
}
}
offset, err := validation.ValidateToken(request.Token)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"invalid pagination token %s for ListNamedEntities", request.Token)
}
listInput := repoInterfaces.ListNamedEntityInput{
ListResourceInput: repoInterfaces.ListResourceInput{
Limit: int(request.Limit),
Offset: offset,
InlineFilters: filters,
SortParameter: sortParameter,
},
Project: request.Project,
Domain: request.Domain,
ResourceType: request.ResourceType,
}
output, err := m.db.NamedEntityRepo().List(ctx, listInput)
if err != nil {
logger.Debugf(ctx, "Failed to list named entities of type: %s with project: %s, domain: %s. Returned error was: %v",
request.ResourceType, request.Project, request.Domain, err)
return nil, err
}
var token string
if len(output.Entities) == int(request.Limit) {
token = strconv.Itoa(offset + len(output.Entities))
}
entities := transformers.FromNamedEntityModels(output.Entities)
return &admin.NamedEntityList{
Entities: entities,
Token: token,
}, nil
}
func NewNamedEntityManager(
db repoInterfaces.Repository,
config runtimeInterfaces.Configuration,
scope promutils.Scope) interfaces.NamedEntityInterface {
metrics := NamedEntityMetrics{
Scope: scope,
}
return &NamedEntityManager{
db: db,
config: config,
metrics: metrics,
}
}