-
Notifications
You must be signed in to change notification settings - Fork 885
/
metrics.go
172 lines (141 loc) · 4.36 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package discovery
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"github.com/celestiaorg/celestia-node/libs/utils"
)
const (
discoveryEnoughPeersKey = "enough_peers"
handlePeerResultKey = "result"
handlePeerSkipSelf handlePeerResult = "skip_self"
handlePeerEnoughPeers handlePeerResult = "skip_enough_peers"
handlePeerBackoff handlePeerResult = "skip_backoff"
handlePeerConnected handlePeerResult = "connected"
handlePeerConnErr handlePeerResult = "conn_err"
handlePeerInSet handlePeerResult = "in_set"
advertiseFailedKey = "failed"
)
var meter = otel.Meter("share_discovery")
type handlePeerResult string
type metrics struct {
peersAmount metric.Int64ObservableGauge
discoveryResult metric.Int64Counter // attributes: enough_peers[bool],is_canceled[bool]
handlePeerResult metric.Int64Counter // attributes: result[string]
advertise metric.Int64Counter // attributes: failed[bool]
peerAdded metric.Int64Counter
peerRemoved metric.Int64Counter
clientReg metric.Registration
}
// WithMetrics turns on metric collection in discoery.
func (d *Discovery) WithMetrics() error {
metrics, err := initMetrics(d)
if err != nil {
return fmt.Errorf("discovery: init metrics: %w", err)
}
d.metrics = metrics
d.onUpdatedPeers = d.onUpdatedPeers.add(metrics.observeOnPeersUpdate)
return nil
}
func initMetrics(d *Discovery) (*metrics, error) {
peersAmount, err := meter.Int64ObservableGauge("discovery_amount_of_peers",
metric.WithDescription("amount of peers in discovery set"))
if err != nil {
return nil, err
}
discoveryResult, err := meter.Int64Counter("discovery_find_peers_result",
metric.WithDescription("result of find peers run"))
if err != nil {
return nil, err
}
handlePeerResultCounter, err := meter.Int64Counter("discovery_handler_peer_result",
metric.WithDescription("result handling found peer"))
if err != nil {
return nil, err
}
advertise, err := meter.Int64Counter("discovery_advertise_event",
metric.WithDescription("advertise events counter"))
if err != nil {
return nil, err
}
peerAdded, err := meter.Int64Counter("discovery_add_peer",
metric.WithDescription("add peer to discovery set counter"))
if err != nil {
return nil, err
}
peerRemoved, err := meter.Int64Counter("discovery_remove_peer",
metric.WithDescription("remove peer from discovery set counter"))
if err != nil {
return nil, err
}
backOffSize, err := meter.Int64ObservableGauge("discovery_backoff_amount",
metric.WithDescription("amount of peers in backoff"))
if err != nil {
return nil, err
}
metrics := &metrics{
peersAmount: peersAmount,
discoveryResult: discoveryResult,
handlePeerResult: handlePeerResultCounter,
advertise: advertise,
peerAdded: peerAdded,
peerRemoved: peerRemoved,
}
callback := func(_ context.Context, observer metric.Observer) error {
observer.ObserveInt64(peersAmount, int64(d.set.Size()))
observer.ObserveInt64(backOffSize, int64(d.connector.Size()))
return nil
}
metrics.clientReg, err = meter.RegisterCallback(callback, peersAmount, backOffSize)
if err != nil {
return nil, fmt.Errorf("registering metrics callback: %w", err)
}
return metrics, nil
}
func (m *metrics) close() error {
if m == nil {
return nil
}
return m.clientReg.Unregister()
}
func (m *metrics) observeFindPeers(ctx context.Context, isEnoughPeers bool) {
if m == nil {
return
}
ctx = utils.ResetContextOnError(ctx)
m.discoveryResult.Add(ctx, 1,
metric.WithAttributes(
attribute.Bool(discoveryEnoughPeersKey, isEnoughPeers)))
}
func (m *metrics) observeHandlePeer(ctx context.Context, result handlePeerResult) {
if m == nil {
return
}
ctx = utils.ResetContextOnError(ctx)
m.handlePeerResult.Add(ctx, 1,
metric.WithAttributes(
attribute.String(handlePeerResultKey, string(result))))
}
func (m *metrics) observeAdvertise(ctx context.Context, err error) {
if m == nil {
return
}
ctx = utils.ResetContextOnError(ctx)
m.advertise.Add(ctx, 1,
metric.WithAttributes(
attribute.Bool(advertiseFailedKey, err != nil)))
}
func (m *metrics) observeOnPeersUpdate(_ peer.ID, isAdded bool) {
if m == nil {
return
}
ctx := context.Background()
if isAdded {
m.peerAdded.Add(ctx, 1)
return
}
m.peerRemoved.Add(ctx, 1)
}