Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share/discovery): add discovery metrics #2155

Merged
merged 63 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
3e5abdb
prog
distractedm1nd Apr 20, 2023
e3b8595
only adding connected peers
distractedm1nd Apr 21, 2023
1f9a052
improve logic a bit
Wondertan Apr 21, 2023
f058a21
add connecting map
Wondertan Apr 21, 2023
48732ff
cleanup the connecting map
Wondertan Apr 21, 2023
3f3c287
limit connections
Wondertan Apr 21, 2023
0148dd2
dialTimeout -> connectTimeout
Wondertan Apr 21, 2023
23b590e
unlock on error
Wondertan Apr 24, 2023
9943c17
make connect timeout a constatnt
Wondertan Apr 24, 2023
e58c3d5
discovery parameters
Wondertan Apr 26, 2023
c717917
actually break the config, refine defaults and allow disabling discov…
Wondertan Apr 26, 2023
ab03470
fixing log and peer limit deactivation comment
distractedm1nd Apr 27, 2023
7cf7287
removing storage of ctx on Discovery
distractedm1nd Apr 27, 2023
86a8a06
notify subscribers directly after adding
distractedm1nd Apr 27, 2023
14adb1c
restarting FindPeers until limit is reached
distractedm1nd Apr 27, 2023
8b8941f
respecting ctx in findPeers loop
distractedm1nd Apr 27, 2023
c40f68e
start discovery asap
Wondertan Apr 26, 2023
4917621
case for already connected peer
Wondertan Apr 27, 2023
ca3a4be
protect rather than tag
Wondertan Apr 27, 2023
f7c413a
first working iteration for tests
Wondertan Apr 27, 2023
c5e0d00
set and backoff audit + remove retrying for FindPeers
Wondertan Apr 27, 2023
c382f16
the least flaky version of the test; 2/100 fails
Wondertan Apr 27, 2023
d96ade7
- stop wait group when enough peers found
walldiss Apr 28, 2023
2005330
log Error if Discovery is unable to find new peers for long time
walldiss Apr 28, 2023
37a159e
rerun findPeers if previous run stopped too early
walldiss Apr 28, 2023
82eb331
- do not connect to already connected peers
walldiss Apr 28, 2023
36deab8
improve logs
walldiss Apr 28, 2023
d83f2ef
increase time
Wondertan Apr 27, 2023
c154073
another attempt to deflake the test
Wondertan Apr 28, 2023
cf20647
various fixes
Wondertan Apr 28, 2023
d139979
update backoff logic to deflake test
Wondertan Apr 28, 2023
72548fb
more fixes
Wondertan Apr 28, 2023
2869a45
decrease backoff timeout
Wondertan Apr 28, 2023
c723d5d
drain ticker and timer channel before reset
walldiss Apr 28, 2023
439580b
unflake the test by removing unnecessary stop of the timer
Wondertan Apr 30, 2023
a4d2b27
log as found only valid peers
Wondertan May 2, 2023
a0b0c1c
add metrics for discovery
walldiss May 2, 2023
dd8abcf
fix shadow read for add peers to set
walldiss May 2, 2023
32050eb
another backoff cleanup
Wondertan May 2, 2023
18b4eb6
Merge branch 'discovery-dial-timeout' into discovery_metrics
walldiss May 2, 2023
de865a9
small fix
Wondertan May 2, 2023
0b56b79
add delay for findpeers retry
walldiss May 2, 2023
0796147
make findPeers fast retry delay a var to modify it in tests
walldiss May 2, 2023
8eed668
fix comment
Wondertan May 2, 2023
38d97e9
notify when we actually need the discovery, instead of periodic check…
Wondertan May 2, 2023
c8fb086
fix comment
Wondertan May 2, 2023
6cb5cdf
simplify the code further
Wondertan May 2, 2023
0ebf3af
rework tests to use real swarm. Mocknet has some bug regarding connec…
Wondertan May 2, 2023
3ba19e9
ensurePeers should depend on global state rather than local
Wondertan May 2, 2023
d8b85d6
int back to uint
Wondertan May 2, 2023
8e20342
apply review suggestions
walldiss May 3, 2023
2ebbf3c
Merge branch 'discovery-dial-timeout' into discovery_metrics
walldiss May 3, 2023
0b56f67
rebase
walldiss May 3, 2023
e88f671
Merge branch 'main-celestia' into discovery_metrics
walldiss May 3, 2023
1512b10
rebase
walldiss May 3, 2023
d498cf3
rebase
walldiss May 3, 2023
7dc8e98
Merge branch 'main-celestia' into discovery_metrics
walldiss May 3, 2023
6cb5cf9
rebase
walldiss May 3, 2023
6bfcb8a
- apply review suggestions
walldiss May 3, 2023
894ee4e
use user context for metrics observation
walldiss May 9, 2023
f722beb
Merge branch 'main-celestia' into discovery_metrics
walldiss May 9, 2023
bff848e
Merge branch 'main' into discovery_metrics
walldiss May 10, 2023
d170d2b
Merge branch 'main-celestia' into discovery_metrics
walldiss May 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type, buildIn
fx.Invoke(fraud.WithMetrics),
fx.Invoke(node.WithMetrics),
fx.Invoke(modheader.WithMetrics),
fx.Invoke(share.WithDiscoveryMetrics),
)

samplingMetrics := fx.Options(
Expand Down
11 changes: 9 additions & 2 deletions nodebuilder/share/opts.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package share

import (
disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
)

// WithPeerManagerMetrics is a utility function that is expected to be
// "invoked" by the fx lifecycle.
// WithPeerManagerMetrics is a utility function to turn on peer manager metrics and that is
// expected to be "invoked" by the fx lifecycle.
func WithPeerManagerMetrics(m *peers.Manager) error {
return m.WithMetrics()
}

// WithDiscoveryMetrics is a utility function to turn on discovery metrics and that is expected to
// be "invoked" by the fx lifecycle.
func WithDiscoveryMetrics(d *disc.Discovery) error {
return d.WithMetrics()
}

func WithShrexClientMetrics(edsClient *shrexeds.Client, ndClient *shrexnd.Client) error {
err := edsClient.WithMetrics()
if err != nil {
Expand Down
16 changes: 15 additions & 1 deletion share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Discovery struct {

triggerDisc chan struct{}

metrics *metrics

cancel context.CancelFunc

params Parameters
Expand Down Expand Up @@ -132,6 +134,7 @@ func (d *Discovery) Advertise(ctx context.Context) {
defer timer.Stop()
for {
_, err := d.disc.Advertise(ctx, rendezvousPoint)
d.metrics.observeAdvertise(ctx, err)
if err != nil {
if ctx.Err() != nil {
return
Expand Down Expand Up @@ -265,14 +268,18 @@ func (d *Discovery) discover(ctx context.Context) bool {
drainChannel(ticker.C)
select {
case <-findCtx.Done():
d.metrics.observeFindPeers(ctx, true, true)
return true
case <-ticker.C:
walldiss marked this conversation as resolved.
Show resolved Hide resolved
d.metrics.observeDiscoveryStuck(ctx)
log.Warn("wasn't able to find new peers for long time")
continue
case p, ok := <-peers:
if !ok {
isEnoughPeers := d.set.Size() >= d.set.Limit()
d.metrics.observeFindPeers(ctx, ctx.Err() != nil, isEnoughPeers)
log.Debugw("discovery channel closed", "find_is_canceled", findCtx.Err() != nil)
return d.set.Size() >= d.set.Limit()
return isEnoughPeers
}

peer := p
Expand Down Expand Up @@ -308,15 +315,19 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo
logger := log.With("peer", peer.ID)
switch {
case peer.ID == d.host.ID():
d.metrics.observeHandlePeer(ctx, handlePeerSkipSelf)
logger.Debug("skip handle: self discovery")
return false
case len(peer.Addrs) == 0:
d.metrics.observeHandlePeer(ctx, handlePeerEmptyAddrs)
logger.Debug("skip handle: empty address list")
return false
case d.set.Size() >= d.set.Limit():
d.metrics.observeHandlePeer(ctx, handlePeerEnoughPeers)
logger.Debug("skip handle: enough peers found")
return false
case d.connector.HasBackoff(peer.ID):
d.metrics.observeHandlePeer(ctx, handlePeerBackoff)
logger.Debug("skip handle: backoff")
return false
}
Expand All @@ -327,6 +338,7 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo
case network.NotConnected:
err := d.connector.Connect(ctx, peer)
if err != nil {
d.metrics.observeHandlePeer(ctx, handlePeerConnErr)
logger.Debugw("unable to connect", "err", err)
return false
}
Expand All @@ -335,10 +347,12 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo
}

if !d.set.Add(peer.ID) {
d.metrics.observeHandlePeer(ctx, handlePeerInSet)
logger.Debug("peer is already in discovery set")
return false
}
d.onUpdatedPeers(peer.ID, true)
d.metrics.observeHandlePeer(ctx, handlePeerConnected)
logger.Debug("added peer to set")

// tag to protect peer from being killed by ConnManager
Expand Down
184 changes: 184 additions & 0 deletions share/availability/discovery/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package discovery

import (
"context"
"fmt"

"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)

const (
discoveryEnougPeersKey = "enough_peers"
discoveryFindCancledKey = "is_canceled"

handlePeerResultKey = "result"
handlePeerSkipSelf handlePeerResult = "skip_self"
handlePeerEmptyAddrs handlePeerResult = "skip_empty_addresses"
handlePeerEnoughPeers handlePeerResult = "skip_enough_peers"
handlePeerBackoff handlePeerResult = "skip_backoff"
handlePeerConnected handlePeerResult = "connected"
handlePeerConnErr handlePeerResult = "conn_err"
handlePeerInSet handlePeerResult = "in_set"

advertiseFailedKey = "failed"
)

var (
meter = global.MeterProvider().Meter("share_discovery")
)

type handlePeerResult string

type metrics struct {
peersAmount asyncint64.Gauge
discoveryResult syncint64.Counter // attributes: enough_peers[bool],is_canceled[bool]
discoveryStuck syncint64.Counter
handlePeerResult syncint64.Counter // attributes: result[string]
advertise syncint64.Counter // attributes: failed[bool]
peerAdded syncint64.Counter
peerRemoved syncint64.Counter
}

// WithMetrics turns on metric collection in discoery.
func (d *Discovery) WithMetrics() error {
metrics, err := initMetrics(d)
if err != nil {
return fmt.Errorf("discovery: init metrics: %w", err)
}
d.metrics = metrics
d.WithOnPeersUpdate(metrics.observeOnPeersUpdate)
return nil
}

func initMetrics(d *Discovery) (*metrics, error) {
peersAmount, err := meter.AsyncInt64().Gauge("discovery_amount_of_peers",
instrument.WithDescription("amount of peers in discovery set"))
if err != nil {
return nil, err
}

discoveryResult, err := meter.SyncInt64().Counter("discovery_find_peers_result",
instrument.WithDescription("result of find peers run"))
if err != nil {
return nil, err
}

discoveryStuck, err := meter.SyncInt64().Counter("discovery_lookup_is_stuck",
instrument.WithDescription("indicates discovery wasn't able to find peers for more than 1 min"))
if err != nil {
return nil, err
}

handlePeerResultCounter, err := meter.SyncInt64().Counter("discovery_handler_peer_result",
instrument.WithDescription("result handling found peer"))
if err != nil {
return nil, err
}

advertise, err := meter.SyncInt64().Counter("discovery_advertise_event",
instrument.WithDescription("advertise events counter"))
if err != nil {
return nil, err
}

peerAdded, err := meter.SyncInt64().Counter("discovery_add_peer",
instrument.WithDescription("add peer to discovery set counter"))
if err != nil {
return nil, err
}

peerRemoved, err := meter.SyncInt64().Counter("discovery_remove_peer",
instrument.WithDescription("remove peer from discovery set counter"))
if err != nil {
return nil, err
}

metrics := &metrics{
peersAmount: peersAmount,
discoveryResult: discoveryResult,
discoveryStuck: discoveryStuck,
handlePeerResult: handlePeerResultCounter,
advertise: advertise,
peerAdded: peerAdded,
peerRemoved: peerRemoved,
}

err = meter.RegisterCallback(
[]instrument.Asynchronous{
peersAmount,
},
func(ctx context.Context) {
peersAmount.Observe(ctx, int64(d.set.Size()))
},
)
if err != nil {
return nil, fmt.Errorf("registering metrics callback: %w", err)
}
return metrics, nil
}

func (m *metrics) observeFindPeers(ctx context.Context, canceled, isEnoughPeers bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.discoveryResult.Add(ctx, 1,
attribute.Bool(discoveryFindCancledKey, canceled),
attribute.Bool(discoveryEnougPeersKey, isEnoughPeers))
}

func (m *metrics) observeHandlePeer(ctx context.Context, result handlePeerResult) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.handlePeerResult.Add(ctx, 1,
attribute.String(handlePeerResultKey, string(result)))
}

func (m *metrics) observeAdvertise(ctx context.Context, err error) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.advertise.Add(ctx, 1,
attribute.Bool(advertiseFailedKey, err != nil))
}

func (m *metrics) observeOnPeersUpdate(_ peer.ID, isAdded bool) {
if m == nil {
return
}
ctx := context.Background()

if isAdded {
m.peerAdded.Add(ctx, 1)
return
}
m.peerRemoved.Add(ctx, 1)
}

func (m *metrics) observeDiscoveryStuck(ctx context.Context) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.discoveryStuck.Add(ctx, 1)
}
3 changes: 2 additions & 1 deletion share/p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type Metrics struct {
totalRequestCounter syncint64.Counter
}

// ObserveRequests increments the total number of requests sent with the given status as an attribute.
// ObserveRequests increments the total number of requests sent with the given status as an
// attribute.
func (m *Metrics) ObserveRequests(count int64, status status) {
if m == nil {
return
Expand Down
Loading