Skip to content

Commit

Permalink
feat(share/discovery): add discovery metrics (#2155)
Browse files Browse the repository at this point in the history
## Overview

Adds metrics for discovery component
  • Loading branch information
walldiss committed May 11, 2023
1 parent a8d02a2 commit 687704e
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 3 deletions.
1 change: 1 addition & 0 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type, buildIn
fx.Invoke(fraud.WithMetrics),
fx.Invoke(node.WithMetrics),
fx.Invoke(modheader.WithMetrics),
fx.Invoke(share.WithDiscoveryMetrics),
)

samplingMetrics := fx.Options(
Expand Down
11 changes: 9 additions & 2 deletions nodebuilder/share/opts.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package share

import (
disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
)

// WithPeerManagerMetrics is a utility function that is expected to be
// "invoked" by the fx lifecycle.
// WithPeerManagerMetrics is a utility function to turn on peer manager metrics and that is
// expected to be "invoked" by the fx lifecycle.
func WithPeerManagerMetrics(m *peers.Manager) error {
return m.WithMetrics()
}

// WithDiscoveryMetrics is a utility function to turn on discovery metrics and that is expected to
// be "invoked" by the fx lifecycle.
func WithDiscoveryMetrics(d *disc.Discovery) error {
return d.WithMetrics()
}

func WithShrexClientMetrics(edsClient *shrexeds.Client, ndClient *shrexnd.Client) error {
err := edsClient.WithMetrics()
if err != nil {
Expand Down
16 changes: 15 additions & 1 deletion share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Discovery struct {

triggerDisc chan struct{}

metrics *metrics

cancel context.CancelFunc

params Parameters
Expand Down Expand Up @@ -132,6 +134,7 @@ func (d *Discovery) Advertise(ctx context.Context) {
defer timer.Stop()
for {
_, err := d.disc.Advertise(ctx, rendezvousPoint)
d.metrics.observeAdvertise(ctx, err)
if err != nil {
if ctx.Err() != nil {
return
Expand Down Expand Up @@ -268,14 +271,18 @@ func (d *Discovery) discover(ctx context.Context) bool {
drainChannel(ticker.C)
select {
case <-findCtx.Done():
d.metrics.observeFindPeers(ctx, true, true)
return true
case <-ticker.C:
d.metrics.observeDiscoveryStuck(ctx)
log.Warn("wasn't able to find new peers for long time")
continue
case p, ok := <-peers:
if !ok {
isEnoughPeers := d.set.Size() >= d.set.Limit()
d.metrics.observeFindPeers(ctx, ctx.Err() != nil, isEnoughPeers)
log.Debugw("discovery channel closed", "find_is_canceled", findCtx.Err() != nil)
return d.set.Size() >= d.set.Limit()
return isEnoughPeers
}

peer := p
Expand Down Expand Up @@ -311,15 +318,19 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo
logger := log.With("peer", peer.ID)
switch {
case peer.ID == d.host.ID():
d.metrics.observeHandlePeer(ctx, handlePeerSkipSelf)
logger.Debug("skip handle: self discovery")
return false
case len(peer.Addrs) == 0:
d.metrics.observeHandlePeer(ctx, handlePeerEmptyAddrs)
logger.Debug("skip handle: empty address list")
return false
case d.set.Size() >= d.set.Limit():
d.metrics.observeHandlePeer(ctx, handlePeerEnoughPeers)
logger.Debug("skip handle: enough peers found")
return false
case d.connector.HasBackoff(peer.ID):
d.metrics.observeHandlePeer(ctx, handlePeerBackoff)
logger.Debug("skip handle: backoff")
return false
}
Expand All @@ -330,6 +341,7 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo
case network.NotConnected:
err := d.connector.Connect(ctx, peer)
if err != nil {
d.metrics.observeHandlePeer(ctx, handlePeerConnErr)
logger.Debugw("unable to connect", "err", err)
return false
}
Expand All @@ -338,10 +350,12 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo
}

if !d.set.Add(peer.ID) {
d.metrics.observeHandlePeer(ctx, handlePeerInSet)
logger.Debug("peer is already in discovery set")
return false
}
d.onUpdatedPeers(peer.ID, true)
d.metrics.observeHandlePeer(ctx, handlePeerConnected)
logger.Debug("added peer to set")

// tag to protect peer from being killed by ConnManager
Expand Down
184 changes: 184 additions & 0 deletions share/availability/discovery/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package discovery

import (
"context"
"fmt"

"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)

const (
discoveryEnougPeersKey = "enough_peers"
discoveryFindCancledKey = "is_canceled"

handlePeerResultKey = "result"
handlePeerSkipSelf handlePeerResult = "skip_self"
handlePeerEmptyAddrs handlePeerResult = "skip_empty_addresses"
handlePeerEnoughPeers handlePeerResult = "skip_enough_peers"
handlePeerBackoff handlePeerResult = "skip_backoff"
handlePeerConnected handlePeerResult = "connected"
handlePeerConnErr handlePeerResult = "conn_err"
handlePeerInSet handlePeerResult = "in_set"

advertiseFailedKey = "failed"
)

var (
meter = global.MeterProvider().Meter("share_discovery")
)

type handlePeerResult string

type metrics struct {
peersAmount asyncint64.Gauge
discoveryResult syncint64.Counter // attributes: enough_peers[bool],is_canceled[bool]
discoveryStuck syncint64.Counter
handlePeerResult syncint64.Counter // attributes: result[string]
advertise syncint64.Counter // attributes: failed[bool]
peerAdded syncint64.Counter
peerRemoved syncint64.Counter
}

// WithMetrics turns on metric collection in discoery.
func (d *Discovery) WithMetrics() error {
metrics, err := initMetrics(d)
if err != nil {
return fmt.Errorf("discovery: init metrics: %w", err)
}
d.metrics = metrics
d.WithOnPeersUpdate(metrics.observeOnPeersUpdate)
return nil
}

func initMetrics(d *Discovery) (*metrics, error) {
peersAmount, err := meter.AsyncInt64().Gauge("discovery_amount_of_peers",
instrument.WithDescription("amount of peers in discovery set"))
if err != nil {
return nil, err
}

discoveryResult, err := meter.SyncInt64().Counter("discovery_find_peers_result",
instrument.WithDescription("result of find peers run"))
if err != nil {
return nil, err
}

discoveryStuck, err := meter.SyncInt64().Counter("discovery_lookup_is_stuck",
instrument.WithDescription("indicates discovery wasn't able to find peers for more than 1 min"))
if err != nil {
return nil, err
}

handlePeerResultCounter, err := meter.SyncInt64().Counter("discovery_handler_peer_result",
instrument.WithDescription("result handling found peer"))
if err != nil {
return nil, err
}

advertise, err := meter.SyncInt64().Counter("discovery_advertise_event",
instrument.WithDescription("advertise events counter"))
if err != nil {
return nil, err
}

peerAdded, err := meter.SyncInt64().Counter("discovery_add_peer",
instrument.WithDescription("add peer to discovery set counter"))
if err != nil {
return nil, err
}

peerRemoved, err := meter.SyncInt64().Counter("discovery_remove_peer",
instrument.WithDescription("remove peer from discovery set counter"))
if err != nil {
return nil, err
}

metrics := &metrics{
peersAmount: peersAmount,
discoveryResult: discoveryResult,
discoveryStuck: discoveryStuck,
handlePeerResult: handlePeerResultCounter,
advertise: advertise,
peerAdded: peerAdded,
peerRemoved: peerRemoved,
}

err = meter.RegisterCallback(
[]instrument.Asynchronous{
peersAmount,
},
func(ctx context.Context) {
peersAmount.Observe(ctx, int64(d.set.Size()))
},
)
if err != nil {
return nil, fmt.Errorf("registering metrics callback: %w", err)
}
return metrics, nil
}

func (m *metrics) observeFindPeers(ctx context.Context, canceled, isEnoughPeers bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.discoveryResult.Add(ctx, 1,
attribute.Bool(discoveryFindCancledKey, canceled),
attribute.Bool(discoveryEnougPeersKey, isEnoughPeers))
}

func (m *metrics) observeHandlePeer(ctx context.Context, result handlePeerResult) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.handlePeerResult.Add(ctx, 1,
attribute.String(handlePeerResultKey, string(result)))
}

func (m *metrics) observeAdvertise(ctx context.Context, err error) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.advertise.Add(ctx, 1,
attribute.Bool(advertiseFailedKey, err != nil))
}

func (m *metrics) observeOnPeersUpdate(_ peer.ID, isAdded bool) {
if m == nil {
return
}
ctx := context.Background()

if isAdded {
m.peerAdded.Add(ctx, 1)
return
}
m.peerRemoved.Add(ctx, 1)
}

func (m *metrics) observeDiscoveryStuck(ctx context.Context) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.discoveryStuck.Add(ctx, 1)
}

0 comments on commit 687704e

Please sign in to comment.