This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
cloudevent_publisher.go
118 lines (99 loc) · 3.57 KB
/
cloudevent_publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package implementations
import (
"context"
"fmt"
"reflect"
"time"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteadmin/pkg/async/notifications/implementations"
cloudevents "github.com/cloudevents/sdk-go/v2"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/flyteorg/flyteadmin/pkg/async/cloudevent/interfaces"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/golang/protobuf/proto"
)
const (
// cloudEventSource is set as the "source" attribute of every CloudEvent built by Publish.
cloudEventSource = "https://github.com/flyteorg/flyteadmin"
// cloudEventTypePrefix is joined with the notification type as
// "<prefix>.<notificationType>" to form the CloudEvent "type" attribute.
cloudEventTypePrefix = "com.flyte.resource"
// jsonSchemaURLKey is the name of the CloudEvent extension attribute that carries jsonSchemaURL.
jsonSchemaURLKey = "jsonschemaurl"
// jsonSchemaURL is attached to every published event under jsonSchemaURLKey.
// NOTE(review): the URL names the workflow_execution schema, yet Publish attaches it to
// task and node execution events as well — confirm this is intended.
jsonSchemaURL = "https://github.com/flyteorg/flyteidl/blob/v0.24.14/jsonschema/workflow_execution.json"
)
// Publisher wraps workflow, task and node execution event requests in CloudEvents
// and hands them to a Sender.
type Publisher struct {
// sender delivers the assembled CloudEvent.
sender interfaces.Sender
// systemMetrics holds the PublishTotal, PublishSuccess and PublishError counters updated by Publish.
systemMetrics implementations.EventPublisherSystemMetrics
// events is the set of notification types this publisher forwards; other types are dropped.
events sets.String
}
// Publish wraps msg in a CloudEvent and forwards it to the configured sender.
//
// Messages whose notificationType is not in the publisher's enabled set are
// dropped and nil is returned. Supported message types are
// *admin.WorkflowExecutionEventRequest, *admin.TaskExecutionEventRequest and
// *admin.NodeExecutionEventRequest; any other type yields an error, as does a
// request that carries no event. Encoding and send failures are logged,
// counted in the PublishError metric and returned to the caller.
func (p *Publisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
	if !p.shouldPublishEvent(notificationType) {
		return nil
	}
	p.systemMetrics.PublishTotal.Inc()
	logger.Debugf(ctx, "Publishing the following message [%+v]", msg)

	var executionID string
	var phase string
	var eventTime time.Time

	// The generated protobuf getters tolerate nil receivers, so a request
	// without an event (or a typed-nil request) is reported as an error
	// instead of panicking on a nil dereference.
	switch msgType := msg.(type) {
	case *admin.WorkflowExecutionEventRequest:
		e := msgType.GetEvent()
		if e == nil {
			return fmt.Errorf("event request of type [%T] has no event set", msg)
		}
		executionID = e.GetExecutionId().String()
		phase = e.GetPhase().String()
		eventTime = e.GetOccurredAt().AsTime()
	case *admin.TaskExecutionEventRequest:
		e := msgType.GetEvent()
		if e == nil {
			return fmt.Errorf("event request of type [%T] has no event set", msg)
		}
		executionID = e.GetTaskId().String()
		phase = e.GetPhase().String()
		eventTime = e.GetOccurredAt().AsTime()
	case *admin.NodeExecutionEventRequest:
		e := msgType.GetEvent()
		if e == nil {
			return fmt.Errorf("event request of type [%T] has no event set", msg)
		}
		executionID = e.GetId().String()
		phase = e.GetPhase().String()
		eventTime = e.GetOccurredAt().AsTime()
	default:
		return fmt.Errorf("unsupported event types [%+v]", reflect.TypeOf(msg))
	}

	event := cloudevents.NewEvent()
	// CloudEvent specification: https://github.com/cloudevents/spec/blob/v1.0/spec.md#required-attributes
	event.SetType(fmt.Sprintf("%v.%v", cloudEventTypePrefix, notificationType))
	event.SetSource(cloudEventSource)
	// The ID combines the execution identifier with the phase, so each phase
	// transition of an execution gets its own event ID.
	event.SetID(fmt.Sprintf("%v.%v", executionID, phase))
	event.SetTime(eventTime)
	event.SetExtension(jsonSchemaURLKey, jsonSchemaURL)

	// msg is already an interface holding a pointer; pass it as-is rather
	// than taking the address of the interface variable.
	if err := event.SetData(cloudevents.ApplicationJSON, msg); err != nil {
		p.systemMetrics.PublishError.Inc()
		logger.Errorf(ctx, "Failed to encode message [%v] with error: %v", msg, err)
		return err
	}

	if err := p.sender.Send(ctx, notificationType, event); err != nil {
		p.systemMetrics.PublishError.Inc()
		logger.Errorf(ctx, "Failed to send message [%v] with error: %v", msg, err)
		return err
	}
	p.systemMetrics.PublishSuccess.Inc()
	return nil
}
// shouldPublishEvent reports whether notificationType is a member of the
// publisher's enabled event set.
func (p *Publisher) shouldPublishEvent(notificationType string) bool {
	// sets.String is a map keyed by member, so membership is a plain lookup.
	_, enabled := p.events[notificationType]
	return enabled
}
// NewCloudEventsPublisher builds a Publisher that forwards only the configured
// event types through sender.
//
// Each entry of eventTypes is looked up in implementations.SupportedEvents.
// The first entry equal to implementations.AllTypes or
// implementations.AllTypesShort enables every supported event and ends
// processing of the list. An entry that is not a supported event type makes
// the constructor panic. Metrics are registered under the
// "cloudevents_publisher" sub-scope of scope.
func NewCloudEventsPublisher(sender interfaces.Sender, scope promutils.Scope, eventTypes []string) interfaces.Publisher {
	enabled := sets.NewString()
	for _, requested := range eventTypes {
		wildcard := requested == implementations.AllTypes || requested == implementations.AllTypesShort
		if wildcard {
			// Enable everything; entries after the wildcard are not inspected.
			for _, supported := range implementations.SupportedEvents {
				enabled.Insert(supported)
			}
			break
		}

		resolved, known := implementations.SupportedEvents[requested]
		if !known {
			panic(fmt.Errorf("unsupported event type [%s] in the config", requested))
		}
		enabled.Insert(resolved)
	}

	metrics := implementations.NewEventPublisherSystemMetrics(scope.NewSubScope("cloudevents_publisher"))
	return &Publisher{
		sender:        sender,
		systemMetrics: metrics,
		events:        enabled,
	}
}