diff --git a/pkg/node/node.go b/pkg/node/node.go index 732aec4e412..81189a8f44c 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -652,6 +652,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, debugAPIService.MustRegisterMetrics(pingPong.Metrics()...) debugAPIService.MustRegisterMetrics(acc.Metrics()...) debugAPIService.MustRegisterMetrics(storer.Metrics()...) + debugAPIService.MustRegisterMetrics(kad.Metrics()...) if pullerService != nil { debugAPIService.MustRegisterMetrics(pullerService.Metrics()...) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 93eb292b149..023014f1018 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -23,7 +23,7 @@ import ( "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/topology" - "github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics" + im "github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics" "github.com/ethersphere/bee/pkg/topology/kademlia/internal/waitnext" "github.com/ethersphere/bee/pkg/topology/pslice" ma "github.com/multiformats/go-multiaddr" @@ -89,12 +89,13 @@ type Kad struct { logger logging.Logger // logger standalone bool // indicates whether the node is working in standalone mode bootnode bool // indicates whether the node is working in bootnode mode - collector *metrics.Collector + collector *im.Collector quit chan struct{} // quit channel halt chan struct{} // halt channel done chan struct{} // signal that `manage` has quit wg sync.WaitGroup waitNext *waitnext.WaitNext + metrics metrics } // New returns a new Kademlia. @@ -134,11 +135,12 @@ func New( logger: logger, standalone: o.StandaloneMode, bootnode: o.BootnodeMode, - collector: metrics.NewCollector(metricsDB), + collector: im.NewCollector(metricsDB), quit: make(chan struct{}), halt: make(chan struct{}), done: make(chan struct{}), wg: sync.WaitGroup{}, + metrics: newMetrics(), } if k.bitSuffixLength > 0 { @@ -235,7 +237,11 @@ type peerConnInfo struct { // connectBalanced attempts to connect to the balanced peers first. func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) { skipPeers := func(peer swarm.Address) bool { - return k.waitNext.Waiting(peer) + if k.waitNext.Waiting(peer) { + k.metrics.TotalBeforeExpireWaits.Inc() + return true + } + return false } for i := range k.commonBinPrefixes { @@ -309,6 +315,7 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon } if k.waitNext.Waiting(addr) { + k.metrics.TotalBeforeExpireWaits.Inc() return false, false, nil } @@ -371,7 +378,8 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, k.connectedPeers.Add(peer.addr) - k.collector.Record(peer.addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionOutbound)) + k.metrics.TotalOutboundConnections.Inc() + k.collector.Record(peer.addr, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound)) k.depthMu.Lock() k.depth = recalcDepth(k.connectedPeers, k.radius) @@ -398,6 +406,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, addr := peer.addr.String() if k.waitNext.Waiting(peer.addr) { + k.metrics.TotalBeforeExpireWaits.Inc() wg.Done() continue } @@ -450,8 +459,11 @@ func (k *Kad) manage() { case <-k.quit: return case <-time.After(30 * time.Second): + start := time.Now() if err := k.collector.Flush(); err != nil { k.logger.Debugf("kademlia: unable to flush metrics counters to the persistent store: %v", err) + } else { + k.metrics.InternalMetricsFlushTime.Observe(float64(time.Since(start).Nanoseconds())) } k.notifyManageLoop() case <-k.manageC: @@ -474,13 +486,24 @@ func (k *Kad) manage() { k.connectBalanced(&wg, peerConnChan) k.connectNeighbours(&wg, peerConnChan) wg.Wait() + + k.depthMu.Lock() + depth := k.depth + radius := k.radius + k.depthMu.Unlock() + k.logger.Tracef( "kademlia: connector took %s to finish: old depth %d; new depth %d", time.Since(start), oldDepth, - k.NeighborhoodDepth(), + depth, ) + k.metrics.CurrentDepth.Set(float64(depth)) + k.metrics.CurrentRadius.Set(float64(radius)) + k.metrics.CurrentlyKnownPeers.Set(float64(k.knownPeers.Length())) + k.metrics.CurrentlyConnectedPeers.Set(float64(k.connectedPeers.Length())) + if k.connectedPeers.Length() == 0 { select { case <-k.halt: @@ -527,6 +550,8 @@ func (k *Kad) connectBootnodes(ctx context.Context) { bzzAddress, err := k.p2p.Connect(ctx, addr) attempts++ + k.metrics.TotalBootNodesConnectionAttempts.Inc() + if err != nil { if errors.Is(err, p2p.ErrDialLightNode) { k.logger.Debugf("connect fail %s: %v", addr, err) @@ -654,6 +679,8 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ctx, cancel := context.WithTimeout(ctx, peerConnectionAttemptTimeout) defer cancel() + k.metrics.TotalOutboundConnectionAttempts.Inc() + switch i, err := k.p2p.Connect(ctx, ma); { case errors.Is(err, p2p.ErrDialLightNode): return errPruneEntry @@ -677,9 +704,10 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) failedAttempts++ } - k.collector.Record(peer, metrics.IncSessionConnectionRetry()) + k.metrics.TotalOutboundConnectionFailedAttempts.Inc() + k.collector.Record(peer, im.IncSessionConnectionRetry()) - k.collector.Inspect(peer, func(ss *metrics.Snapshot) { + k.collector.Inspect(peer, func(ss *im.Snapshot) { quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt() if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts { @@ -758,6 +786,7 @@ func (k *Kad) AddPeers(addrs ...swarm.Address) { } func (k *Kad) Pick(peer p2p.Peer) bool { + k.metrics.PickCalls.Inc() if k.bootnode { // shortcircuit for bootnode mode - always accept connections, // at least until we find a better solution. @@ -766,7 +795,11 @@ func (k *Kad) Pick(peer p2p.Peer) bool { po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes()) _, oversaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers) // pick the peer if we are not oversaturated - return !oversaturated + if !oversaturated { + return true + } + k.metrics.PickCallsFalse.Inc() + return false } // Connected is called when a peer has dialed in. @@ -805,7 +838,8 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error { k.knownPeers.Add(addr) k.connectedPeers.Add(addr) - k.collector.Record(addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionInbound)) + k.metrics.TotalInboundConnections.Inc() + k.collector.Record(addr, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionInbound)) k.waitNext.Remove(addr) @@ -827,7 +861,8 @@ func (k *Kad) Disconnected(peer p2p.Peer) { k.waitNext.SetTryAfter(peer.Address, time.Now().Add(timeToRetry)) - k.collector.Record(peer.Address, metrics.PeerLogOut(time.Now())) + k.metrics.TotalInboundDisconnections.Inc() + k.collector.Record(peer.Address, im.PeerLogOut(time.Now())) k.depthMu.Lock() k.depth = recalcDepth(k.connectedPeers, k.radius) @@ -1254,7 +1289,7 @@ func (k *Kad) randomPeer(bin uint8) (swarm.Address, error) { // createMetricsSnapshotView creates new topology.MetricSnapshotView from the // given metrics.Snapshot and rounds all the timestamps and durations to its // nearest second. -func createMetricsSnapshotView(ss *metrics.Snapshot) *topology.MetricSnapshotView { +func createMetricsSnapshotView(ss *im.Snapshot) *topology.MetricSnapshotView { if ss == nil { return nil } diff --git a/pkg/topology/kademlia/metrics.go b/pkg/topology/kademlia/metrics.go new file mode 100644 index 00000000000..d964e54e8a7 --- /dev/null +++ b/pkg/topology/kademlia/metrics.go @@ -0,0 +1,124 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package kademlia + +import ( + m "github.com/ethersphere/bee/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +// metrics groups kademlia related prometheus counters. +type metrics struct { + PickCalls prometheus.Counter + PickCallsFalse prometheus.Counter + CurrentDepth prometheus.Gauge + CurrentRadius prometheus.Gauge + CurrentlyKnownPeers prometheus.Gauge + CurrentlyConnectedPeers prometheus.Gauge + InternalMetricsFlushTime prometheus.Histogram + TotalBeforeExpireWaits prometheus.Counter + TotalInboundConnections prometheus.Counter + TotalInboundDisconnections prometheus.Counter + TotalOutboundConnections prometheus.Counter + TotalOutboundConnectionAttempts prometheus.Counter + TotalOutboundConnectionFailedAttempts prometheus.Counter + TotalBootNodesConnectionAttempts prometheus.Counter +} + +// newMetrics is a convenient constructor for creating new metrics. +func newMetrics() metrics { + const subsystem = "kademlia" + + return metrics{ + PickCalls: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "pick_calls", + Help: "The number of pick method call made.", + }), + PickCallsFalse: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "pick_calls_false", + Help: "The number of pick method call made which returned false.", + }), + CurrentDepth: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "current_depth", + Help: "The current value of depth.", + }), + CurrentRadius: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "current_radius", + Help: "The current value of radius.", + }), + CurrentlyKnownPeers: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "currently_known_peers", + Help: "Number of currently known peers", + }), + CurrentlyConnectedPeers: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "currently_connected_peers", + Help: "Number of currently connected peers", + }), + InternalMetricsFlushTime: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "internal_metrics_flush_time", + Help: "Time spent flushing the internal metrics about peers to the state-store", + }), + TotalBeforeExpireWaits: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_before_expire_waits", + Help: "Total before expire waits made.", + }), + TotalInboundConnections: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_inbound_connections", + Help: "Total inbound connections made.", + }), + TotalInboundDisconnections: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_inbound_disconnections", + Help: "Total inbound disconnections made.", + }), + TotalOutboundConnections: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_outbound_connections", + Help: "Total outbound connections made.", + }), + TotalOutboundConnectionAttempts: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_outbound_connection_attempts", + Help: "Total outbound connection attempts made.", + }), + TotalOutboundConnectionFailedAttempts: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_outbound_connection_failed_attempts", + Help: "Total outbound connection failed attempts made.", + }), + TotalBootNodesConnectionAttempts: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_bootnodes_connection_attempts", + Help: "Total boot-nodes connection attempts made.", + })} +} + +// Metrics returns set of prometheus collectors. +func (k *Kad) Metrics() []prometheus.Collector { + return m.PrometheusCollectorsFromFields(k.metrics) +}