This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
execution.go
372 lines (343 loc) · 14 KB
/
execution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
package transformers
import (
"context"
"fmt"
"time"
"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/storage"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/apimachinery/pkg/util/sets"
)
// clusterReassignablePhases lists the workflow execution phases during which the recorded
// execution cluster may still be reassigned from an incoming event's producer.
var clusterReassignablePhases = sets.NewString(core.WorkflowExecution_UNDEFINED.String(), core.WorkflowExecution_QUEUED.String())

// CreateExecutionModelInput encapsulates request parameters for calls to CreateExecutionModel.
type CreateExecutionModelInput struct {
	WorkflowExecutionID   core.WorkflowExecutionIdentifier // project/domain/name key of the new execution
	RequestSpec           *admin.ExecutionSpec             // spec from the create request; mutated with system metadata before serialization
	LaunchPlanID          uint                             // database id of the reference launch plan (used when TaskID is 0)
	WorkflowID            uint                             // database id of the workflow being executed
	TaskID                uint                             // database id of the reference task, if this is a single-task execution
	Phase                 core.WorkflowExecution_Phase     // initial phase recorded on the model and closure
	CreatedAt             time.Time                        // creation timestamp; also used as the initial updated-at time
	Notifications         []*admin.Notification            // notifications to store in the closure
	WorkflowIdentifier    *core.Identifier                 // identifier recorded as the closure's WorkflowId
	ParentNodeExecutionID uint                             // database id of the parent node execution, for child executions
	SourceExecutionID     uint                             // database id of the source execution, for relaunched/recovered executions
	Cluster               string                           // cluster the execution is assigned to
	InputsURI             storage.DataReference            // offloaded location of the full (merged) inputs
	UserInputsURI         storage.DataReference            // offloaded location of the user-supplied inputs
	SecurityContext       *core.SecurityContext            // security context stored on the spec
}
// CreateExecutionModel transforms a ExecutionCreateRequest to a Execution model.
// The request spec is annotated in place with system metadata (the assigned
// execution cluster) and the supplied security context before serialization, and
// the returned model carries a freshly initialized, active execution closure.
func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, error) {
	requestSpec := input.RequestSpec
	// Guarantee Metadata is non-nil so SystemMetadata, Principal, and Mode reads below are safe.
	if requestSpec.Metadata == nil {
		requestSpec.Metadata = &admin.ExecutionMetadata{}
	}
	requestSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
		ExecutionCluster: input.Cluster,
	}
	requestSpec.SecurityContext = input.SecurityContext
	spec, err := proto.Marshal(requestSpec)
	if err != nil {
		return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Failed to serialize execution spec: %v", err)
	}
	createdAt, err := ptypes.TimestampProto(input.CreatedAt)
	if err != nil {
		return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to serialize execution created at time")
	}
	closure := admin.ExecutionClosure{
		Phase:         input.Phase,
		CreatedAt:     createdAt,
		UpdatedAt:     createdAt,
		Notifications: input.Notifications,
		WorkflowId:    input.WorkflowIdentifier,
		StateChangeDetails: &admin.ExecutionStateChangeDetails{
			State:      admin.ExecutionState_EXECUTION_ACTIVE,
			Principal:  requestSpec.Metadata.Principal,
			OccurredAt: createdAt,
		},
	}
	if input.Phase == core.WorkflowExecution_RUNNING {
		closure.StartedAt = createdAt
	}
	closureBytes, err := proto.Marshal(&closure)
	if err != nil {
		// NOTE: previously this message incorrectly said "launch plan status".
		return nil, errors.NewFlyteAdminError(codes.Internal, "Failed to serialize execution closure")
	}
	activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE)
	executionModel := &models.Execution{
		ExecutionKey: models.ExecutionKey{
			Project: input.WorkflowExecutionID.Project,
			Domain:  input.WorkflowExecutionID.Domain,
			Name:    input.WorkflowExecutionID.Name,
		},
		Spec:                  spec,
		Phase:                 input.Phase.String(),
		Closure:               closureBytes,
		WorkflowID:            input.WorkflowID,
		ExecutionCreatedAt:    &input.CreatedAt,
		ExecutionUpdatedAt:    &input.CreatedAt,
		ParentNodeExecutionID: input.ParentNodeExecutionID,
		SourceExecutionID:     input.SourceExecutionID,
		Cluster:               input.Cluster,
		InputsURI:             input.InputsURI,
		UserInputsURI:         input.UserInputsURI,
		User:                  requestSpec.Metadata.Principal,
		State:                 &activeExecution,
		// Metadata is guaranteed non-nil above, so Mode can be read unconditionally
		// (the original re-checked for nil here redundantly).
		Mode: int32(requestSpec.Metadata.Mode),
	}
	// A reference launch entity can be one of either a task OR a launch plan. Traditionally, workflows are executed
	// with a reference launch plan which is why this behavior is the default below.
	if input.TaskID > 0 {
		executionModel.TaskID = input.TaskID
	} else {
		executionModel.LaunchPlanID = input.LaunchPlanID
	}
	return executionModel, nil
}
// reassignCluster records a new execution cluster on both the execution model's
// indexed column and its serialized spec, keeping the two representations in sync.
func reassignCluster(ctx context.Context, cluster string, executionID *core.WorkflowExecutionIdentifier, execution *models.Execution) error {
	logger.Debugf(ctx, "Updating cluster for execution [%v] with existing recorded cluster [%s] and setting to cluster [%s]",
		executionID, execution.Cluster, cluster)
	execution.Cluster = cluster

	var spec admin.ExecutionSpec
	if err := proto.Unmarshal(execution.Spec, &spec); err != nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution spec: %v", err)
	}
	// Lazily initialize nested metadata so the cluster can always be recorded.
	if spec.Metadata == nil {
		spec.Metadata = &admin.ExecutionMetadata{}
	}
	if spec.Metadata.SystemMetadata == nil {
		spec.Metadata.SystemMetadata = &admin.SystemMetadata{}
	}
	spec.Metadata.SystemMetadata.ExecutionCluster = cluster

	updatedSpec, err := proto.Marshal(&spec)
	if err != nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution spec: %v", err)
	}
	execution.Spec = updatedSpec
	return nil
}
// Updates an existing model given a WorkflowExecution event.
// The event's phase, timestamps, producing cluster, and output result (URI,
// inline/offloaded literal data, or error) are folded into both the model's
// indexed columns and its serialized closure.
func UpdateExecutionModelState(
	ctx context.Context,
	execution *models.Execution, request admin.WorkflowExecutionEventRequest,
	inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error {
	var executionClosure admin.ExecutionClosure
	err := proto.Unmarshal(execution.Closure, &executionClosure)
	if err != nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
	}
	// Mirror the event phase into both the closure and the indexed model column.
	executionClosure.Phase = request.Event.Phase
	executionClosure.UpdatedAt = request.Event.OccurredAt
	execution.Phase = request.Event.Phase.String()
	occurredAtTimestamp, err := ptypes.Timestamp(request.Event.OccurredAt)
	if err != nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to parse OccurredAt: %v", err)
	}
	execution.ExecutionUpdatedAt = &occurredAtTimestamp
	// only mark the execution started when we get the initial running event
	if request.Event.Phase == core.WorkflowExecution_RUNNING {
		execution.StartedAt = &occurredAtTimestamp
		executionClosure.StartedAt = request.Event.OccurredAt
	} else if common.IsExecutionTerminal(request.Event.Phase) {
		// Duration is only computable if a RUNNING event was previously observed.
		if execution.StartedAt != nil {
			execution.Duration = occurredAtTimestamp.Sub(*execution.StartedAt)
			executionClosure.Duration = ptypes.DurationProto(execution.Duration)
		} else {
			logger.Infof(context.Background(),
				"Cannot compute duration because startedAt was never set, requestId: %v", request.RequestId)
		}
	}
	// Default or empty cluster values do not require updating the execution model.
	ignoreClusterFromEvent := len(request.Event.ProducerId) == 0 || request.Event.ProducerId == common.DefaultProducerID
	logger.Debugf(ctx, "Producer Id [%v]. IgnoreClusterFromEvent [%v]", request.Event.ProducerId, ignoreClusterFromEvent)
	if !ignoreClusterFromEvent {
		// Cluster reassignment is only allowed while the execution has not yet
		// started (UNDEFINED/QUEUED); after that, events must come from the
		// originally recorded cluster.
		if clusterReassignablePhases.Has(execution.Phase) {
			if err := reassignCluster(ctx, request.Event.ProducerId, request.Event.ExecutionId, execution); err != nil {
				return err
			}
		} else if execution.Cluster != request.Event.ProducerId {
			errorMsg := fmt.Sprintf("Cannot accept events for running/terminated execution [%v] from cluster [%s],"+
				"expected events to originate from [%s]",
				request.Event.ExecutionId, request.Event.ProducerId, execution.Cluster)
			return errors.NewIncompatibleClusterError(ctx, errorMsg, execution.Cluster)
		}
	}
	// Record the output result: a URI takes precedence, then inline output data
	// (stored inline or offloaded per policy), then an error.
	if request.Event.GetOutputUri() != "" {
		executionClosure.OutputResult = &admin.ExecutionClosure_Outputs{
			Outputs: &admin.LiteralMapBlob{
				Data: &admin.LiteralMapBlob_Uri{
					Uri: request.Event.GetOutputUri(),
				},
			},
		}
	} else if request.Event.GetOutputData() != nil {
		switch inlineEventDataPolicy {
		case interfaces.InlineEventDataPolicyStoreInline:
			executionClosure.OutputResult = &admin.ExecutionClosure_OutputData{
				OutputData: request.Event.GetOutputData(),
			}
		default:
			// Any other policy offloads the literal map to storage and records its URI.
			logger.Debugf(ctx, "Offloading outputs per InlineEventDataPolicy")
			uri, err := common.OffloadLiteralMap(ctx, storageClient, request.Event.GetOutputData(),
				request.Event.ExecutionId.Project, request.Event.ExecutionId.Domain, request.Event.ExecutionId.Name, OutputsObjectSuffix)
			if err != nil {
				return err
			}
			executionClosure.OutputResult = &admin.ExecutionClosure_Outputs{
				Outputs: &admin.LiteralMapBlob{
					Data: &admin.LiteralMapBlob_Uri{
						Uri: uri.String(),
					},
				},
			}
		}
	} else if request.Event.GetError() != nil {
		executionClosure.OutputResult = &admin.ExecutionClosure_Error{
			Error: request.Event.GetError(),
		}
		// Also surface the error kind/code on indexed model columns for querying.
		k := request.Event.GetError().Kind.String()
		execution.ErrorKind = &k
		execution.ErrorCode = &request.Event.GetError().Code
	}
	marshaledClosure, err := proto.Marshal(&executionClosure)
	if err != nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
	}
	execution.Closure = marshaledClosure
	return nil
}
// UpdateExecutionModelStateChangeDetails updates an existing model with the
// stateUpdatedTo, stateUpdatedBy and stateUpdatedAt details from the request,
// recording them on both the indexed State column and the serialized closure.
func UpdateExecutionModelStateChangeDetails(executionModel *models.Execution, stateUpdatedTo admin.ExecutionState,
	stateUpdatedAt time.Time, stateUpdatedBy string) error {
	var closure admin.ExecutionClosure
	if err := proto.Unmarshal(executionModel.Closure, &closure); err != nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
	}
	// Keep the indexed column in sync with the closure.
	state := int32(stateUpdatedTo)
	executionModel.State = &state
	occurredAt, err := ptypes.TimestampProto(stateUpdatedAt)
	if err != nil {
		return err
	}
	closure.StateChangeDetails = &admin.ExecutionStateChangeDetails{
		State:      stateUpdatedTo,
		Principal:  stateUpdatedBy,
		OccurredAt: occurredAt,
	}
	updatedClosure, err := proto.Marshal(&closure)
	if err != nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
	}
	executionModel.Closure = updatedClosure
	return nil
}
// SetExecutionAborting records abort metadata (cause and principal) on the
// execution and moves it to the ABORTING phase. The abort metadata is recorded
// but the phase is not actually terminal *until* the abort event is propagated
// by flytepropeller; the metadata is preemptively saved at the time of the abort.
func SetExecutionAborting(execution *models.Execution, cause, principal string) error {
	var closure admin.ExecutionClosure
	if err := proto.Unmarshal(execution.Closure, &closure); err != nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
	}
	closure.Phase = core.WorkflowExecution_ABORTING
	closure.OutputResult = &admin.ExecutionClosure_AbortMetadata{
		AbortMetadata: &admin.AbortMetadata{
			Cause:     cause,
			Principal: principal,
		},
	}
	serialized, err := proto.Marshal(&closure)
	if err != nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
	}
	execution.Closure = serialized
	execution.AbortCause = cause
	execution.Phase = core.WorkflowExecution_ABORTING.String()
	return nil
}
// GetExecutionIdentifier assembles a core workflow execution identifier from the
// model's stored project, domain, and name columns.
func GetExecutionIdentifier(executionModel *models.Execution) core.WorkflowExecutionIdentifier {
	id := core.WorkflowExecutionIdentifier{}
	id.Project = executionModel.Project
	id.Domain = executionModel.Domain
	id.Name = executionModel.Name
	return id
}
// FromExecutionModel deserializes a stored execution model back into its admin
// representation, backfilling state change details and abort metadata for rows
// that predate those fields.
func FromExecutionModel(executionModel models.Execution) (*admin.Execution, error) {
	var spec admin.ExecutionSpec
	if err := proto.Unmarshal(executionModel.Spec, &spec); err != nil {
		return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
	}
	var closure admin.ExecutionClosure
	if err := proto.Unmarshal(executionModel.Closure, &closure); err != nil {
		return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
	}
	if closure.StateChangeDetails == nil {
		// Older executions never captured state change details; synthesize
		// defaults from the model so callers always see them populated.
		details, err := PopulateDefaultStateChangeDetails(executionModel)
		if err != nil {
			return nil, err
		}
		closure.StateChangeDetails = details
	}
	if executionModel.Phase == core.WorkflowExecution_ABORTED.String() && closure.GetAbortMetadata() == nil {
		// In the case of data predating the AbortMetadata field we manually set
		// it in the returned closure only, if it does not yet exist.
		closure.OutputResult = &admin.ExecutionClosure_AbortMetadata{
			AbortMetadata: &admin.AbortMetadata{
				Cause: executionModel.AbortCause,
			},
		}
	}
	id := GetExecutionIdentifier(&executionModel)
	// TODO: Clear deprecated fields to reduce message size.
	// spec.Inputs = nil
	// closure.ComputedInputs = nil
	return &admin.Execution{
		Id:      &id,
		Spec:    &spec,
		Closure: &closure,
	}, nil
}
// PopulateDefaultStateChangeDetails constructs default state change details for
// older executions which do not have these details captured, treating them as
// ACTIVE with the model's CreatedAt timestamp as the state change occurredAt time.
func PopulateDefaultStateChangeDetails(executionModel models.Execution) (*admin.ExecutionStateChangeDetails, error) {
	occurredAt, err := ptypes.TimestampProto(executionModel.CreatedAt)
	if err != nil {
		return nil, err
	}
	return &admin.ExecutionStateChangeDetails{
		State:      admin.ExecutionState_EXECUTION_ACTIVE,
		OccurredAt: occurredAt,
		Principal:  executionModel.User,
	}, nil
}
// FromExecutionModels converts a slice of execution models into admin
// executions, failing fast on the first model that cannot be converted.
func FromExecutionModels(executionModels []models.Execution) ([]*admin.Execution, error) {
	executions := make([]*admin.Execution, 0, len(executionModels))
	for _, model := range executionModels {
		converted, err := FromExecutionModel(model)
		if err != nil {
			return nil, err
		}
		executions = append(executions, converted)
	}
	return executions, nil
}