This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
event_publisher.go
104 lines (87 loc) · 3.13 KB
/
event_publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package implementations
import (
"context"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
"github.com/NYTimes/gizmo/pubsub"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
)
type EventPublisherSystemMetrics struct {
Scope promutils.Scope
PublishTotal prometheus.Counter
PublishSuccess prometheus.Counter
PublishError prometheus.Counter
}
// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type EventPublisher struct {
pub pubsub.Publisher
systemMetrics EventPublisherSystemMetrics
events sets.String
}
var taskExecutionReq admin.TaskExecutionEventRequest
var nodeExecutionReq admin.NodeExecutionEventRequest
var workflowExecutionReq admin.WorkflowExecutionEventRequest
const (
Task = "task"
Node = "node"
Workflow = "workflow"
AllTypes = "all"
AllTypesShort = "*"
)
var SupportedEvents = map[string]string{
Task: proto.MessageName(&taskExecutionReq),
Node: proto.MessageName(&nodeExecutionReq),
Workflow: proto.MessageName(&workflowExecutionReq),
}
// The key is the notification type as defined as an enum.
func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
if !p.shouldPublishEvent(notificationType) {
return nil
}
p.systemMetrics.PublishTotal.Inc()
logger.Debugf(ctx, "Publishing the following message [%+v]", msg)
err := p.pub.Publish(ctx, notificationType, msg)
if err != nil {
p.systemMetrics.PublishError.Inc()
logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err)
} else {
p.systemMetrics.PublishSuccess.Inc()
}
return err
}
func (p *EventPublisher) shouldPublishEvent(notificationType string) bool {
return p.events.Has(notificationType)
}
func NewEventPublisherSystemMetrics(scope promutils.Scope) EventPublisherSystemMetrics {
return EventPublisherSystemMetrics{
Scope: scope,
PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"),
PublishSuccess: scope.MustNewCounter("event_publish_success", "success count of publish messages"),
PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"),
}
}
func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes []string) interfaces.Publisher {
eventSet := sets.NewString()
for _, event := range eventTypes {
if event == AllTypes || event == AllTypesShort {
for _, e := range SupportedEvents {
eventSet = eventSet.Insert(e)
}
break
}
if e, found := SupportedEvents[event]; found {
eventSet = eventSet.Insert(e)
} else {
logger.Errorf(context.Background(), "Unsupported event type [%s] in the config")
}
}
return &EventPublisher{
pub: pub,
systemMetrics: NewEventPublisherSystemMetrics(scope.NewSubScope("events_publisher")),
events: eventSet,
}
}