-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
metrics.go
199 lines (182 loc) · 7.52 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package rangefeed
import (
"fmt"
"time"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
)
// Metadata for all RangeFeed metrics. Names under "kv.rangefeed." are
// registered with the store metric registry via the structs below. The two
// *Template entries are fmt templates: their Name/Help contain a %s that is
// filled in with the scheduler shard name by newSchedulerShardMetrics.
var (
	metaRangeFeedCatchUpScanNanos = metric.Metadata{
		Name:        "kv.rangefeed.catchup_scan_nanos",
		Help:        "Time spent in RangeFeed catchup scan",
		Measurement: "Nanoseconds",
		Unit:        metric.Unit_NANOSECONDS,
	}
	metaRangeFeedExhausted = metric.Metadata{
		Name:        "kv.rangefeed.budget_allocation_failed",
		Help:        "Number of times RangeFeed failed because memory budget was exceeded",
		Measurement: "Events",
		Unit:        metric.Unit_COUNT,
	}
	metaRangeFeedBudgetBlocked = metric.Metadata{
		Name:        "kv.rangefeed.budget_allocation_blocked",
		Help:        "Number of times RangeFeed waited for budget availability",
		Measurement: "Events",
		Unit:        metric.Unit_COUNT,
	}
	metaRangeFeedRegistrations = metric.Metadata{
		Name:        "kv.rangefeed.registrations",
		Help:        "Number of active RangeFeed registrations",
		Measurement: "Registrations",
		Unit:        metric.Unit_COUNT,
	}
	metaRangeFeedClosedTimestampMaxBehindNanos = metric.Metadata{
		Name: "kv.rangefeed.closed_timestamp_max_behind_nanos",
		// NB: fixed typo in the user-visible help text ("rangeeds" ->
		// "rangefeeds").
		Help: "Largest latency between realtime and replica max closed timestamp for replicas " +
			"that have active rangefeeds on them",
		Measurement: "Nanoseconds",
		Unit:        metric.Unit_NANOSECONDS,
	}
	metaRangefeedSlowClosedTimestampRanges = metric.Metadata{
		Name: "kv.rangefeed.closed_timestamp.slow_ranges",
		Help: "Number of ranges that have a closed timestamp lagging by more than 5x target lag. " +
			"Periodically re-calculated",
		Measurement: "Ranges",
		Unit:        metric.Unit_COUNT,
	}
	metaRangeFeedProcessorsGO = metric.Metadata{
		Name:        "kv.rangefeed.processors_goroutine",
		Help:        "Number of active RangeFeed processors using goroutines",
		Measurement: "Processors",
		Unit:        metric.Unit_COUNT,
	}
	metaRangeFeedProcessorsScheduler = metric.Metadata{
		Name:        "kv.rangefeed.processors_scheduler",
		Help:        "Number of active RangeFeed processors using scheduler",
		Measurement: "Processors",
		Unit:        metric.Unit_COUNT,
	}
	metaQueueTimeHistogramsTemplate = metric.Metadata{
		Name:        "kv.rangefeed.scheduler.%s.latency",
		Help:        "KV RangeFeed %s scheduler latency",
		Measurement: "Latency",
		Unit:        metric.Unit_NANOSECONDS,
	}
	metaQueueSizeTemplate = metric.Metadata{
		Name:        "kv.rangefeed.scheduler.%s.queue_size",
		Help:        "Number of entries in the KV RangeFeed %s scheduler queue",
		Measurement: "Pending Ranges",
		Unit:        metric.Unit_COUNT,
	}
)
// Metrics are for production monitoring of RangeFeeds.
type Metrics struct {
	// RangeFeedCatchUpScanNanos accumulates time spent in catch-up scans.
	RangeFeedCatchUpScanNanos *metric.Counter
	// RangeFeedBudgetExhausted counts rangefeed failures caused by the
	// memory budget being exceeded.
	RangeFeedBudgetExhausted *metric.Counter
	// RangeFeedBudgetBlocked counts waits for budget availability.
	RangeFeedBudgetBlocked *metric.Counter
	// RangeFeedRegistrations tracks the number of active registrations.
	RangeFeedRegistrations *metric.Gauge
	// RangeFeedClosedTimestampMaxBehindNanos reports the largest closed
	// timestamp lag across replicas with active rangefeeds.
	RangeFeedClosedTimestampMaxBehindNanos *metric.Gauge
	// RangeFeedSlowClosedTimestampRanges counts ranges whose closed
	// timestamp lags beyond the slow threshold.
	RangeFeedSlowClosedTimestampRanges *metric.Gauge
	// RangeFeedSlowClosedTimestampLogN rate-limits logging about slow
	// closed timestamps (see NewMetrics for the cadence).
	RangeFeedSlowClosedTimestampLogN log.EveryN
	// RangeFeedSlowClosedTimestampNudgeSem bounds the amount of work that can be
	// spun up on behalf of the RangeFeed nudger. We don't expect to hit this
	// limit, but it's here to limit the effect on stability in case something
	// unexpected happens.
	RangeFeedSlowClosedTimestampNudgeSem chan struct{}
	// Metrics exposing rangefeed processor by type. Those metrics are used to
	// monitor processor switch over. They could be removed when legacy processor
	// is removed.
	RangeFeedProcessorsGO        *metric.Gauge
	RangeFeedProcessorsScheduler *metric.Gauge
}
// MetricStruct implements the metric.Struct interface, marking Metrics as a
// struct of metrics for automatic registration.
func (*Metrics) MetricStruct() {}
// NewMetrics makes the metrics for RangeFeeds monitoring.
func NewMetrics() *Metrics {
	return &Metrics{
		RangeFeedCatchUpScanNanos:              metric.NewCounter(metaRangeFeedCatchUpScanNanos),
		RangeFeedBudgetExhausted:               metric.NewCounter(metaRangeFeedExhausted),
		RangeFeedBudgetBlocked:                 metric.NewCounter(metaRangeFeedBudgetBlocked),
		RangeFeedRegistrations:                 metric.NewGauge(metaRangeFeedRegistrations),
		RangeFeedClosedTimestampMaxBehindNanos: metric.NewGauge(metaRangeFeedClosedTimestampMaxBehindNanos),
		RangeFeedSlowClosedTimestampRanges:     metric.NewGauge(metaRangefeedSlowClosedTimestampRanges),
		// Log about slow closed timestamps at most once every 5 seconds.
		RangeFeedSlowClosedTimestampLogN: log.Every(5 * time.Second),
		// Buffered to 1024 to bound concurrent nudger work; see the field
		// comment on Metrics.RangeFeedSlowClosedTimestampNudgeSem.
		RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024),
		RangeFeedProcessorsGO:                metric.NewGauge(metaRangeFeedProcessorsGO),
		RangeFeedProcessorsScheduler:         metric.NewGauge(metaRangeFeedProcessorsScheduler),
	}
}
// FeedBudgetPoolMetrics holds metrics for RangeFeed budgets for the purpose
// of registration in a metric registry.
type FeedBudgetPoolMetrics struct {
	// SystemBytesCount tracks memory used by rangefeeds on system ranges.
	SystemBytesCount *metric.Gauge
	// SharedBytesCount tracks memory used by rangefeeds on all other ranges.
	SharedBytesCount *metric.Gauge
}
// MetricStruct implements metrics.Struct interface.
// NOTE(review): value receiver here, unlike the pointer receivers on the
// other MetricStruct implementations in this file — likely harmless, but
// confirm before unifying.
func (FeedBudgetPoolMetrics) MetricStruct() {}
// NewFeedBudgetMetrics creates new metrics for RangeFeed budgets.
//
// histogramWindow is accepted for signature parity with the other metric
// constructors in this file; it is not used by the gauges created here.
func NewFeedBudgetMetrics(histogramWindow time.Duration) *FeedBudgetPoolMetrics {
	// newPoolGauge builds a byte-count gauge named under the
	// "kv.rangefeed.mem_" prefix.
	newPoolGauge := func(suffix, help string) *metric.Gauge {
		return metric.NewGauge(metric.Metadata{
			Name:        "kv.rangefeed.mem_" + suffix,
			Help:        help,
			Measurement: "Memory",
			Unit:        metric.Unit_BYTES,
		})
	}
	return &FeedBudgetPoolMetrics{
		SystemBytesCount: newPoolGauge("system", "Memory usage by rangefeeds on system ranges"),
		SharedBytesCount: newPoolGauge("shared", "Memory usage by rangefeeds"),
	}
}
// ShardMetrics metrics for individual scheduler shard.
type ShardMetrics struct {
	// QueueTime is time spent by range in scheduler queue.
	QueueTime metric.IHistogram
	// QueueSize is number of elements in the queue recently observed by reader.
	QueueSize *metric.Gauge
}
// MetricStruct implements metrics.Struct interface, marking ShardMetrics as
// a struct of metrics for automatic registration.
func (*ShardMetrics) MetricStruct() {}
// SchedulerMetrics for production monitoring of rangefeed Scheduler.
type SchedulerMetrics struct {
	// SystemPriority holds the metrics for the system-priority shard.
	SystemPriority *ShardMetrics
	// NormalPriority holds the metrics for the normal-priority shard.
	NormalPriority *ShardMetrics
}
// MetricStruct implements metrics.Struct interface, marking SchedulerMetrics
// as a struct of metrics for automatic registration.
func (*SchedulerMetrics) MetricStruct() {}
// NewSchedulerMetrics creates metric struct for Scheduler, with one set of
// shard metrics per scheduler priority.
func NewSchedulerMetrics(histogramWindow time.Duration) *SchedulerMetrics {
	system := newSchedulerShardMetrics("system", histogramWindow)
	normal := newSchedulerShardMetrics("normal", histogramWindow)
	return &SchedulerMetrics{
		SystemPriority: system,
		NormalPriority: normal,
	}
}
// newSchedulerShardMetrics builds the metrics for a single scheduler shard,
// substituting the shard name into the %s placeholders of the metadata
// templates declared above.
func newSchedulerShardMetrics(name string, histogramWindow time.Duration) *ShardMetrics {
	// instantiate fills the shard name into a copy of a metadata template.
	instantiate := func(tmpl metric.Metadata) metric.Metadata {
		m := tmpl
		m.Name = fmt.Sprintf(tmpl.Name, name)
		m.Help = fmt.Sprintf(tmpl.Help, name)
		return m
	}
	queueTime := metric.NewHistogram(metric.HistogramOptions{
		Mode:         metric.HistogramModePreferHdrLatency,
		Metadata:     instantiate(metaQueueTimeHistogramsTemplate),
		Duration:     histogramWindow,
		BucketConfig: metric.IOLatencyBuckets,
	})
	return &ShardMetrics{
		QueueTime: queueTime,
		QueueSize: metric.NewGauge(instantiate(metaQueueSizeTemplate)),
	}
}