-
Notifications
You must be signed in to change notification settings - Fork 546
/
resourcemanager.go
154 lines (129 loc) · 5.98 KB
/
resourcemanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package resourcemanager
import (
	"context"
	"fmt"
	"sort"
	"sync"

	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
	pluginCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
	"github.com/flyteorg/flyte/flytestdlib/promutils"
)
// Fixed pieces used when assembling token prefixes and namespaced tokens.
const (
	// execUrnPrefix is the leading segment of every execution-scoped token prefix.
	execUrnPrefix = "ex"
	// execUrnSeparator joins the segments of a TokenPrefix.
	execUrnSeparator = ":"
	// tokenNamespaceSeparator joins a TokenPrefix to the token it scopes.
	tokenNamespaceSeparator = "-"
)

// TokenPrefix is the scope portion that gets placed in front of a Token.
type TokenPrefix string

// Token is an allocation token as handed to the underlying resource manager.
type Token string

// prepend returns a new Token of the form "<prefix>-<t>".
// The receiver is left untouched; the result is still a Token.
func (t Token) prepend(prefix TokenPrefix) Token {
	namespaced := fmt.Sprintf("%s%s%s", prefix, tokenNamespaceSeparator, t)
	return Token(namespaced)
}

// extend returns a new TokenPrefix of the form "<t>:<prefixPart>".
// The receiver is left untouched; the result is still a TokenPrefix.
func (t TokenPrefix) extend(prefixPart string) TokenPrefix {
	extended := fmt.Sprintf("%s%s%s", t, execUrnSeparator, prefixPart)
	return TokenPrefix(extended)
}
// composeProjectScopePrefix builds the project-level prefix for id by extending
// execUrnPrefix with the project of the workflow execution that id belongs to.
func composeProjectScopePrefix(id *core.TaskExecutionIdentifier) TokenPrefix {
	root := TokenPrefix(execUrnPrefix)
	project := id.GetNodeExecutionId().GetExecutionId().GetProject()
	return root.extend(project)
}
// composeNamespaceScopePrefix builds the namespace-level prefix for id: the
// project-scope prefix extended with the domain of the workflow execution.
func composeNamespaceScopePrefix(id *core.TaskExecutionIdentifier) TokenPrefix {
	projectScope := composeProjectScopePrefix(id)
	domain := id.GetNodeExecutionId().GetExecutionId().GetDomain()
	return projectScope.extend(domain)
}
// composeExecutionScopePrefix builds the execution-level prefix for id: the
// namespace-scope prefix extended with the name of the workflow execution.
func composeExecutionScopePrefix(id *core.TaskExecutionIdentifier) TokenPrefix {
	namespaceScope := composeNamespaceScopePrefix(id)
	executionName := id.GetNodeExecutionId().GetExecutionId().GetName()
	return namespaceScope.extend(executionName)
}
// ComposeTokenPrefix returns the prefix that is placed in front of every
// allocation token issued for the task execution id.
//
// The prefix is a required part of the token: it is what project-level and
// namespace-level capping is keyed on. It is currently the execution-scope prefix.
func ComposeTokenPrefix(id *core.TaskExecutionIdentifier) TokenPrefix {
	prefix := composeExecutionScopePrefix(id)
	return prefix
}
// Resource is designed to serve as the identifier of a user of the resource manager.
type Resource struct {
// quota is the BaseResourceConstraint held for this resource (type declared elsewhere in the package).
quota BaseResourceConstraint
// metrics gives access to a promutils.Scope through Metrics.GetScope.
metrics Metrics
// rejectedTokens is a concurrent map.
// NOTE(review): presumably keyed by tokens whose allocation was rejected — confirm against its users.
rejectedTokens sync.Map
}
// Metrics is implemented by metric holders that can report the promutils.Scope they belong to.
type Metrics interface {
// GetScope returns the promutils.Scope associated with these metrics.
GetScope() promutils.Scope
}
// Builder hands out resource registrars and constructs a BaseResourceManager.
type Builder interface {
// GetID returns the string ID of this builder.
GetID() string
// GetResourceRegistrar returns a pluginCore.ResourceRegistrar for the given namespace prefix.
// NOTE(review): presumably the registrar nests registered namespaces under namespacePrefix — confirm in implementations.
GetResourceRegistrar(namespacePrefix pluginCore.ResourceNamespace) pluginCore.ResourceRegistrar
// BuildResourceManager constructs the BaseResourceManager, or returns an error if it cannot be built.
BuildResourceManager(ctx context.Context) (BaseResourceManager, error)
}
// Proxy is the per-TaskExecutionContext view of a resource manager; one Proxy is created for each
// TaskExecutionContext.
// It carries the resource namespace prefix of the execution (e.g., "qubole-hive-executor" for a hive task)
// and the task execution identifier from which the token prefix (e.g., ex:<project>:<domain>:<exec_id>)
// is derived.
// Plugins only get access to a Proxy, never directly to the underlying resource manager.
// The Proxy prepends the proper prefixes to the resource namespace and the allocation token.
type Proxy struct {
// The embedded manager that allocation and release calls are delegated to.
// Proxy defines its own AllocateResource and ReleaseResource with the pluginCore.ResourceManager signatures.
BaseResourceManager
// ResourceNamespacePrefix is the namespace under which every namespace passed by the plugin is nested.
ResourceNamespacePrefix pluginCore.ResourceNamespace
// ExecutionIdentifier identifies the task execution; it is the input to ComposeTokenPrefix.
ExecutionIdentifier *core.TaskExecutionIdentifier
// ResourcePoolInfo records, keyed by the un-prefixed allocation token, the token and namespace
// of each AllocateResource call that returned without error.
ResourcePoolInfo map[string]*event.ResourcePoolInfo
}
// TaskResourceManager is a pluginCore.ResourceManager that can additionally report
// the resource pool information it has collected.
type TaskResourceManager interface {
pluginCore.ResourceManager
// GetResourcePoolInfo returns the collected resource pool entries.
GetResourcePoolInfo() []*event.ResourcePoolInfo
}
// ComposeResourceConstraint turns the plugin-supplied constraint spec into the list of
// fully qualified constraints for this Proxy's execution.
//
// A project-scope entry is added when spec.ProjectScopeResourceConstraint is set, followed by a
// namespace-scope entry when spec.NamespaceScopeResourceConstraint is set. The returned slice is
// never nil; it is empty when neither constraint is present.
func (p Proxy) ComposeResourceConstraint(spec pluginCore.ResourceConstraintsSpec) []FullyQualifiedResourceConstraint {
	constraints := []FullyQualifiedResourceConstraint{}

	if spec.ProjectScopeResourceConstraint != nil {
		projectScoped := composeFullyQualifiedProjectScopeResourceConstraint(spec, p.ExecutionIdentifier)
		constraints = append(constraints, projectScoped)
	}

	if spec.NamespaceScopeResourceConstraint != nil {
		namespaceScoped := composeFullyQualifiedNamespaceScopeResourceConstraint(spec, p.ExecutionIdentifier)
		constraints = append(constraints, namespaceScoped)
	}

	return constraints
}
// AllocateResource requests allocationToken from the underlying resource manager on behalf of
// this Proxy's execution.
//
// Before delegating, the namespace is nested under ResourceNamespacePrefix, the token is prefixed
// with ComposeTokenPrefix(ExecutionIdentifier), and constraintsSpec is expanded with
// ComposeResourceConstraint.
//
// If the underlying call fails, its status and error are returned as-is and nothing is recorded.
// Otherwise the un-prefixed token and namespace are recorded in ResourcePoolInfo (whatever the
// returned status is) and the status is returned with a nil error.
func (p Proxy) AllocateResource(ctx context.Context, namespace pluginCore.ResourceNamespace,
	allocationToken string, constraintsSpec pluginCore.ResourceConstraintsSpec) (pluginCore.AllocationStatus, error) {
	composedResourceConstraintList := p.ComposeResourceConstraint(constraintsSpec)
	status, err := p.BaseResourceManager.AllocateResource(ctx,
		p.ResourceNamespacePrefix.CreateSubNamespace(namespace),
		Token(allocationToken).prepend(ComposeTokenPrefix(p.ExecutionIdentifier)),
		composedResourceConstraintList)
	if err != nil {
		return status, err
	}

	// Writing to a nil map panics. At this point the underlying call has already returned, so a
	// panic would hide its result from the caller. Proxy uses value receivers, so the map cannot
	// be created lazily here; a Proxy built without ResourcePoolInfo simply skips the bookkeeping.
	if p.ResourcePoolInfo != nil {
		p.ResourcePoolInfo[allocationToken] = &event.ResourcePoolInfo{
			AllocationToken: allocationToken,
			Namespace:       string(namespace),
		}
	}
	return status, nil
}
// ReleaseResource releases allocationToken through the underlying resource manager.
//
// The namespace and token are qualified the same way AllocateResource qualifies them: the
// namespace is nested under ResourceNamespacePrefix and the token is prefixed with
// ComposeTokenPrefix(ExecutionIdentifier). The underlying manager's error is returned unchanged.
func (p Proxy) ReleaseResource(ctx context.Context, namespace pluginCore.ResourceNamespace,
	allocationToken string) error {
	qualifiedNamespace := p.ResourceNamespacePrefix.CreateSubNamespace(namespace)
	qualifiedToken := Token(allocationToken).prepend(ComposeTokenPrefix(p.ExecutionIdentifier))
	return p.BaseResourceManager.ReleaseResource(ctx, qualifiedNamespace, qualifiedToken)
}
// GetResourcePoolInfo returns every entry recorded in ResourcePoolInfo, ordered by allocation
// token.
//
// Go randomizes map iteration order, so the keys are sorted first to give callers the same
// ordering on every call. The returned slice is never nil; it is empty when nothing has been
// recorded (including when ResourcePoolInfo itself is nil).
func (p Proxy) GetResourcePoolInfo() []*event.ResourcePoolInfo {
	tokens := make([]string, 0, len(p.ResourcePoolInfo))
	for token := range p.ResourcePoolInfo {
		tokens = append(tokens, token)
	}
	sort.Strings(tokens)

	response := make([]*event.ResourcePoolInfo, 0, len(tokens))
	for _, token := range tokens {
		response = append(response, p.ResourcePoolInfo[token])
	}
	return response
}
// GetTaskResourceManager wraps r in a Proxy bound to the given resource namespace prefix and
// task execution identifier, and returns it as a TaskResourceManager.
// The Proxy starts with an empty, non-nil ResourcePoolInfo map.
func GetTaskResourceManager(r BaseResourceManager, resourceNamespacePrefix pluginCore.ResourceNamespace,
	id *core.TaskExecutionIdentifier) TaskResourceManager {
	proxy := Proxy{
		BaseResourceManager:     r,
		ResourceNamespacePrefix: resourceNamespacePrefix,
		ExecutionIdentifier:     id,
		ResourcePoolInfo:        map[string]*event.ResourcePoolInfo{},
	}
	return proxy
}
// ResourceRegistrarProxy wraps a pluginCore.ResourceRegistrar and prepends a proper prefix to the
// resource namespace of every quota registered through it.
type ResourceRegistrarProxy struct {
// The embedded registrar that registrations are forwarded to.
pluginCore.ResourceRegistrar
// ResourceNamespacePrefix is the namespace under which registered namespaces are nested.
ResourceNamespacePrefix pluginCore.ResourceNamespace
}
// RegisterResourceQuota registers quota with the wrapped registrar for namespace nested under
// ResourceNamespacePrefix. The wrapped registrar's error is returned unchanged.
func (p ResourceRegistrarProxy) RegisterResourceQuota(ctx context.Context, namespace pluginCore.ResourceNamespace, quota int) error {
	qualifiedNamespace := p.ResourceNamespacePrefix.CreateSubNamespace(namespace)
	return p.ResourceRegistrar.RegisterResourceQuota(ctx, qualifiedNamespace, quota)
}
// BaseResourceManager is the underlying resource manager that a Proxy delegates to.
// Unlike the plugin-facing interface, it takes Token values and fully qualified constraints.
type BaseResourceManager interface {
// GetID returns the string ID of this resource manager.
GetID() string
// AllocateResource attempts to allocate allocationToken in namespace subject to constraints,
// returning the resulting allocation status or an error.
AllocateResource(ctx context.Context, namespace pluginCore.ResourceNamespace, allocationToken Token, constraints []FullyQualifiedResourceConstraint) (pluginCore.AllocationStatus, error)
// ReleaseResource releases allocationToken in namespace.
ReleaseResource(ctx context.Context, namespace pluginCore.ResourceNamespace, allocationToken Token) error
}