This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
flyte_url.go
154 lines (128 loc) · 5.02 KB
/
flyte_url.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package common
import (
"fmt"
"regexp"
"strconv"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
)
// transform to snake case to make lower case
// NOTE(review): ArtifactTypeString, which ParseFlyteURLToExecution calls, is presumably
// produced by this generator — confirm against the generated file.
//go:generate enumer --type=ArtifactType --trimprefix=ArtifactType -transform=snake

// ArtifactType enumerates the kinds of artifact a flyte url can refer to:
// inputs, outputs or the deck.
type ArtifactType int

// The suffixes in these constants are used to match against the tail end of the flyte url, to keep the flyte url simpler
const (
	// ArtifactTypeUndefined is the zero value; it does not correspond to any url suffix listed below.
	ArtifactTypeUndefined ArtifactType = iota
	ArtifactTypeI // inputs
	ArtifactTypeO // outputs
	ArtifactTypeD // deck
)
// re matches a flyte url that addresses a whole artifact of an execution:
//
//	flyte://v1/<project>/<domain>/<exec>/<node>[/<attempt>]/<i|o|d>
//
// The pattern is anchored at both ends so that a string which merely contains
// or ends with a flyte url (for example "junkflyte://v1/...") is rejected.
var re = regexp.MustCompile(`^flyte://v1/(?P<project>[a-zA-Z0-9_-]+)/(?P<domain>[a-zA-Z0-9_-]+)/(?P<exec>[a-zA-Z0-9_-]+)/(?P<node>[a-zA-Z0-9_-]+)(?:/(?P<attempt>[0-9]+))?/(?P<ioType>[iod])$`)

// reSpecificOutput matches a flyte url that addresses one named literal inside
// the inputs or outputs of an execution:
//
//	flyte://v1/<project>/<domain>/<exec>/<node>[/<attempt>]/<i|o>/<literalName>
//
// Only the i and o suffixes are accepted here; d (deck) takes no literal name.
// Like re, the pattern is anchored at both ends.
var reSpecificOutput = regexp.MustCompile(`^flyte://v1/(?P<project>[a-zA-Z0-9_-]+)/(?P<domain>[a-zA-Z0-9_-]+)/(?P<exec>[a-zA-Z0-9_-]+)/(?P<node>[a-zA-Z0-9_-]+)(?:/(?P<attempt>[0-9]+))?/(?P<ioType>[io])/(?P<literalName>[a-zA-Z0-9_-]+)$`)
// MatchRegex applies reg to input and returns the capture groups of the first
// (leftmost) match, keyed by capture-group name.
//
// It returns nil when reg does not match input. An optional group that did not
// participate in the match is present in the map with an empty value. Unnamed
// capture groups are all stored under the empty-string key, so patterns passed
// here should name every group they care about.
func MatchRegex(reg *regexp.Regexp, input string) map[string]string {
	// Only the first match is ever consumed, so there is no need to collect
	// every match in input.
	match := reg.FindStringSubmatch(input)
	if match == nil {
		return nil
	}

	names := reg.SubexpNames()
	dict := make(map[string]string, len(names))
	// Index 0 is the whole match; named groups start at 1.
	for i := 1; i < len(match); i++ {
		dict[names[i]] = match[i]
	}
	return dict
}
// ParsedExecution is the result returned by ParseFlyteURLToExecution. On a successful
// parse exactly one of NodeExecID and PartialTaskExecID is set, depending on whether
// the url carried a retry attempt.
type ParsedExecution struct {
	// Returned when the user does not request a specific attempt
	NodeExecID *core.NodeExecutionIdentifier

	// This is returned in the case where the user requested a specific attempt. But the TaskID portion of this
	// will be empty since that information is not encapsulated in the flyte url.
	PartialTaskExecID *core.TaskExecutionIdentifier

	// The name of the input or output in the literal map
	// It is empty when the url does not name a specific literal.
	LiteralName string

	// The kind of artifact the url asked for, taken from the url's i/o/d suffix.
	IOType ArtifactType
}
// tryMatches runs flyteURL through the known flyte url patterns in order: first
// the whole-artifact pattern (re), then the named-literal pattern
// (reSpecificOutput). It returns the capture groups of the first pattern that
// yields a non-empty result, or nil when neither does.
func tryMatches(flyteURL string) map[string]string {
	if groups := MatchRegex(re, flyteURL); len(groups) > 0 {
		return groups
	}
	if groups := MatchRegex(reSpecificOutput, flyteURL); len(groups) > 0 {
		return groups
	}
	return nil
}
// ParseFlyteURLToExecution parses a flyte url into the execution it refers to.
//
// flyteURL can be of the following forms:
//
//	flyte://v1/project/domain/execution_id/node_id/attempt/[iod]
//	flyte://v1/project/domain/execution_id/node_id/attempt/[io]/output_name
//	flyte://v1/project/domain/execution_id/node_id/[io]/output_name
//	flyte://v1/project/domain/execution_id/node_id/[iod]
//
// where i stands for inputs.pb, o for outputs.pb and d for the flyte deck.
// If the retry attempt is missing, the io requested is assumed to be for the
// node instead of the task execution: NodeExecID is set in the result.
// Otherwise PartialTaskExecID is set, carrying the node execution id and the
// retry attempt.
//
// An error is returned when the url matches none of the forms above, when the
// io type suffix cannot be converted to an ArtifactType, or when the attempt
// does not fit in a uint32.
func ParseFlyteURLToExecution(flyteURL string) (ParsedExecution, error) {
	matches := tryMatches(flyteURL)
	if matches == nil {
		return ParsedExecution{}, fmt.Errorf("failed to parse [%s]", flyteURL)
	}

	ioType, err := ArtifactTypeString(matches["ioType"])
	if err != nil {
		return ParsedExecution{}, err
	}
	literalName := matches["literalName"] // may be empty

	// node and task level outputs
	nodeExecID := core.NodeExecutionIdentifier{
		NodeId: matches["node"],
		ExecutionId: &core.WorkflowExecutionIdentifier{
			Project: matches["project"],
			Domain:  matches["domain"],
			Name:    matches["exec"],
		},
	}

	attempt := matches["attempt"]
	if len(attempt) == 0 {
		// No attempt in the url: this is a node-level request.
		return ParsedExecution{
			NodeExecID:  &nodeExecID,
			IOType:      ioType,
			LiteralName: literalName,
		}, nil
	}

	// if attempt is there, that means a task execution.
	// Parse with a 32-bit bound so that an attempt larger than MaxUint32 is
	// reported as an error instead of being silently truncated by the
	// conversion to uint32.
	retry, err := strconv.ParseUint(attempt, 10, 32)
	if err != nil {
		return ParsedExecution{}, fmt.Errorf("failed to parse attempt [%v], %w", attempt, err)
	}

	taskExecID := core.TaskExecutionIdentifier{
		NodeExecutionId: &nodeExecID,
		RetryAttempt:    uint32(retry),
	}
	return ParsedExecution{
		PartialTaskExecID: &taskExecID,
		IOType:            ioType,
		LiteralName:       literalName,
	}, nil
}
// FlyteURLsFromNodeExecutionID builds the flyte urls for a node execution.
//
// The urls have the form flyte://v1/<project>/<domain>/<name>/<node_id>/<suffix>,
// where the suffix is the formatted ArtifactType constant. Inputs and Outputs
// are always populated; Deck is populated only when deck is true.
//
// nodeExecutionID.ExecutionId is dereferenced without a nil check, so it must
// be set by the caller.
func FlyteURLsFromNodeExecutionID(nodeExecutionID core.NodeExecutionIdentifier, deck bool) *admin.FlyteURLs {
	wfExecID := nodeExecutionID.ExecutionId
	prefix := fmt.Sprintf("flyte://v1/%s/%s/%s/%s",
		wfExecID.Project, wfExecID.Domain, wfExecID.Name, nodeExecutionID.NodeId)

	urls := new(admin.FlyteURLs)
	urls.Inputs = fmt.Sprintf("%s/%s", prefix, ArtifactTypeI)
	urls.Outputs = fmt.Sprintf("%s/%s", prefix, ArtifactTypeO)
	if deck {
		urls.Deck = fmt.Sprintf("%s/%s", prefix, ArtifactTypeD)
	}
	return urls
}
// FlyteURLsFromTaskExecutionID builds the flyte urls for a task execution.
//
// The urls have the form
// flyte://v1/<project>/<domain>/<name>/<node_id>/<retry_attempt>/<suffix>,
// where the suffix is the formatted ArtifactType constant. Inputs and Outputs
// are always populated; Deck is populated only when deck is true.
//
// taskExecutionID.NodeExecutionId and its ExecutionId are dereferenced without
// nil checks, so both must be set by the caller.
func FlyteURLsFromTaskExecutionID(taskExecutionID core.TaskExecutionIdentifier, deck bool) *admin.FlyteURLs {
	nodeExecID := taskExecutionID.NodeExecutionId
	wfExecID := nodeExecID.ExecutionId

	// RetryAttempt is formatted directly as an unsigned value. Converting it
	// to int first could wrap to a negative number where int is 32 bits wide.
	base := fmt.Sprintf("flyte://v1/%s/%s/%s/%s/%d",
		wfExecID.Project, wfExecID.Domain, wfExecID.Name, nodeExecID.NodeId, taskExecutionID.RetryAttempt)

	res := &admin.FlyteURLs{
		Inputs:  fmt.Sprintf("%s/%s", base, ArtifactTypeI),
		Outputs: fmt.Sprintf("%s/%s", base, ArtifactTypeO),
	}
	if deck {
		res.Deck = fmt.Sprintf("%s/%s", base, ArtifactTypeD)
	}
	return res
}