forked from benthosdev/benthos
/
tracing.go
137 lines (124 loc) · 3.97 KB
/
tracing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package service
import (
"sync/atomic"
"github.com/benthosdev/benthos/v4/internal/bundle/tracing"
)
// TracingEventType names the category of a tracing event that a component
// might experience during a config run. It is a string-backed type; the
// values this package hands out are the TracingEvent* variables declared
// alongside it.
//
// Experimental: This type may change outside of major version releases.
type TracingEventType string
// The set of tracing event types exposed by this package. TracingEventUnknown
// is the value used when an internal event type has no public counterpart.
//
// Experimental: These values may change outside of major version releases.
var (
	// Note: must match up with ./internal/bundle/tracing/events.go.
	TracingEventProduce = TracingEventType("PRODUCE")
	TracingEventConsume = TracingEventType("CONSUME")
	TracingEventDelete  = TracingEventType("DELETE")
	TracingEventError   = TracingEventType("ERROR")
	TracingEventUnknown = TracingEventType("UNKNOWN")
)
// convertTracingEventType maps an internal tracing event type onto its public
// TracingEventType counterpart. Any internal type without an explicit mapping
// is reported as TracingEventUnknown.
func convertTracingEventType(t tracing.EventType) TracingEventType {
	// Start from the fallback and overwrite it only for recognised types.
	converted := TracingEventUnknown
	switch t {
	case tracing.EventProduce:
		converted = TracingEventProduce
	case tracing.EventConsume:
		converted = TracingEventConsume
	case tracing.EventDelete:
		converted = TracingEventDelete
	case tracing.EventError:
		converted = TracingEventError
	}
	return converted
}
// TracingEvent represents a single event that occurred within the stream.
// Values of this type are produced by the event accessors on TracingSummary,
// which copy each field over from the internal tracing event.
//
// Experimental: This type may change outside of major version releases.
type TracingEvent struct {
// Type is the category of the event, converted from the internal event
// type; unrecognised internal types surface as TracingEventUnknown.
Type TracingEventType
// Content is the event's content string, taken verbatim from the internal
// event. NOTE(review): presumably the message payload or error text
// associated with the event — confirm against the internal tracing package.
Content string
// Meta is the metadata map attached to the internal event. The accessors
// assign the map itself rather than a copy, so it should be treated as
// read-only. NOTE(review): presumably message metadata — confirm.
Meta map[string]any
}
// TracingSummary is a high level description of all traced events. When tracing
// a stream this should only be queried once the stream has ended.
//
// Experimental: This type may change outside of major version releases.
type TracingSummary struct {
// summary is the internal tracing summary that every accessor method on
// this type reads its counters and events from.
summary *tracing.Summary
}
// TotalInput reports how many input messages the trace has counted. The
// counter is read atomically from the underlying summary.
//
// Experimental: This method may change outside of major version releases.
func (s *TracingSummary) TotalInput() uint64 {
	counter := &s.summary.Input
	return atomic.LoadUint64(counter)
}
// TotalProcessorErrors reports how many processor errors the trace has
// counted. The counter is read atomically from the underlying summary.
//
// Experimental: This method may change outside of major version releases.
func (s *TracingSummary) TotalProcessorErrors() uint64 {
	counter := &s.summary.ProcessorErrors
	return atomic.LoadUint64(counter)
}
// TotalOutput reports how many output messages the trace has counted. The
// counter is read atomically from the underlying summary.
//
// Experimental: This method may change outside of major version releases.
func (s *TracingSummary) TotalOutput() uint64 {
	counter := &s.summary.Output
	return atomic.LoadUint64(counter)
}
// InputEvents returns the events traced for each input during the execution
// of a stream pipeline, keyed by input label. Every internal event is
// converted into a TracingEvent; the result is always a non-nil map.
//
// Experimental: This method may change outside of major version releases.
func (s *TracingSummary) InputEvents() map[string][]TracingEvent {
	byLabel := map[string][]TracingEvent{}
	// NOTE(review): the false argument presumably leaves the recorded events
	// in place rather than flushing them — confirm against the internal
	// tracing package.
	for label, traced := range s.summary.InputEvents(false) {
		converted := make([]TracingEvent, 0, len(traced))
		for _, ev := range traced {
			converted = append(converted, TracingEvent{
				Type:    convertTracingEventType(ev.Type),
				Content: ev.Content,
				Meta:    ev.Meta,
			})
		}
		byLabel[label] = converted
	}
	return byLabel
}
// ProcessorEvents returns the events traced for each processor during the
// execution of a stream pipeline, keyed by processor label. Every internal
// event is converted into a TracingEvent; the result is always a non-nil map.
//
// Experimental: This method may change outside of major version releases.
func (s *TracingSummary) ProcessorEvents() map[string][]TracingEvent {
	byLabel := map[string][]TracingEvent{}
	// NOTE(review): the false argument presumably leaves the recorded events
	// in place rather than flushing them — confirm against the internal
	// tracing package.
	for label, traced := range s.summary.ProcessorEvents(false) {
		converted := make([]TracingEvent, 0, len(traced))
		for _, ev := range traced {
			converted = append(converted, TracingEvent{
				Type:    convertTracingEventType(ev.Type),
				Content: ev.Content,
				Meta:    ev.Meta,
			})
		}
		byLabel[label] = converted
	}
	return byLabel
}
// OutputEvents returns the events traced for each output during the execution
// of a stream pipeline, keyed by output label. Every internal event is
// converted into a TracingEvent; the result is always a non-nil map.
//
// Experimental: This method may change outside of major version releases.
func (s *TracingSummary) OutputEvents() map[string][]TracingEvent {
	byLabel := map[string][]TracingEvent{}
	// NOTE(review): the false argument presumably leaves the recorded events
	// in place rather than flushing them — confirm against the internal
	// tracing package.
	for label, traced := range s.summary.OutputEvents(false) {
		converted := make([]TracingEvent, 0, len(traced))
		for _, ev := range traced {
			converted = append(converted, TracingEvent{
				Type:    convertTracingEventType(ev.Type),
				Content: ev.Content,
				Meta:    ev.Meta,
			})
		}
		byLabel[label] = converted
	}
	return byLabel
}