-
Notifications
You must be signed in to change notification settings - Fork 665
/
latency.go
180 lines (149 loc) · 4.95 KB
/
latency.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// Copyright (C) 2019-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package metrics
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/choices"
"github.com/ava-labs/avalanchego/utils/linkedhashmap"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/wrappers"
)
// Compile-time check that *latency satisfies the Latency interface.
var _ Latency = (*latency)(nil)
// Latency follows items from the moment they are issued until they are
// accepted or rejected, and exposes how long they have been processing.
type Latency interface {
	// Issued records that the item identified by id started processing
	// during poll pollNumber.
	Issued(id ids.ID, pollNumber uint64)

	// Accepted records that the item identified by id was accepted during
	// poll pollNumber.
	Accepted(id ids.ID, pollNumber uint64)

	// Rejected records that the item identified by id was rejected during
	// poll pollNumber.
	Rejected(id ids.ID, pollNumber uint64)

	// MeasureAndGetOldestDuration reports how long the oldest still-processing
	// item has been processing.
	MeasureAndGetOldestDuration() time.Duration

	// NumProcessing reports how many items are currently processing.
	NumProcessing() int
}
// opStart captures the state at issuance of an item: the wall-clock time and
// the poll number. Completion subtracts these from the values at acceptance or
// rejection to derive latency and poll counts.
type opStart struct {
	time       time.Time // wall-clock time at which the item was issued
	pollNumber uint64    // poll number supplied when the item was issued
}
// latency implements Latency. It remembers when each item was issued and, once
// the item is accepted or rejected, reports the elapsed time and the number of
// polls it took to Prometheus-backed metrics.
type latency struct {
	// processingEntries maps the ID of every processing item to the opStart
	// recorded when it was issued; entries are removed once the item is
	// accepted or rejected.
	processingEntries linkedhashmap.LinkedHashmap

	// log reports anomalous events, such as completing an item that was never
	// issued.
	log logging.Logger

	// numProcessing is a gauge of the number of items currently processing.
	numProcessing prometheus.Gauge

	// pollsAccepted / pollsRejected average the number of polls an item spent
	// processing before being accepted / rejected.
	pollsAccepted metric.Averager
	pollsRejected metric.Averager

	// latAccepted / latRejected average the number of nanoseconds an item
	// spent processing before being accepted / rejected.
	latAccepted metric.Averager
	latRejected metric.Averager
}
// NewLatency builds a Latency tracker whose metric names are derived from
// metricName and whose help text refers to descriptionName. Every metric is
// created under namespace and handed to reg. Errors from building the
// averagers and from registering the processing gauge are collected and
// returned; note that the tracker itself is returned even when that error is
// non-nil.
func NewLatency(metricName, descriptionName string, log logging.Logger, namespace string, reg prometheus.Registerer) (Latency, error) {
	var errs wrappers.Errs

	// averager creates one averager in namespace, reporting any failure
	// into errs.
	averager := func(name, help string) metric.Averager {
		return metric.NewAveragerWithErrs(namespace, name, help, reg, &errs)
	}

	processing := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: namespace,
		Name:      fmt.Sprintf("%s_processing", metricName),
		Help:      fmt.Sprintf("Number of currently processing %s", metricName),
	})

	tracker := &latency{
		processingEntries: linkedhashmap.New(),
		log:               log,
		numProcessing:     processing,
		pollsAccepted: averager(
			fmt.Sprintf("%s_polls_accepted", metricName),
			fmt.Sprintf("number of polls from issuance of a %s to its acceptance", descriptionName),
		),
		pollsRejected: averager(
			fmt.Sprintf("%s_polls_rejected", metricName),
			fmt.Sprintf("number of polls from issuance of a %s to its rejection", descriptionName),
		),
		latAccepted: averager(
			fmt.Sprintf("%s_accepted", metricName),
			fmt.Sprintf("time (in ns) from issuance of a %s to its acceptance", descriptionName),
		),
		latRejected: averager(
			fmt.Sprintf("%s_rejected", metricName),
			fmt.Sprintf("time (in ns) from issuance of a %s to its rejection", descriptionName),
		),
	}

	// The gauge is not created through averager, so register it explicitly.
	errs.Add(reg.Register(processing))
	return tracker, errs.Err
}
// Issued starts tracking the item identified by id, stamping it with the
// current wall-clock time and the poll number pollNumber.
//
// The numProcessing gauge is set from processingEntries.Len() rather than
// incremented. If the same id is issued twice, an unconditional Inc would
// count it twice, while Accepted/Rejected remove it only once. That would
// leave the gauge permanently above NumProcessing(). Deriving the gauge from
// the tracked set keeps the two in agreement, whatever Put does with a
// duplicate key.
func (l *latency) Issued(id ids.ID, pollNumber uint64) {
	l.processingEntries.Put(id, opStart{
		time:       time.Now(),
		pollNumber: pollNumber,
	})
	l.numProcessing.Set(float64(l.processingEntries.Len()))
}
// Accepted stops tracking the item identified by id and records, in the
// accepted averagers, how many polls and how much wall-clock time passed since
// it was issued. An id that is not being tracked is logged at debug level and
// otherwise ignored.
func (l *latency) Accepted(id ids.ID, pollNumber uint64) {
	l.finish(id, pollNumber, true)
}

// Rejected stops tracking the item identified by id and records, in the
// rejected averagers, how many polls and how much wall-clock time passed since
// it was issued. An id that is not being tracked is logged at debug level and
// otherwise ignored.
func (l *latency) Rejected(id ids.ID, pollNumber uint64) {
	l.finish(id, pollNumber, false)
}

// finish is the shared implementation of Accepted and Rejected. It removes id
// from processingEntries and feeds the poll count and latency into the
// accepted averagers when accepted is true, or the rejected ones otherwise.
func (l *latency) finish(id ids.ID, pollNumber uint64, accepted bool) {
	status, polls, lat := choices.Rejected, l.pollsRejected, l.latRejected
	if accepted {
		status, polls, lat = choices.Accepted, l.pollsAccepted, l.latAccepted
	}

	startIntf, ok := l.processingEntries.Get(id)
	if !ok {
		l.log.Debug("unable to measure tx latency",
			zap.Stringer("status", status),
			zap.Stringer("txID", id),
		)
		return
	}
	l.processingEntries.Delete(id)

	start := startIntf.(opStart)
	// NOTE(review): assumes pollNumber >= start.pollNumber; the uint64
	// subtraction would wrap to a huge value otherwise.
	polls.Observe(float64(pollNumber - start.pollNumber))
	lat.Observe(float64(time.Since(start.time)))

	// Resynchronize the gauge with the tracked set instead of decrementing it,
	// so any earlier drift between the gauge and NumProcessing() is corrected.
	l.numProcessing.Set(float64(l.processingEntries.Len()))
}
// MeasureAndGetOldestDuration reports how long the entry returned by
// processingEntries.Oldest() has been processing, or 0 when no item is
// currently being tracked.
func (l *latency) MeasureAndGetOldestDuration() time.Duration {
	if _, oldest, found := l.processingEntries.Oldest(); found {
		return time.Since(oldest.(opStart).time)
	}
	return 0
}
// NumProcessing reports how many issued items have not yet been accepted or
// rejected, i.e. the number of entries in processingEntries.
func (l *latency) NumProcessing() int {
	count := l.processingEntries.Len()
	return count
}