Skip to content

Commit

Permalink
feat(kademlia): add prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Mrekaj committed Jun 3, 2021
1 parent 3e60bfc commit 1e0f14b
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 12 deletions.
1 change: 1 addition & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
debugAPIService.MustRegisterMetrics(acc.Metrics()...)
debugAPIService.MustRegisterMetrics(storer.Metrics()...)
debugAPIService.MustRegisterMetrics(kad.Metrics()...)

if pullerService != nil {
debugAPIService.MustRegisterMetrics(pullerService.Metrics()...)
Expand Down
60 changes: 48 additions & 12 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics"
im "github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics"
"github.com/ethersphere/bee/pkg/topology/kademlia/internal/waitnext"
"github.com/ethersphere/bee/pkg/topology/pslice"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -89,12 +89,13 @@ type Kad struct {
logger logging.Logger // logger
standalone bool // indicates whether the node is working in standalone mode
bootnode bool // indicates whether the node is working in bootnode mode
collector *metrics.Collector
collector *im.Collector
quit chan struct{} // quit channel
halt chan struct{} // halt channel
done chan struct{} // signal that `manage` has quit
wg sync.WaitGroup
waitNext *waitnext.WaitNext
metrics metrics
}

// New returns a new Kademlia.
Expand Down Expand Up @@ -134,11 +135,12 @@ func New(
logger: logger,
standalone: o.StandaloneMode,
bootnode: o.BootnodeMode,
collector: metrics.NewCollector(metricsDB),
collector: im.NewCollector(metricsDB),
quit: make(chan struct{}),
halt: make(chan struct{}),
done: make(chan struct{}),
wg: sync.WaitGroup{},
metrics: newMetrics(),
}

if k.bitSuffixLength > 0 {
Expand Down Expand Up @@ -235,7 +237,11 @@ type peerConnInfo struct {
// connectBalanced attempts to connect to the balanced peers first.
func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) {
skipPeers := func(peer swarm.Address) bool {
return k.waitNext.Waiting(peer)
if k.waitNext.Waiting(peer) {
k.metrics.TotalBeforeExpireWaits.Inc()
return true
}
return false
}

for i := range k.commonBinPrefixes {
Expand Down Expand Up @@ -309,6 +315,7 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon
}

if k.waitNext.Waiting(addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
return false, false, nil
}

Expand Down Expand Up @@ -371,7 +378,8 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,

k.connectedPeers.Add(peer.addr)

k.collector.Record(peer.addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionOutbound))
k.metrics.TotalOutboundConnections.Inc()
k.collector.Record(peer.addr, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound))

k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers, k.radius)
Expand All @@ -398,6 +406,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
addr := peer.addr.String()

if k.waitNext.Waiting(peer.addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
wg.Done()
continue
}
Expand Down Expand Up @@ -450,8 +459,12 @@ func (k *Kad) manage() {
case <-k.quit:
return
case <-time.After(30 * time.Second):
start := time.Now()
if err := k.collector.Flush(); err != nil {
k.metrics.InternalMetricsFlushTotalErrors.Inc()
k.logger.Debugf("kademlia: unable to flush metrics counters to the persistent store: %v", err)
} else {
k.metrics.InternalMetricsFlushTime.Observe(float64(time.Since(start).Nanoseconds()))
}
k.notifyManageLoop()
case <-k.manageC:
Expand All @@ -474,13 +487,24 @@ func (k *Kad) manage() {
k.connectBalanced(&wg, peerConnChan)
k.connectNeighbours(&wg, peerConnChan)
wg.Wait()

k.depthMu.Lock()
depth := k.depth
radius := k.radius
k.depthMu.Unlock()

k.logger.Tracef(
"kademlia: connector took %s to finish: old depth %d; new depth %d",
time.Since(start),
oldDepth,
k.NeighborhoodDepth(),
depth,
)

k.metrics.CurrentDepth.Set(float64(depth))
k.metrics.CurrentRadius.Set(float64(radius))
k.metrics.CurrentlyKnownPeers.Set(float64(k.knownPeers.Length()))
k.metrics.CurrentlyConnectedPeers.Set(float64(k.connectedPeers.Length()))

if k.connectedPeers.Length() == 0 {
select {
case <-k.halt:
Expand Down Expand Up @@ -527,6 +551,8 @@ func (k *Kad) connectBootnodes(ctx context.Context) {
bzzAddress, err := k.p2p.Connect(ctx, addr)

attempts++
k.metrics.TotalBootNodesConnectionAttempts.Inc()

if err != nil {
if errors.Is(err, p2p.ErrDialLightNode) {
k.logger.Debugf("connect fail %s: %v", addr, err)
Expand Down Expand Up @@ -654,6 +680,8 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
ctx, cancel := context.WithTimeout(ctx, peerConnectionAttemptTimeout)
defer cancel()

k.metrics.TotalOutboundConnectionAttempts.Inc()

switch i, err := k.p2p.Connect(ctx, ma); {
case errors.Is(err, p2p.ErrDialLightNode):
return errPruneEntry
Expand All @@ -677,9 +705,10 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
failedAttempts++
}

k.collector.Record(peer, metrics.IncSessionConnectionRetry())
k.metrics.TotalOutboundConnectionFailedAttempts.Inc()
k.collector.Record(peer, im.IncSessionConnectionRetry())

k.collector.Inspect(peer, func(ss *metrics.Snapshot) {
k.collector.Inspect(peer, func(ss *im.Snapshot) {
quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt()

if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
Expand Down Expand Up @@ -758,6 +787,7 @@ func (k *Kad) AddPeers(addrs ...swarm.Address) {
}

func (k *Kad) Pick(peer p2p.Peer) bool {
k.metrics.PickCalls.Inc()
if k.bootnode {
// shortcircuit for bootnode mode - always accept connections,
// at least until we find a better solution.
Expand All @@ -766,7 +796,11 @@ func (k *Kad) Pick(peer p2p.Peer) bool {
po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes())
_, oversaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers)
// pick the peer if we are not oversaturated
return !oversaturated
if !oversaturated {
return true
}
k.metrics.PickCallsFalse.Inc()
return false
}

// Connected is called when a peer has dialed in.
Expand Down Expand Up @@ -805,7 +839,8 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
k.knownPeers.Add(addr)
k.connectedPeers.Add(addr)

k.collector.Record(addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionInbound))
k.metrics.TotalInboundConnections.Inc()
k.collector.Record(addr, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionInbound))

k.waitNext.Remove(addr)

Expand All @@ -827,7 +862,8 @@ func (k *Kad) Disconnected(peer p2p.Peer) {

k.waitNext.SetTryAfter(peer.Address, time.Now().Add(timeToRetry))

k.collector.Record(peer.Address, metrics.PeerLogOut(time.Now()))
k.metrics.TotalInboundDisconnections.Inc()
k.collector.Record(peer.Address, im.PeerLogOut(time.Now()))

k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers, k.radius)
Expand Down Expand Up @@ -1254,7 +1290,7 @@ func (k *Kad) randomPeer(bin uint8) (swarm.Address, error) {
// createMetricsSnapshotView creates a new topology.MetricSnapshotView from the
// given im.Snapshot and rounds all of its timestamps and durations to the
// nearest second.
func createMetricsSnapshotView(ss *metrics.Snapshot) *topology.MetricSnapshotView {
func createMetricsSnapshotView(ss *im.Snapshot) *topology.MetricSnapshotView {
if ss == nil {
return nil
}
Expand Down
131 changes: 131 additions & 0 deletions pkg/topology/kademlia/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package kademlia

import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

// metrics groups the kademlia related prometheus collectors: counters,
// gauges and one histogram. The fields are exported so that the whole
// struct can be handed to m.PrometheusCollectorsFromFields (see Kad.Metrics).
type metrics struct {
// PickCalls is incremented on every call of Kad.Pick; PickCallsFalse is
// incremented only when Pick rejects the peer (returns false).
PickCalls prometheus.Counter
PickCallsFalse prometheus.Counter
// The four gauges below are refreshed by the manage loop after each
// connector run.
CurrentDepth prometheus.Gauge
CurrentRadius prometheus.Gauge
CurrentlyKnownPeers prometheus.Gauge
CurrentlyConnectedPeers prometheus.Gauge
// InternalMetricsFlushTime observes how long a successful flush of the
// internal metrics collector took; the manage loop records the value in
// nanoseconds. Failed flushes are counted by
// InternalMetricsFlushTotalErrors instead.
InternalMetricsFlushTime prometheus.Histogram
InternalMetricsFlushTotalErrors prometheus.Counter
// TotalBeforeExpireWaits counts the peers skipped because waitNext
// reported that they are still waiting for their next connection attempt.
TotalBeforeExpireWaits prometheus.Counter
// TotalInboundConnections is incremented when a dialed-in peer is added
// to the connected peers.
TotalInboundConnections prometheus.Counter
// TotalInboundDisconnections is incremented from Kad.Disconnected.
TotalInboundDisconnections prometheus.Counter
// TotalOutboundConnections counts successfully established outbound
// connections, TotalOutboundConnectionAttempts every dial made by
// Kad.connect and TotalOutboundConnectionFailedAttempts the dials that
// ended up being recorded as a session connection retry.
TotalOutboundConnections prometheus.Counter
TotalOutboundConnectionAttempts prometheus.Counter
TotalOutboundConnectionFailedAttempts prometheus.Counter
// TotalBootNodesConnectionAttempts is incremented for every dial made
// while connecting to the bootnodes.
TotalBootNodesConnectionAttempts prometheus.Counter
}

// newMetrics constructs the complete set of kademlia prometheus collectors.
//
// Every collector is created under the m.Namespace namespace and the
// "kademlia" subsystem. The collectors are only created here; they are
// handed out for registration through Kad.Metrics.
func newMetrics() metrics {
	const subsystem = "kademlia"

	// counter builds a kademlia counter with the given name and help text.
	counter := func(name, help string) prometheus.Counter {
		return prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      name,
			Help:      help,
		})
	}

	// gauge builds a kademlia gauge with the given name and help text.
	gauge := func(name, help string) prometheus.Gauge {
		return prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      name,
			Help:      help,
		})
	}

	return metrics{
		PickCalls: counter(
			"pick_calls",
			"The number of pick method call made.",
		),
		PickCallsFalse: counter(
			"pick_calls_false",
			"The number of pick method call made which returned false.",
		),
		CurrentDepth: gauge(
			"current_depth",
			"The current value of depth.",
		),
		CurrentRadius: gauge(
			"current_radius",
			"The current value of radius.",
		),
		CurrentlyKnownPeers: gauge(
			"currently_known_peers",
			"Number of currently known peers.",
		),
		CurrentlyConnectedPeers: gauge(
			"currently_connected_peers",
			"Number of currently connected peers.",
		),
		InternalMetricsFlushTime: prometheus.NewHistogram(prometheus.HistogramOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "internal_metrics_flush_time",
			Help:      "The time spent flushing the internal metrics about peers to the state-store.",
			// The manage loop observes the flush duration in nanoseconds.
			// The library's default buckets top out at 10, so with them
			// every sample would fall into the +Inf bucket. Use 14
			// exponential buckets from 1ms (1e6ns) up to ~8.2s instead.
			Buckets: prometheus.ExponentialBuckets(1e6, 2, 14),
		}),
		InternalMetricsFlushTotalErrors: counter(
			"internal_metrics_flush_total_errors",
			"Number of total errors occurred during flushing the internal metrics to the state-store.",
		),
		TotalBeforeExpireWaits: counter(
			"total_before_expire_waits",
			"Total before expire waits made.",
		),
		TotalInboundConnections: counter(
			"total_inbound_connections",
			"Total inbound connections made.",
		),
		TotalInboundDisconnections: counter(
			"total_inbound_disconnections",
			"Total inbound disconnections made.",
		),
		TotalOutboundConnections: counter(
			"total_outbound_connections",
			"Total outbound connections made.",
		),
		TotalOutboundConnectionAttempts: counter(
			"total_outbound_connection_attempts",
			"Total outbound connection attempts made.",
		),
		TotalOutboundConnectionFailedAttempts: counter(
			"total_outbound_connection_failed_attempts",
			"Total outbound connection failed attempts made.",
		),
		TotalBootNodesConnectionAttempts: counter(
			"total_bootnodes_connection_attempts",
			"Total boot-nodes connection attempts made.",
		),
	}
}

// Metrics returns the prometheus collectors that
// m.PrometheusCollectorsFromFields derives from the kademlia metrics
// struct held by k, ready to be registered by the caller.
func (k *Kad) Metrics() []prometheus.Collector {
	collectors := m.PrometheusCollectorsFromFields(k.metrics)
	return collectors
}

0 comments on commit 1e0f14b

Please sign in to comment.