Skip to content

Commit

Permalink
app/promauto: implement reset gauge (#2289)
Browse files Browse the repository at this point in the history
Implement a ResetGauge to simplify "resetting of previously set labels".

category: misc
ticket: #2277
  • Loading branch information
corverroos committed Jun 8, 2023
1 parent 97f8a02 commit a57c120
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 20 deletions.
2 changes: 2 additions & 0 deletions app/monitoringapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func beaconNodeSyncing(ctx context.Context, eth2Cl eth2client.NodeSyncingProvide
// beaconNodeVersionMetric sets the beacon node version gauge.
func beaconNodeVersionMetric(ctx context.Context, eth2Cl eth2wrap.Client, clock clockwork.Clock) {
nodeVersionTicker := clock.NewTicker(10 * time.Minute)

// TODO(corver): Refactor to use ResetGauge.
var prevNodeVersion string
setNodeVersion := func() {
version, err := eth2Cl.NodeVersion(ctx)
Expand Down
3 changes: 2 additions & 1 deletion app/peerinfo/peerinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ func (p *PeerInfo) sendOnce(ctx context.Context, now time.Time) {
// newMetricsSubmitter returns a prometheus metric submitter.
func newMetricsSubmitter() metricSubmitter {
var (
mu sync.Mutex
mu sync.Mutex
// TODO(corver): Refactor to use ResetGauge.
prevVersions = make(map[string]string)
prevGitHashes = make(map[string]string)
)
Expand Down
56 changes: 56 additions & 0 deletions app/promauto/resetgauge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1

package promauto

import (
"strings"
"sync"

"github.com/prometheus/client_golang/prometheus"
)

// separator joins the label values of one series into a single map key.
// Label values must not contain it (WithLabelValues panics if one does).
const separator = "|"

// NewResetGaugeVec returns a ResetGaugeVec whose underlying gauge vector is
// created via NewGaugeVec with the provided options and label names.
// The returned vector starts with no recorded label values.
func NewResetGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *ResetGaugeVec {
	vec := new(ResetGaugeVec)
	vec.inner = NewGaugeVec(opts, labelNames)
	vec.labels = map[string]bool{}

	return vec
}

// ResetGaugeVec wraps a prometheus GaugeVec and records every set of label
// values requested through WithLabelValues, so that Reset can delete them all.
// This is useful to clear out labels that are no longer present.
type ResetGaugeVec struct {
// inner is the wrapped gauge vector that holds the actual series.
inner *prometheus.GaugeVec

// mu is held by WithLabelValues and Reset while they access labels and inner.
mu sync.Mutex
// labels is the set of label value tuples seen so far; each key is the tuple joined with separator.
labels map[string]bool
}

// WithLabelValues returns the gauge for the given label values and records
// them so that a later Reset deletes the corresponding series.
//
// It panics if any label value contains the separator, since the values are
// joined with it to form the bookkeeping key. It also panics, like the wrapped
// prometheus GaugeVec, if the number of label values does not match the number
// of label names.
func (g *ResetGaugeVec) WithLabelValues(lvs ...string) prometheus.Gauge {
	for _, lv := range lvs {
		if strings.Contains(lv, separator) {
			panic("label value cannot contain separator")
		}
	}

	// Hold the lock across both the lookup and the bookkeeping so that a
	// concurrent Reset cannot run in between and leave the series untracked.
	g.mu.Lock()
	defer g.mu.Unlock()

	// Resolve the gauge first: if the inner vector panics (label count
	// mismatch), no key is recorded for a series that was never created.
	gauge := g.inner.WithLabelValues(lvs...)
	g.labels[strings.Join(lvs, separator)] = true

	return gauge
}

// Reset deletes all series previously obtained via WithLabelValues and clears
// the recorded label values, so the vector can be repopulated from scratch.
func (g *ResetGaugeVec) Reset() {
	g.mu.Lock()
	defer g.mu.Unlock()

	for key := range g.labels {
		if key == "" {
			// The empty key is produced both by a call without any label
			// values and by a single empty label value. strings.Split yields
			// [""] for it, which only covers the latter, so also try the
			// zero-value form. A label count mismatch is not an error for
			// DeleteLabelValues; it simply deletes nothing.
			g.inner.DeleteLabelValues()
		}

		g.inner.DeleteLabelValues(strings.Split(key, separator)...)
	}

	g.labels = make(map[string]bool)
}
59 changes: 59 additions & 0 deletions app/promauto/resetgauge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1

package promauto_test

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/obolnetwork/charon/app/promauto"
)

// resetTest is the metric name of the gauge vector exercised by TestResetGaugeVec.
const resetTest = "reset_test"

// testResetGauge is the package-level ResetGaugeVec under test; it has a single label named "label".
var testResetGauge = promauto.NewResetGaugeVec(prometheus.GaugeOpts{
Name: resetTest,
Help: "",
}, []string{"label"})

// TestResetGaugeVec verifies that Reset removes every series created through
// WithLabelValues and that the vector can be populated again afterwards.
func TestResetGaugeVec(t *testing.T) {
	registry, err := promauto.NewRegistry(nil)
	require.NoError(t, err)

	// testResetGauge is package-level state: start from a clean slate so the
	// series left behind by a previous run (e.g. go test -count=2) does not
	// skew the counts asserted below.
	testResetGauge.Reset()

	testResetGauge.WithLabelValues("1").Set(1)
	assertVecLen(t, registry, resetTest, 1)

	testResetGauge.WithLabelValues("2").Set(2)
	assertVecLen(t, registry, resetTest, 2)

	testResetGauge.Reset()
	assertVecLen(t, registry, resetTest, 0)

	// The vector must remain usable after a reset.
	testResetGauge.WithLabelValues("3").Set(3)
	assertVecLen(t, registry, resetTest, 1)
}

// assertVecLen gathers the registry and requires that the metric family with
// the given name contains exactly l series. A missing family is accepted only
// when zero series are expected.
func assertVecLen(t *testing.T, registry *prometheus.Registry, name string, l int) { //nolint:unparam // abstracting name is fine even though it is always currently constant
	t.Helper()

	families, err := registry.Gather()
	require.NoError(t, err)

	for _, fam := range families {
		if *fam.Name == name {
			require.Len(t, fam.Metric, l)

			return
		}
	}

	// No family with that name was gathered.
	if l != 0 {
		require.Fail(t, "metric not found")
	}
}
1 change: 1 addition & 0 deletions core/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func instrumentDuty(duty core.Duty, defSet core.DutyDefinitionSet) {

// newMetricSubmitter returns a function that sets validator balance and status metric.
func newMetricSubmitter() func(pubkey core.PubKey, totalBal eth2p0.Gwei, status string) {
// TODO(corver): Refactor to use ResetGauge.
prevStatus := make(map[core.PubKey]string)

return func(pubkey core.PubKey, totalBal eth2p0.Gwei, status string) {
Expand Down
2 changes: 1 addition & 1 deletion docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance.
| `p2p_peer_connection_types` | Gauge | Current number of libp2p connections by peer and type (`direct` or `relay`). Note that peers may have multiple connections. | `peer, type` |
| `p2p_peer_network_receive_bytes_total` | Counter | Total number of network bytes received from the peer by protocol. | `peer, protocol` |
| `p2p_peer_network_sent_bytes_total` | Counter | Total number of network bytes sent to the peer by protocol. | `peer, protocol` |
| `p2p_peer_streams` | Gauge | Current number of libp2p streams by peer and direction (`inbound` or `outbound` or `unknown`). | `peer, direction, protocol` |
| `p2p_peer_streams` | Gauge | Current number of libp2p streams by peer, direction (`inbound` or `outbound` or `unknown`) and protocol. | `peer, direction, protocol` |
| `p2p_ping_error_total` | Counter | Total number of ping errors per peer | `peer` |
| `p2p_ping_latency_secs` | Histogram | Ping latencies in seconds per peer | `peer` |
| `p2p_ping_success` | Gauge | Whether the last ping was successful (1) or not (0). Can be used as proxy for connected peers | `peer` |
Expand Down
5 changes: 2 additions & 3 deletions p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ var (
Help: "Current number of libp2p connections by peer and type ('direct' or 'relay'). Note that peers may have multiple connections.",
}, []string{"peer", "type"})

// TODO(gsora): remove this once we fix the stream leak issue.
peerStreamGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
peerStreamGauge = promauto.NewResetGaugeVec(prometheus.GaugeOpts{
Namespace: "p2p",
Name: "peer_streams",
Help: "Current number of libp2p streams by peer and direction ('inbound' or 'outbound' or 'unknown').",
Help: "Current number of libp2p streams by peer, direction ('inbound' or 'outbound' or 'unknown') and protocol.",
}, []string{"peer", "direction", "protocol"})

peerConnCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Expand Down
32 changes: 17 additions & 15 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,9 @@ func RegisterConnectionLogger(ctx context.Context, tcpNode host.Host, peerIDs []
}

type streamKey struct {
connKey
Protocol string
PeerName string
Direction string
Protocol string
}

var (
Expand All @@ -305,35 +306,36 @@ func RegisterConnectionLogger(ctx context.Context, tcpNode host.Host, peerIDs []
case <-ctx.Done():
return
case <-ticker.C:
// Instrument connection counts.
// Instrument connection and stream counts.
peerStreamGauge.Reset() // Reset stream gauge to clear previously set protocols.
counts := make(map[connKey]int)
streams := make(map[streamKey]int)

for _, conn := range tcpNode.Network().Conns() {
p := PeerName(conn.RemotePeer())
key := connKey{PeerName: p, Type: addrType(conn.RemoteMultiaddr())}
counts[key]++
cKey := connKey{
PeerName: p,
Type: addrType(conn.RemoteMultiaddr()),
}
counts[cKey]++

for _, stream := range conn.GetStreams() {
dir := stream.Stat().Direction.String()
protocol := stream.Protocol()
sKey := streamKey{
connKey: connKey{PeerName: p, Type: dir},
Protocol: string(protocol),
PeerName: p,
Direction: stream.Stat().Direction.String(),
Protocol: string(stream.Protocol()),
}

streams[sKey]++
}
}
for _, pID := range peerIDs {
for _, typ := range []string{addrTypeRelay, addrTypeDirect} {
key := connKey{PeerName: PeerName(pID), Type: typ}
peerConnGauge.WithLabelValues(key.PeerName, key.Type).Set(float64(counts[key]))
cKey := connKey{PeerName: PeerName(pID), Type: typ}
peerConnGauge.WithLabelValues(cKey.PeerName, cKey.Type).Set(float64(counts[cKey]))
}
}
// TODO(gsora): remove this once we fix the stream leak issue
for key, amount := range streams {
peerStreamGauge.WithLabelValues(key.PeerName, key.Type, key.Protocol).Set(float64(amount))
for sKey, amount := range streams {
peerStreamGauge.WithLabelValues(sKey.PeerName, sKey.Direction, sKey.Protocol).Set(float64(amount))
}
case e := <-events:
// Log and instrument events.
Expand Down

0 comments on commit a57c120

Please sign in to comment.