Skip to content

Commit

Permalink
chore: rework metrics registration and handling
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Jan 8, 2024
1 parent 30b9fb7 commit 04cd1d6
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 10 deletions.
20 changes: 12 additions & 8 deletions sync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ var meter = otel.Meter("header/sync")
type metrics struct {
totalSynced atomic.Int64
totalSyncedInst metric.Int64ObservableGauge
totalSyncedReg metric.Registration

syncLoopStarted metric.Int64Counter
trustedPeersOutOfSync metric.Int64Counter
Expand All @@ -27,7 +26,8 @@ type metrics struct {
prevHeader time.Time

subjectiveHeadInst metric.Int64ObservableGauge
subjectiveHeadReg metric.Registration

syncReg metric.Registration

headersThreshold time.Duration
}
Expand Down Expand Up @@ -91,16 +91,21 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) {
headersThreshold: headersThreshold,
}

m.totalSyncedReg, err = meter.RegisterCallback(m.observeTotalSynced, totalSynced, subjectiveHead)
m.syncReg, err = meter.RegisterCallback(m.observeMetrics, m.totalSyncedInst, m.subjectiveHeadInst)
if err != nil {
return nil, err
}

m.subjectiveHeadReg, err = meter.RegisterCallback(m.observeNewHead, totalSynced, subjectiveHead)
return m, nil
}

func (m *metrics) observeMetrics(ctx context.Context, obs metric.Observer) error {
err := m.observeTotalSynced(ctx, obs)
if err != nil {
return nil, err
return err
}
return m, nil

return m.observeNewHead(ctx, obs)
}

func (m *metrics) observeTotalSynced(_ context.Context, obs metric.Observer) error {
Expand All @@ -109,7 +114,7 @@ func (m *metrics) observeTotalSynced(_ context.Context, obs metric.Observer) err
}

func (m *metrics) observeNewHead(_ context.Context, obs metric.Observer) error {
obs.ObserveInt64(m.subjectiveHeadInst, m.totalSynced.Load())
obs.ObserveInt64(m.subjectiveHeadInst, m.subjectiveHead.Load())
return nil
}

Expand Down Expand Up @@ -154,6 +159,5 @@ func (m *metrics) observe(ctx context.Context, observeFn func(context.Context))
if ctx.Err() != nil {
ctx = context.Background()
}

observeFn(ctx)
}
2 changes: 1 addition & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewSyncer[H header.Header[H]](
var metrics *metrics
if params.metrics {
var err error
metrics, err = newMetrics(params.blockTime)
metrics, err = newMetrics(params.recencyThreshold)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion sync/sync_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
"hash", netHead.Hash().String(),
"err", err)
}
s.metrics.observeNewSubjectiveHead(s.ctx, int64(netHead.Height()), netHead.Time())

storeHead, err := s.store.Head(ctx)
if err == nil && storeHead.Height() >= netHead.Height() {
Expand All @@ -131,7 +132,6 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
s.pending.Add(netHead)
s.wantSync()
log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash())
s.metrics.observeNewSubjectiveHead(s.ctx, int64(netHead.Height()), netHead.Time())
}

// incomingNetworkHead processes new potential network headers.
Expand Down

0 comments on commit 04cd1d6

Please sign in to comment.