-
Notifications
You must be signed in to change notification settings - Fork 3.2k
/
metrics.go
287 lines (232 loc) · 8.26 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package metrics
import (
"fmt"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)
// Prometheus naming constants and metrics-server defaults.
const (
	// argoNamespace is the Prometheus namespace prefix for all Argo metrics.
	argoNamespace = "argo"
	// workflowsSubsystem is the Prometheus subsystem, yielding metric names
	// of the form argo_workflows_*.
	workflowsSubsystem = "workflows"
	// DefaultMetricsServerPort is the default port the metrics server listens on.
	DefaultMetricsServerPort = 9090
	// DefaultMetricsServerPath is the default HTTP path metrics are served at.
	DefaultMetricsServerPath = "/metrics"
)
type ServerConfig struct {
Enabled bool
Path string
Port int
TTL time.Duration
IgnoreErrors bool
Secure bool
}
func (s ServerConfig) SameServerAs(other ServerConfig) bool {
return s.Port == other.Port && s.Path == other.Path && s.Enabled && other.Enabled && s.Secure == other.Secure
}
// metric pairs a custom Prometheus metric with the time it was last updated,
// so stale entries can be expired — presumably by a TTL sweep elsewhere in
// the package (see ServerConfig.TTL); confirm against the GC code.
type metric struct {
	metric      prometheus.Metric
	lastUpdated time.Time
}
// Metrics is the registry of all controller metrics plus the server configs
// used to expose them. It implements prometheus.Collector, a logrus hook
// (Levels/Fire) and workqueue.MetricsProvider — see the interface assertions
// in this file.
type Metrics struct {
	// Ensures mutual exclusion in workflows map
	mutex sync.RWMutex
	// Config for the metrics endpoint.
	metricsConfig ServerConfig
	// Config for the telemetry endpoint.
	telemetryConfig ServerConfig
	// Counter of workflow updates processed.
	workflowsProcessed prometheus.Counter
	// Pod-count gauges keyed by pod phase.
	podsByPhase map[corev1.PodPhase]prometheus.Gauge
	// Workflow-count gauges keyed by workflow phase.
	workflowsByPhase map[v1alpha1.NodePhase]prometheus.Gauge
	// Maps a workflow key to the keys of its realtime custom metrics.
	workflows map[string][]string
	// Histogram of operation durations in seconds.
	operationDurations prometheus.Histogram
	// Error counters keyed by cause.
	errors map[ErrorCause]prometheus.Counter
	// User-defined metrics keyed by metric key, with last-update timestamps.
	customMetrics map[string]metric
	// Workqueue metrics keyed by "<queue-name>-<kind>".
	workqueueMetrics map[string]prometheus.Metric
	// Busy-worker gauges; not written in this chunk — confirm usage elsewhere.
	workersBusy map[string]prometheus.Gauge
	// Used to quickly check if a metric desc is already used by the system
	defaultMetricDescs map[string]bool
	// Maps metric name to help string, to enforce identical help text for
	// metrics sharing a name.
	metricNameHelps map[string]string
	// Per-level counter of log messages, fed by the logrus hook.
	logMetric *prometheus.CounterVec
}
// Levels returns the logrus levels this hook fires on. Only Info, Warn and
// Error messages are counted; Debug and below are not hooked.
func (m *Metrics) Levels() []log.Level {
	return []log.Level{log.InfoLevel, log.WarnLevel, log.ErrorLevel}
}
// Fire implements the logrus hook interface: it increments the log_messages
// counter labelled with the entry's level. It never returns an error.
func (m *Metrics) Fire(entry *log.Entry) error {
	m.logMetric.WithLabelValues(entry.Level.String()).Inc()
	return nil
}
var _ prometheus.Collector = &Metrics{}

// New builds a Metrics instance for the given metrics and telemetry server
// configs, records the descriptors of every built-in metric, pre-creates the
// per-level log counters, and installs the instance as a logrus hook so log
// volume is tracked automatically.
func New(metricsConfig, telemetryConfig ServerConfig) *Metrics {
	m := &Metrics{
		metricsConfig:      metricsConfig,
		telemetryConfig:    telemetryConfig,
		workflowsProcessed: newCounter("workflows_processed_count", "Number of workflow updates processed", nil),
		podsByPhase:        getPodPhaseGauges(),
		workflowsByPhase:   getWorkflowPhaseGauges(),
		workflows:          make(map[string][]string),
		operationDurations: newHistogram("operation_duration_seconds", "Histogram of durations of operations", nil, []float64{0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.5, 3.0}),
		errors:             getErrorCounters(),
		customMetrics:      make(map[string]metric),
		workqueueMetrics:   make(map[string]prometheus.Metric),
		workersBusy:        make(map[string]prometheus.Gauge),
		defaultMetricDescs: make(map[string]bool),
		metricNameHelps:    make(map[string]string),
		logMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "log_messages",
			Help: "Total number of log messages.",
		}, []string{"level"}),
	}

	// Remember every built-in descriptor so custom metrics cannot collide
	// with a system metric (checked in UpsertCustomMetric).
	for _, builtin := range m.allMetrics() {
		m.defaultMetricDescs[builtin.Desc().String()] = true
	}

	// Touch each hooked level so its counter series exists from the start
	// rather than appearing on the first log message.
	for _, lvl := range m.Levels() {
		m.logMetric.WithLabelValues(lvl.String())
	}

	log.AddHook(m)
	return m
}
// allMetrics returns a snapshot of every metric tracked by this instance:
// the built-in counter/histogram, per-phase gauges, error counters,
// workqueue metrics, worker gauges and custom metrics. The read lock is
// held for the duration of the snapshot.
func (m *Metrics) allMetrics() []prometheus.Metric {
	m.mutex.RLock()
	defer m.mutex.RUnlock()
	// Pre-size the slice to avoid repeated growth while appending.
	n := 2 + len(m.workflowsByPhase) + len(m.podsByPhase) + len(m.errors) +
		len(m.workqueueMetrics) + len(m.workersBusy) + len(m.customMetrics)
	allMetrics := make([]prometheus.Metric, 0, n)
	allMetrics = append(allMetrics, m.workflowsProcessed, m.operationDurations)
	for _, metric := range m.workflowsByPhase {
		allMetrics = append(allMetrics, metric)
	}
	for _, metric := range m.podsByPhase {
		allMetrics = append(allMetrics, metric)
	}
	for _, metric := range m.errors {
		allMetrics = append(allMetrics, metric)
	}
	for _, metric := range m.workqueueMetrics {
		allMetrics = append(allMetrics, metric)
	}
	for _, metric := range m.workersBusy {
		allMetrics = append(allMetrics, metric)
	}
	for _, metric := range m.customMetrics {
		allMetrics = append(allMetrics, metric.metric)
	}
	return allMetrics
}
// StopRealtimeMetricsForKey deletes all realtime custom metrics registered
// under the workflow identified by key, and forgets the workflow itself.
// It is a no-op if the key has no realtime metrics.
func (m *Metrics) StopRealtimeMetricsForKey(key string) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	// Single comma-ok lookup instead of an existence check followed by a
	// second read of the same key.
	realtimeMetrics, exists := m.workflows[key]
	if !exists {
		return
	}
	for _, metricKey := range realtimeMetrics {
		delete(m.customMetrics, metricKey)
	}
	delete(m.workflows, key)
}
// OperationCompleted records the duration (in seconds) of a completed
// operation in the operation_duration_seconds histogram.
// NOTE(review): a full write lock is taken even though prometheus metrics
// are themselves concurrency-safe — presumably to serialize with the
// collection paths in this file; confirm before relaxing.
func (m *Metrics) OperationCompleted(durationSeconds float64) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.operationDurations.Observe(durationSeconds)
}
// GetCustomMetric looks up a previously registered custom metric by key.
// A nil result simply means no metric is registered under that key.
func (m *Metrics) GetCustomMetric(key string) prometheus.Metric {
	m.mutex.RLock()
	defer m.mutex.RUnlock()
	// A missing key yields the zero-value entry, whose metric field is nil.
	entry := m.customMetrics[key]
	return entry.metric
}
// UpsertCustomMetric inserts or replaces the custom metric stored under key.
// It rejects metrics whose descriptor collides with a built-in system
// metric, and enforces the Prometheus rule that all metrics sharing a name
// must share an identical help string. When realtime is true the metric is
// additionally tracked under ownerKey so it can be torn down via
// StopRealtimeMetricsForKey.
func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prometheus.Metric, realtime bool) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	metricDesc := newMetric.Desc().String()
	if m.defaultMetricDescs[metricDesc] {
		return fmt.Errorf("metric '%s' is already in use by the system, please use a different name", newMetric.Desc())
	}
	name, help := recoverMetricNameAndHelpFromDesc(metricDesc)
	// Early return instead of else-after-return: on success the help string
	// is (re)recorded unconditionally, which is a no-op when it already
	// matches.
	if existingHelp, inUse := m.metricNameHelps[name]; inUse && help != existingHelp {
		return fmt.Errorf("metric '%s' has help string '%s' but should have '%s' (help strings must be identical for metrics of the same name)", name, help, existingHelp)
	}
	m.metricNameHelps[name] = help
	m.customMetrics[key] = metric{metric: newMetric, lastUpdated: time.Now()}
	// If this is a realtime metric, track it
	if realtime {
		m.workflows[ownerKey] = append(m.workflows[ownerKey], key)
	}
	return nil
}
// SetWorkflowPhaseGauge sets the gauge for the given workflow phase to num,
// the current number of workflows in that phase.
// NOTE(review): an unknown phase yields a nil gauge from the map and would
// panic on Set — assumes callers only pass phases pre-built by
// getWorkflowPhaseGauges; confirm.
func (m *Metrics) SetWorkflowPhaseGauge(phase v1alpha1.NodePhase, num int) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.workflowsByPhase[phase].Set(float64(num))
}
// SetPodPhaseGauge sets the gauge for the given pod phase to num, the
// current number of pods in that phase.
// NOTE(review): an unknown phase yields a nil gauge from the map and would
// panic on Set — assumes callers only pass phases pre-built by
// getPodPhaseGauges; confirm.
func (m *Metrics) SetPodPhaseGauge(phase corev1.PodPhase, num int) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.podsByPhase[phase].Set(float64(num))
}
// ErrorCause classifies the controller errors counted in Metrics.errors.
type ErrorCause string

const (
	// ErrorCauseOperationPanic is recorded when a workflow operation panics.
	ErrorCauseOperationPanic ErrorCause = "OperationPanic"
	// ErrorCauseCronWorkflowSubmissionError is recorded when submitting a
	// cron workflow fails.
	ErrorCauseCronWorkflowSubmissionError ErrorCause = "CronWorkflowSubmissionError"
	// ErrorCauseCronWorkflowSpecError is recorded when a cron workflow spec
	// is invalid.
	ErrorCauseCronWorkflowSpecError ErrorCause = "CronWorkflowSpecError"
)
// incErrorCounter bumps the counter for the given error cause under the
// write lock. All public error-reporting methods funnel through here.
func (m *Metrics) incErrorCounter(cause ErrorCause) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.errors[cause].Inc()
}

// OperationPanic records that a workflow operation panicked.
func (m *Metrics) OperationPanic() {
	m.incErrorCounter(ErrorCauseOperationPanic)
}

// CronWorkflowSubmissionError records a failed cron workflow submission.
func (m *Metrics) CronWorkflowSubmissionError() {
	m.incErrorCounter(ErrorCauseCronWorkflowSubmissionError)
}

// CronWorkflowSpecError records an invalid cron workflow spec.
func (m *Metrics) CronWorkflowSpecError() {
	m.incErrorCounter(ErrorCauseCronWorkflowSpecError)
}
// Act as a metrics provider for a workflow queue
var _ workqueue.MetricsProvider = &Metrics{}

// NewDepthMetric returns the gauge tracking the depth of the named
// workqueue, creating and caching it on first use.
func (m *Metrics) NewDepthMetric(name string) workqueue.GaugeMetric {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	key := fmt.Sprintf("%s-depth", name)
	// Single comma-ok lookup instead of a check followed by a re-read.
	depth, ok := m.workqueueMetrics[key]
	if !ok {
		depth = newGauge("queue_depth_count", "Depth of the queue", map[string]string{"queue_name": name})
		m.workqueueMetrics[key] = depth
	}
	return depth.(prometheus.Gauge)
}
// NewAddsMetric returns the counter tracking adds to the named workqueue,
// creating and caching it on first use.
func (m *Metrics) NewAddsMetric(name string) workqueue.CounterMetric {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	key := fmt.Sprintf("%s-adds", name)
	// Single comma-ok lookup instead of a check followed by a re-read.
	adds, ok := m.workqueueMetrics[key]
	if !ok {
		adds = newCounter("queue_adds_count", "Adds to the queue", map[string]string{"queue_name": name})
		m.workqueueMetrics[key] = adds
	}
	return adds.(prometheus.Counter)
}
// NewLatencyMetric returns the histogram tracking how long objects wait in
// the named workqueue, creating and caching it on first use.
func (m *Metrics) NewLatencyMetric(name string) workqueue.HistogramMetric {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	key := fmt.Sprintf("%s-latency", name)
	// Single comma-ok lookup instead of a check followed by a re-read.
	latency, ok := m.workqueueMetrics[key]
	if !ok {
		latency = newHistogram("queue_latency", "Time objects spend waiting in the queue", map[string]string{"queue_name": name}, []float64{1.0, 5.0, 20.0, 60.0, 180.0})
		m.workqueueMetrics[key] = latency
	}
	return latency.(prometheus.Histogram)
}
// These metrics are not relevant to be exposed
// noopMetric satisfies the workqueue metric interfaces while discarding
// every observation.
type noopMetric struct{}

func (noopMetric) Inc()            {}
func (noopMetric) Dec()            {}
func (noopMetric) Set(float64)     {}
func (noopMetric) Observe(float64) {}
// NewRetriesMetric returns a no-op counter: retry counts are not exported.
func (m *Metrics) NewRetriesMetric(name string) workqueue.CounterMetric {
	return noopMetric{}
}

// NewWorkDurationMetric returns a no-op histogram: per-item work durations
// are not exported.
func (m *Metrics) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
	return noopMetric{}
}

// NewUnfinishedWorkSecondsMetric returns a no-op gauge: unfinished-work
// seconds are not exported.
func (m *Metrics) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
	return noopMetric{}
}

// NewLongestRunningProcessorSecondsMetric returns a no-op gauge:
// longest-running-processor seconds are not exported.
func (m *Metrics) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
	return noopMetric{}
}