-
Notifications
You must be signed in to change notification settings - Fork 546
/
workflow_event_recorder.go
85 lines (74 loc) · 3.25 KB
/
workflow_event_recorder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package events
import (
"context"
"strings"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
)
//go:generate mockery -all -output=mocks -case=underscore
// Recorder for Workflow events
type WorkflowEventRecorder interface {
// Records workflow execution events indicating the workflow has undergone a phase change and additional metadata.
RecordWorkflowEvent(ctx context.Context, event *event.WorkflowExecutionEvent, eventConfig *config.EventConfig) error
}
type workflowEventRecorder struct {
eventRecorder EventRecorder
store *storage.DataStore
}
// In certain cases, a successful workflow execution event can be configured to include raw output data inline. However,
// for large outputs these events may exceed the event recipient's message size limit, so we fallback to passing
// the offloaded output URI instead.
func (r *workflowEventRecorder) handleFailure(ctx context.Context, ev *event.WorkflowExecutionEvent, err error) error {
st, ok := status.FromError(err)
if !ok || st.Code() != codes.ResourceExhausted {
// Error was not a status error
return err
}
if !strings.HasPrefix(st.Message(), "message too large") {
return err
}
// This time, we attempt to record the event with the output URI set.
return r.eventRecorder.RecordWorkflowEvent(ctx, ev)
}
func (r *workflowEventRecorder) RecordWorkflowEvent(ctx context.Context, ev *event.WorkflowExecutionEvent, eventConfig *config.EventConfig) error {
var origEvent = ev
var rawOutputPolicy = eventConfig.RawOutputPolicy
if rawOutputPolicy == config.RawOutputPolicyInline && len(ev.GetOutputUri()) > 0 {
outputs := &core.LiteralMap{}
err := r.store.ReadProtobuf(ctx, storage.DataReference(ev.GetOutputUri()), outputs)
if err != nil {
// Fall back to forwarding along outputs by reference when we can't fetch them.
logger.Warnf(ctx, "failed to fetch outputs by ref [%s] to send inline with err: %v", ev.GetOutputUri(), err)
rawOutputPolicy = config.RawOutputPolicyReference
} else {
origEvent = proto.Clone(ev).(*event.WorkflowExecutionEvent)
ev.OutputResult = &event.WorkflowExecutionEvent_OutputData{
OutputData: outputs,
}
}
}
err := r.eventRecorder.RecordWorkflowEvent(ctx, ev)
if err != nil {
logger.Infof(ctx, "Failed to record workflow event [%+v] with err: %v", ev, err)
// Only attempt to retry sending an event in the case we tried to send raw output data inline
if eventConfig.FallbackToOutputReference && rawOutputPolicy == config.RawOutputPolicyInline {
logger.Infof(ctx, "Falling back to sending workflow event outputs by reference for [%+v]", ev.ExecutionId)
return r.handleFailure(ctx, origEvent, err)
}
return err
}
return nil
}
func NewWorkflowEventRecorder(eventSink EventSink, scope promutils.Scope, store *storage.DataStore) WorkflowEventRecorder {
return &workflowEventRecorder{
eventRecorder: NewEventRecorder(eventSink, scope),
store: store,
}
}