forked from ava-labs/avalanchego
-
Notifications
You must be signed in to change notification settings - Fork 4
/
latency.go
215 lines (180 loc) · 7.13 KB
/
latency.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package metrics
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"github.com/MetalBlockchain/metalgo/ids"
"github.com/MetalBlockchain/metalgo/snow/choices"
"github.com/MetalBlockchain/metalgo/utils/linkedhashmap"
"github.com/MetalBlockchain/metalgo/utils/logging"
"github.com/MetalBlockchain/metalgo/utils/metric"
"github.com/MetalBlockchain/metalgo/utils/wrappers"
)
// Compile-time check that *latency implements Latency.
var _ Latency = (*latency)(nil)

// Latency tracks items from the moment they are issued until they are
// accepted or rejected, and reports consensus latency metrics about them.
type Latency interface {
	// Issued marks the item as having been issued.
	Issued(id ids.ID, pollNumber uint64)

	// Accepted marks the item as having been accepted.
	// Pass the container size in bytes for metrics tracking.
	Accepted(id ids.ID, pollNumber uint64, containerSize int)

	// Rejected marks the item as having been rejected.
	// Pass the container size in bytes for metrics tracking.
	Rejected(id ids.ID, pollNumber uint64, containerSize int)

	// MeasureAndGetOldestDuration returns the amount of time the oldest item
	// has been processing.
	MeasureAndGetOldestDuration() time.Duration

	// NumProcessing returns the number of currently processing items.
	NumProcessing() int
}
// opStart is what Issued records for an item: the wall-clock time and the
// poll number at which it was issued.
type opStart struct {
	time       time.Time
	pollNumber uint64
}

// latency is the Latency implementation returned by NewLatency. It remembers
// when each item was issued and, once the item is accepted or rejected,
// reports how many polls and how much time its decision took.
type latency struct {
	// processingEntries keeps track of the [opStart] that each item was issued
	// into the consensus instance. This is used to calculate the amount of time
	// to accept or reject the item.
	processingEntries linkedhashmap.LinkedHashmap[ids.ID, opStart]

	// log reports anomalous events.
	log logging.Logger

	// numProcessing keeps track of the number of items processing.
	numProcessing prometheus.Gauge

	// pollsAccepted tracks the number of polls that an item was in processing
	// for before being accepted.
	pollsAccepted metric.Averager

	// pollsRejected tracks the number of polls that an item was in processing
	// for before being rejected.
	pollsRejected metric.Averager

	// latAccepted tracks the number of nanoseconds that an item was processing
	// before being accepted.
	latAccepted metric.Averager

	// containerSizeAcceptedSum accumulates the container sizes (in bytes, as
	// passed to Accepted) of all accepted items.
	containerSizeAcceptedSum prometheus.Gauge

	// latRejected tracks the number of nanoseconds that an item was processing
	// before being rejected.
	latRejected metric.Averager

	// containerSizeRejectedSum accumulates the container sizes (in bytes, as
	// passed to Rejected) of all rejected items.
	containerSizeRejectedSum prometheus.Gauge
}
// NewLatency returns a Latency whose metrics live under namespace and are
// registered with reg.
//
// metricName prefixes every metric name (for example "blks" produces
// "blks_processing", "blks_accepted", "blks_rejected", ...). descriptionName
// is interpolated into the help text of the poll and latency averagers.
//
// Errors reported by the averager constructors and by registering the gauges
// are collected and returned together. Note that the returned Latency is
// non-nil even when the error is non-nil.
func NewLatency(metricName, descriptionName string, log logging.Logger, namespace string, reg prometheus.Registerer) (Latency, error) {
	errs := wrappers.Errs{}
	l := &latency{
		processingEntries: linkedhashmap.New[ids.ID, opStart](),
		log:               log,

		// e.g.,
		// "avalanche_7y7zwo7XatqnX4dtTakLo32o7jkMX4XuDa26WaxbCXoCT1qKK_blks_processing" to count how many blocks are currently processing
		numProcessing: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      fmt.Sprintf("%s_processing", metricName),
			Help:      fmt.Sprintf("Number of currently processing %s", metricName),
		}),

		pollsAccepted: metric.NewAveragerWithErrs(
			namespace,
			fmt.Sprintf("%s_polls_accepted", metricName),
			fmt.Sprintf("number of polls from issuance of a %s to its acceptance", descriptionName),
			reg,
			&errs,
		),
		pollsRejected: metric.NewAveragerWithErrs(
			namespace,
			fmt.Sprintf("%s_polls_rejected", metricName),
			fmt.Sprintf("number of polls from issuance of a %s to its rejection", descriptionName),
			reg,
			&errs,
		),

		// e.g.,
		// "avalanche_C_blks_accepted_count" to count how many "Observe" gets called -- count all "Accept"
		// "avalanche_C_blks_accepted_sum" to count how many ns have elapsed since its issuance on acceptance
		// "avalanche_C_blks_accepted_sum / avalanche_C_blks_accepted_count" is the average block acceptance latency in ns
		// "avalanche_C_blks_accepted_container_size_sum" to track cumulative sum of all accepted blocks' sizes
		// "avalanche_C_blks_accepted_container_size_sum / avalanche_C_blks_accepted_count" is the average accepted block size
		latAccepted: metric.NewAveragerWithErrs(
			namespace,
			fmt.Sprintf("%s_accepted", metricName),
			fmt.Sprintf("time (in ns) from issuance of a %s to its acceptance", descriptionName),
			reg,
			&errs,
		),
		containerSizeAcceptedSum: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      fmt.Sprintf("%s_accepted_container_size_sum", metricName),
			Help:      fmt.Sprintf("Cumulative sum of container size of all accepted %s", metricName),
		}),

		// e.g.,
		// "avalanche_P_blks_rejected_count" to count how many "Observe" gets called -- count all "Reject"
		// "avalanche_P_blks_rejected_sum" to count how many ns have elapsed since its issuance on rejection
		// "avalanche_P_blks_rejected_sum / avalanche_P_blks_rejected_count" is the average block rejection latency in ns
		// "avalanche_P_blks_rejected_container_size_sum" to track cumulative sum of all rejected blocks' sizes
		// "avalanche_P_blks_rejected_container_size_sum / avalanche_P_blks_rejected_count" is the average rejected block size
		latRejected: metric.NewAveragerWithErrs(
			namespace,
			fmt.Sprintf("%s_rejected", metricName),
			fmt.Sprintf("time (in ns) from issuance of a %s to its rejection", descriptionName),
			reg,
			&errs,
		),
		containerSizeRejectedSum: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      fmt.Sprintf("%s_rejected_container_size_sum", metricName),
			Help:      fmt.Sprintf("Cumulative sum of container size of all rejected %s", metricName),
		}),
	}

	// The averagers above were handed reg and &errs directly; the plain
	// gauges still have to be registered here.
	errs.Add(
		reg.Register(l.numProcessing),
		reg.Register(l.containerSizeAcceptedSum),
		reg.Register(l.containerSizeRejectedSum),
	)
	return l, errs.Err
}
// Issued starts tracking id, recording the current time and pollNumber so
// that a later Accepted or Rejected call can measure how long the decision
// took.
//
// Issuing an id that is already being tracked overwrites its recorded start.
// It does not increment the processing gauge a second time: only one
// Accepted/Rejected call can remove the entry, so counting it twice would
// leave the gauge permanently above NumProcessing.
func (l *latency) Issued(id ids.ID, pollNumber uint64) {
	_, alreadyProcessing := l.processingEntries.Get(id)
	l.processingEntries.Put(id, opStart{
		time:       time.Now(),
		pollNumber: pollNumber,
	})
	if !alreadyProcessing {
		l.numProcessing.Inc()
	}
}
// Accepted marks id as accepted during poll pollNumber. If id is being
// tracked (see Issued), it is removed and the following are recorded:
//   - the number of polls it spent processing,
//   - the wall-clock time it spent processing,
//   - containerSize (in bytes), added to the cumulative accepted size.
//
// Untracked ids are logged at debug level and otherwise ignored.
func (l *latency) Accepted(id ids.ID, pollNumber uint64, containerSize int) {
	start, ok := l.stopTracking(id, choices.Accepted)
	if !ok {
		return
	}
	l.pollsAccepted.Observe(start.pollsUntil(pollNumber))
	l.latAccepted.Observe(float64(time.Since(start.time)))
	l.numProcessing.Dec()
	l.containerSizeAcceptedSum.Add(float64(containerSize))
}

// Rejected marks id as rejected during poll pollNumber. If id is being
// tracked (see Issued), it is removed and the following are recorded:
//   - the number of polls it spent processing,
//   - the wall-clock time it spent processing,
//   - containerSize (in bytes), added to the cumulative rejected size.
//
// Untracked ids are logged at debug level and otherwise ignored.
func (l *latency) Rejected(id ids.ID, pollNumber uint64, containerSize int) {
	start, ok := l.stopTracking(id, choices.Rejected)
	if !ok {
		return
	}
	l.pollsRejected.Observe(start.pollsUntil(pollNumber))
	l.latRejected.Observe(float64(time.Since(start.time)))
	l.numProcessing.Dec()
	l.containerSizeRejectedSum.Add(float64(containerSize))
}

// stopTracking removes id from the processing set and returns the opStart
// recorded for it by Issued. When id is not being tracked, it logs at debug
// level (tagged with status) and reports false.
func (l *latency) stopTracking(id ids.ID, status fmt.Stringer) (opStart, bool) {
	start, ok := l.processingEntries.Get(id)
	if !ok {
		l.log.Debug("unable to measure tx latency",
			zap.Stringer("status", status),
			zap.Stringer("txID", id),
		)
		return opStart{}, false
	}
	l.processingEntries.Delete(id)
	return start, true
}

// pollsUntil returns the number of polls between the issuance recorded in s
// and pollNumber. If pollNumber were smaller than s.pollNumber, the uint64
// subtraction would wrap around to an enormous value and skew the averagers.
// Such values are clamped to 0 instead.
func (s opStart) pollsUntil(pollNumber uint64) float64 {
	if pollNumber < s.pollNumber {
		return 0
	}
	return float64(pollNumber - s.pollNumber)
}
// MeasureAndGetOldestDuration reports how long the oldest entry in the
// processing set has been processing, or 0 if nothing is being tracked.
func (l *latency) MeasureAndGetOldestDuration() time.Duration {
	if _, oldest, found := l.processingEntries.Oldest(); found {
		return time.Since(oldest.time)
	}
	return 0
}
// NumProcessing reports how many issued items have not yet been accepted or
// rejected.
func (l *latency) NumProcessing() int {
	pending := l.processingEntries.Len()
	return pending
}