diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index 8da9f90da6..bea3c78ad2 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -71,6 +71,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type, buildIn fx.Invoke(fraud.WithMetrics), fx.Invoke(node.WithMetrics), fx.Invoke(modheader.WithMetrics), + fx.Invoke(share.WithDiscoveryMetrics), ) samplingMetrics := fx.Options( diff --git a/nodebuilder/share/opts.go b/nodebuilder/share/opts.go index dc63a4ca1e..cba949882b 100644 --- a/nodebuilder/share/opts.go +++ b/nodebuilder/share/opts.go @@ -1,18 +1,25 @@ package share import ( + disc "github.com/celestiaorg/celestia-node/share/availability/discovery" "github.com/celestiaorg/celestia-node/share/getters" "github.com/celestiaorg/celestia-node/share/p2p/peers" "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" ) -// WithPeerManagerMetrics is a utility function that is expected to be -// "invoked" by the fx lifecycle. +// WithPeerManagerMetrics is a utility function that turns on peer manager metrics and is +// expected to be "invoked" by the fx lifecycle. func WithPeerManagerMetrics(m *peers.Manager) error { return m.WithMetrics() } +// WithDiscoveryMetrics is a utility function that turns on discovery metrics and is expected to +// be "invoked" by the fx lifecycle. 
+func WithDiscoveryMetrics(d *disc.Discovery) error { + return d.WithMetrics() +} + func WithShrexClientMetrics(edsClient *shrexeds.Client, ndClient *shrexnd.Client) error { err := edsClient.WithMetrics() if err != nil { diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go index 913b66d328..f1db6e0797 100644 --- a/share/availability/discovery/discovery.go +++ b/share/availability/discovery/discovery.go @@ -50,6 +50,8 @@ type Discovery struct { triggerDisc chan struct{} + metrics *metrics + cancel context.CancelFunc params Parameters @@ -132,6 +134,7 @@ func (d *Discovery) Advertise(ctx context.Context) { defer timer.Stop() for { _, err := d.disc.Advertise(ctx, rendezvousPoint) + d.metrics.observeAdvertise(ctx, err) if err != nil { if ctx.Err() != nil { return @@ -268,14 +271,18 @@ func (d *Discovery) discover(ctx context.Context) bool { drainChannel(ticker.C) select { case <-findCtx.Done(): + d.metrics.observeFindPeers(ctx, true, true) return true case <-ticker.C: + d.metrics.observeDiscoveryStuck(ctx) log.Warn("wasn't able to find new peers for long time") continue case p, ok := <-peers: if !ok { + isEnoughPeers := d.set.Size() >= d.set.Limit() + d.metrics.observeFindPeers(ctx, ctx.Err() != nil, isEnoughPeers) log.Debugw("discovery channel closed", "find_is_canceled", findCtx.Err() != nil) - return d.set.Size() >= d.set.Limit() + return isEnoughPeers } peer := p @@ -311,15 +318,19 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo logger := log.With("peer", peer.ID) switch { case peer.ID == d.host.ID(): + d.metrics.observeHandlePeer(ctx, handlePeerSkipSelf) logger.Debug("skip handle: self discovery") return false case len(peer.Addrs) == 0: + d.metrics.observeHandlePeer(ctx, handlePeerEmptyAddrs) logger.Debug("skip handle: empty address list") return false case d.set.Size() >= d.set.Limit(): + d.metrics.observeHandlePeer(ctx, handlePeerEnoughPeers) logger.Debug("skip handle: enough peers 
found") return false case d.connector.HasBackoff(peer.ID): + d.metrics.observeHandlePeer(ctx, handlePeerBackoff) logger.Debug("skip handle: backoff") return false } @@ -330,6 +341,7 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo case network.NotConnected: err := d.connector.Connect(ctx, peer) if err != nil { + d.metrics.observeHandlePeer(ctx, handlePeerConnErr) logger.Debugw("unable to connect", "err", err) return false } @@ -338,10 +350,12 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo } if !d.set.Add(peer.ID) { + d.metrics.observeHandlePeer(ctx, handlePeerInSet) logger.Debug("peer is already in discovery set") return false } d.onUpdatedPeers(peer.ID, true) + d.metrics.observeHandlePeer(ctx, handlePeerConnected) logger.Debug("added peer to set") // tag to protect peer from being killed by ConnManager diff --git a/share/availability/discovery/metrics.go b/share/availability/discovery/metrics.go new file mode 100644 index 0000000000..9edd364185 --- /dev/null +++ b/share/availability/discovery/metrics.go @@ -0,0 +1,184 @@ +package discovery + +import ( + "context" + "fmt" + + "github.com/libp2p/go-libp2p/core/peer" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/asyncint64" + "go.opentelemetry.io/otel/metric/instrument/syncint64" +) + +const ( + discoveryEnougPeersKey = "enough_peers" + discoveryFindCancledKey = "is_canceled" + + handlePeerResultKey = "result" + handlePeerSkipSelf handlePeerResult = "skip_self" + handlePeerEmptyAddrs handlePeerResult = "skip_empty_addresses" + handlePeerEnoughPeers handlePeerResult = "skip_enough_peers" + handlePeerBackoff handlePeerResult = "skip_backoff" + handlePeerConnected handlePeerResult = "connected" + handlePeerConnErr handlePeerResult = "conn_err" + handlePeerInSet handlePeerResult = "in_set" + + advertiseFailedKey = "failed" 
+) + +var ( + meter = global.MeterProvider().Meter("share_discovery") +) + +type handlePeerResult string + +type metrics struct { + peersAmount asyncint64.Gauge + discoveryResult syncint64.Counter // attributes: enough_peers[bool],is_canceled[bool] + discoveryStuck syncint64.Counter + handlePeerResult syncint64.Counter // attributes: result[string] + advertise syncint64.Counter // attributes: failed[bool] + peerAdded syncint64.Counter + peerRemoved syncint64.Counter +} + +// WithMetrics turns on metric collection in discovery. +func (d *Discovery) WithMetrics() error { + metrics, err := initMetrics(d) + if err != nil { + return fmt.Errorf("discovery: init metrics: %w", err) + } + d.metrics = metrics + d.WithOnPeersUpdate(metrics.observeOnPeersUpdate) + return nil +} + +func initMetrics(d *Discovery) (*metrics, error) { + peersAmount, err := meter.AsyncInt64().Gauge("discovery_amount_of_peers", + instrument.WithDescription("amount of peers in discovery set")) + if err != nil { + return nil, err + } + + discoveryResult, err := meter.SyncInt64().Counter("discovery_find_peers_result", + instrument.WithDescription("result of find peers run")) + if err != nil { + return nil, err + } + + discoveryStuck, err := meter.SyncInt64().Counter("discovery_lookup_is_stuck", + instrument.WithDescription("indicates discovery wasn't able to find peers for more than 1 min")) + if err != nil { + return nil, err + } + + handlePeerResultCounter, err := meter.SyncInt64().Counter("discovery_handler_peer_result", + instrument.WithDescription("result handling found peer")) + if err != nil { + return nil, err + } + + advertise, err := meter.SyncInt64().Counter("discovery_advertise_event", + instrument.WithDescription("advertise events counter")) + if err != nil { + return nil, err + } + + peerAdded, err := meter.SyncInt64().Counter("discovery_add_peer", + instrument.WithDescription("add peer to discovery set counter")) + if err != nil { + return nil, err + } + + peerRemoved, err := 
meter.SyncInt64().Counter("discovery_remove_peer", + instrument.WithDescription("remove peer from discovery set counter")) + if err != nil { + return nil, err + } + + metrics := &metrics{ + peersAmount: peersAmount, + discoveryResult: discoveryResult, + discoveryStuck: discoveryStuck, + handlePeerResult: handlePeerResultCounter, + advertise: advertise, + peerAdded: peerAdded, + peerRemoved: peerRemoved, + } + + err = meter.RegisterCallback( + []instrument.Asynchronous{ + peersAmount, + }, + func(ctx context.Context) { + peersAmount.Observe(ctx, int64(d.set.Size())) + }, + ) + if err != nil { + return nil, fmt.Errorf("registering metrics callback: %w", err) + } + return metrics, nil +} + +func (m *metrics) observeFindPeers(ctx context.Context, canceled, isEnoughPeers bool) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + m.discoveryResult.Add(ctx, 1, + attribute.Bool(discoveryFindCancledKey, canceled), + attribute.Bool(discoveryEnougPeersKey, isEnoughPeers)) +} + +func (m *metrics) observeHandlePeer(ctx context.Context, result handlePeerResult) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + m.handlePeerResult.Add(ctx, 1, + attribute.String(handlePeerResultKey, string(result))) +} + +func (m *metrics) observeAdvertise(ctx context.Context, err error) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + m.advertise.Add(ctx, 1, + attribute.Bool(advertiseFailedKey, err != nil)) +} + +func (m *metrics) observeOnPeersUpdate(_ peer.ID, isAdded bool) { + if m == nil { + return + } + ctx := context.Background() + + if isAdded { + m.peerAdded.Add(ctx, 1) + return + } + m.peerRemoved.Add(ctx, 1) +} + +func (m *metrics) observeDiscoveryStuck(ctx context.Context) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + m.discoveryStuck.Add(ctx, 1) +}