Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metrics): Unregister callback on stop for several metrics implementations #3281

Merged
merged 7 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ func (d *DASer) Stop(ctx context.Context) error {
}

d.cancel()

if err := d.sampler.metrics.close(); err != nil {
log.Warnw("closing metrics", "err", err)
}

if err = d.sampler.wait(ctx); err != nil {
return fmt.Errorf("DASer force quit: %w", err)
}
Expand Down
11 changes: 10 additions & 1 deletion das/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type metrics struct {
newHead metric.Int64Counter

lastSampledTS uint64

clientReg metric.Registration
}

func (d *DASer) InitMetrics() error {
Expand Down Expand Up @@ -119,7 +121,7 @@ func (d *DASer) InitMetrics() error {
return nil
}

_, err = meter.RegisterCallback(callback,
d.sampler.metrics.clientReg, err = meter.RegisterCallback(callback,
lastSampledTS,
busyWorkers,
networkHead,
Expand All @@ -133,6 +135,13 @@ func (d *DASer) InitMetrics() error {
return nil
}

func (m *metrics) close() error {
if m == nil {
return nil
}
return m.clientReg.Unregister()
}

// observeSample records the time it took to sample a header +
// the amount of sampled contiguous headers
func (m *metrics) observeSample(
Expand Down
21 changes: 18 additions & 3 deletions nodebuilder/node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import (
"context"
"time"

logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/fx"
)

var log = logging.Logger("module/node")

var meter = otel.Meter("node")

var (
Expand All @@ -17,7 +21,7 @@ var (
)

// WithMetrics registers node metrics.
func WithMetrics() error {
func WithMetrics(lc fx.Lifecycle) error {
nodeStartTS, err := meter.Int64ObservableGauge(
"node_start_ts",
metric.WithDescription("timestamp when the node was started"),
Expand Down Expand Up @@ -66,7 +70,18 @@ func WithMetrics() error {
return nil
}

_, err = meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge)
clientReg, err := meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge)
if err != nil {
return nil
}

return err
lc.Append(
fx.Hook{OnStop: func(context.Context) error {
if err := clientReg.Unregister(); err != nil {
log.Warn("failed to close metrics", "err", err)
}
return nil
}},
)
return nil
}
4 changes: 2 additions & 2 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
baseComponents := fx.Options(
fx.Supply(metricOpts),
fx.Invoke(initializeMetrics),
fx.Invoke(func(ca *state.CoreAccessor) {
fx.Invoke(func(lc fx.Lifecycle, ca *state.CoreAccessor) {
if ca == nil {
return
}
state.WithMetrics(ca)
state.WithMetrics(lc, ca)
}),
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
fx.Invoke(node.WithMetrics),
Expand Down
7 changes: 5 additions & 2 deletions share/eds/cache/accessor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,13 @@ func (bc *AccessorCache) Remove(key shard.Key) error {
}

// EnableMetrics enables metrics for the cache.
func (bc *AccessorCache) EnableMetrics() error {
func (bc *AccessorCache) EnableMetrics() (CloseMetricsFn, error) {
var err error
bc.metrics, err = newMetrics(bc)
return err
if err != nil {
return nil, err
}
return bc.metrics.close, err
}

// refCloser manages references to accessor from provided reader and removes the ref, when the
Expand Down
4 changes: 3 additions & 1 deletion share/eds/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ var (
errCacheMiss = errors.New("accessor not found in blockstore cache")
)

type CloseMetricsFn func() error

// Cache is an interface that defines the basic Cache operations.
type Cache interface {
// Get retrieves an item from the Cache.
Expand All @@ -37,7 +39,7 @@ type Cache interface {
Remove(shard.Key) error

// EnableMetrics enables metrics in Cache
EnableMetrics() error
EnableMetrics() (CloseMetricsFn, error)
}

// Accessor is a interface type returned by cache, that allows to read raw data by reader or create
Expand Down
19 changes: 15 additions & 4 deletions share/eds/cache/doublecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,20 @@ func (mc *DoubleCache) Second() Cache {
return mc.second
}

func (mc *DoubleCache) EnableMetrics() error {
if err := mc.first.EnableMetrics(); err != nil {
return err
func (mc *DoubleCache) EnableMetrics() (CloseMetricsFn, error) {
firstCloser, err := mc.first.EnableMetrics()
if err != nil {
return nil, err
}
return mc.second.EnableMetrics()
secondCloser, err := mc.second.EnableMetrics()
if err != nil {
return nil, err
}

return func() error {
if err := errors.Join(firstCloser(), secondCloser()); err != nil {
log.Warnw("failed to close metrics", "err", err)
}
return nil
}, nil
}
17 changes: 15 additions & 2 deletions share/eds/cache/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
type metrics struct {
getCounter metric.Int64Counter
evictedCounter metric.Int64Counter

clientReg metric.Registration
}

func newMetrics(bc *AccessorCache) (*metrics, error) {
Expand Down Expand Up @@ -43,12 +45,23 @@ func newMetrics(bc *AccessorCache) (*metrics, error) {
observer.ObserveInt64(cacheSize, int64(bc.cache.Len()))
return nil
}
_, err = meter.RegisterCallback(callback, cacheSize)
clientReg, err := meter.RegisterCallback(callback, cacheSize)
if err != nil {
return nil, err
}

return &metrics{
getCounter: getCounter,
evictedCounter: evictedCounter,
}, err
clientReg: clientReg,
}, nil
}

func (m *metrics) close() error {
if m == nil {
return nil
}
return m.clientReg.Unregister()
}

func (m *metrics) observeEvicted(failed bool) {
Expand Down
4 changes: 2 additions & 2 deletions share/eds/cache/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (n NoopCache) Remove(shard.Key) error {
return nil
}

func (n NoopCache) EnableMetrics() error {
return nil
func (n NoopCache) EnableMetrics() (CloseMetricsFn, error) {
return func() error { return nil }, nil
}

var _ Accessor = (*NoopAccessor)(nil)
Expand Down
20 changes: 18 additions & 2 deletions share/eds/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eds

import (
"context"
"errors"
"time"

"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -49,6 +50,9 @@ type metrics struct {

longOpTime metric.Float64Histogram
gcTime metric.Float64Histogram

clientReg metric.Registration
closerFn func() error
}

func (s *Store) WithMetrics() error {
Expand Down Expand Up @@ -124,7 +128,8 @@ func (s *Store) WithMetrics() error {
return err
}

if err = s.cache.Load().EnableMetrics(); err != nil {
closerFn, err := s.cache.Load().EnableMetrics()
if err != nil {
return err
}

Expand All @@ -139,7 +144,8 @@ func (s *Store) WithMetrics() error {
return nil
}

if _, err := meter.RegisterCallback(callback, dagStoreShards); err != nil {
clientReg, err := meter.RegisterCallback(callback, dagStoreShards)
if err != nil {
return err
}

Expand All @@ -155,10 +161,20 @@ func (s *Store) WithMetrics() error {
shardFailureCount: shardFailureCount,
longOpTime: longOpTime,
gcTime: gcTime,
clientReg: clientReg,
closerFn: closerFn,
}
return nil
}

func (m *metrics) close() error {
if m == nil {
return nil
}

return errors.Join(m.closerFn(), m.clientReg.Unregister())
}

func (m *metrics) observeGCtime(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
Expand Down
5 changes: 5 additions & 0 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func (s *Store) Start(ctx context.Context) error {
// Stop stops the underlying DAGStore.
func (s *Store) Stop(context.Context) error {
defer s.cancel()

if err := s.metrics.close(); err != nil {
log.Warnw("failed to close metrics", "err", err)
}

if err := s.invertedIdx.close(); err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion share/p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ func (d *Discovery) Start(context.Context) error {

func (d *Discovery) Stop(context.Context) error {
d.cancel()
return d.metrics.close()

if err := d.metrics.close(); err != nil {
log.Warnw("failed to close metrics", "err", err)
}

return nil
}

// Peers provides a list of discovered peers in the given topic.
Expand Down
4 changes: 4 additions & 0 deletions share/p2p/peers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func (m *Manager) Start(startCtx context.Context) error {
func (m *Manager) Stop(ctx context.Context) error {
m.cancel()

if err := m.metrics.close(); err != nil {
log.Warnw("closing metrics", "err", err)
}

// we do not need to wait for headersub and disconnected peers to finish
// here, since they were never started
if m.headerSub == nil && m.shrexSub == nil {
Expand Down
11 changes: 10 additions & 1 deletion share/p2p/peers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type metrics struct {
fullNodesPool metric.Int64ObservableGauge // attributes: pool_status
blacklistedPeersByReason sync.Map
blacklistedPeers metric.Int64ObservableGauge // attributes: blacklist_reason

clientReg metric.Registration
}

func initMetrics(manager *Manager) (*metrics, error) {
Expand Down Expand Up @@ -154,13 +156,20 @@ func initMetrics(manager *Manager) (*metrics, error) {
})
return nil
}
_, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted)
metrics.clientReg, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted)
if err != nil {
return nil, fmt.Errorf("registering metrics callback: %w", err)
}
return metrics, nil
}

func (m *metrics) close() error {
if m == nil {
return nil
}
return m.clientReg.Unregister()
}

func (m *metrics) observeGetPeer(
ctx context.Context,
source peerSource, poolSize int, waitTime time.Duration,
Expand Down
15 changes: 13 additions & 2 deletions state/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.uber.org/fx"
)

var meter = otel.Meter("state")

func WithMetrics(ca *CoreAccessor) {
func WithMetrics(lc fx.Lifecycle, ca *CoreAccessor) {
pfbCounter, _ := meter.Int64ObservableCounter(
"pfb_count",
metric.WithDescription("Total count of submitted PayForBlob transactions"),
Expand All @@ -24,8 +25,18 @@ func WithMetrics(ca *CoreAccessor) {
observer.ObserveInt64(lastPfbTimestamp, ca.LastPayForBlob())
return nil
}
_, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp)

clientReg, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp)
if err != nil {
panic(err)
}

lc.Append(fx.Hook{
OnStop: func(context.Context) error {
if err := clientReg.Unregister(); err != nil {
log.Warnw("failed to close metrics", "err", err)
}
return nil
},
})
}
Loading