feat(syncer/metrics): add metrics to syncer (#136)
vgonkivs committed Feb 1, 2024
1 parent aa5a016 commit 30ce8bc
Showing 3 changed files with 196 additions and 20 deletions.
196 changes: 182 additions & 14 deletions sync/metrics.go
@@ -3,48 +3,216 @@ package sync
import (
"context"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("header/sync")

type metrics struct {
totalSynced atomic.Int64
totalSyncedGauge metric.Float64ObservableGauge
syncerReg metric.Registration

subjectiveHeadInst metric.Int64ObservableGauge
syncLoopRunningInst metric.Int64ObservableGauge

syncLoopStarted metric.Int64Counter
trustedPeersOutOfSync metric.Int64Counter
unrecentHeader metric.Int64Counter
subjectiveInit metric.Int64Counter

subjectiveHead atomic.Int64

syncLoopDurationHist metric.Float64Histogram
syncLoopActive atomic.Int64
syncStartedTs time.Time

requestRangeTimeHist metric.Float64Histogram
requestRangeStartTs time.Time

blockTime metric.Float64Histogram
prevHeader time.Time
}

func newMetrics() (*metrics, error) {
totalSynced, err := meter.Float64ObservableGauge(
"total_synced_headers",
metric.WithDescription("total synced headers"),
syncLoopStarted, err := meter.Int64Counter(
"hdr_sync_loop_started_counter",
metric.WithDescription("sync loop started shows that syncing is in progress"),
)
if err != nil {
return nil, err
}

m := &metrics{
totalSyncedGauge: totalSynced,
trustedPeersOutOfSync, err := meter.Int64Counter(
"hdr_sync_trust_peers_out_of_sync_counter",
metric.WithDescription("trusted peers out of sync and gave outdated header"),
)
if err != nil {
return nil, err
}

callback := func(ctx context.Context, observer metric.Observer) error {
observer.ObserveFloat64(totalSynced, float64(m.totalSynced.Load()))
return nil
unrecentHeader, err := meter.Int64Counter(
"hdr_sync_unrecent_header_counter",
metric.WithDescription("tracks every time Syncer returns an unrecent header"),
)
if err != nil {
return nil, err
}
_, err = meter.RegisterCallback(callback, totalSynced)

subjectiveInit, err := meter.Int64Counter(
"hdr_sync_subjective_init_counter",
metric.WithDescription(
"tracks how many times is the node initialized ",
),
)
if err != nil {
return nil, err
}

subjectiveHead, err := meter.Int64ObservableGauge(
"hdr_sync_subjective_head_gauge",
metric.WithDescription("subjective head height"),
)
if err != nil {
return nil, err
}

syncLoopDurationHist, err := meter.Float64Histogram(
"hdr_sync_loop_time_hist",
metric.WithDescription("tracks the duration of syncing"))
if err != nil {
return nil, err
}

requestRangeTimeHist, err := meter.Float64Histogram("hdr_sync_range_request_time_hist",
metric.WithDescription("tracks the duration of GetRangeByHeight requests"))
if err != nil {
return nil, err
}

syncLoopRunningInst, err := meter.Int64ObservableGauge(
"hdr_sync_loop_status_gauge",
metric.WithDescription("reports whether syncing is active or not"))
if err != nil {
return nil, err
}

blockTime, err := meter.Float64Histogram(
"hdr_sync_actual_blockTime_ts_hist",
metric.WithDescription("duration between creation of 2 blocks"),
)
if err != nil {
return nil, err
}

m := &metrics{
syncLoopStarted: syncLoopStarted,
trustedPeersOutOfSync: trustedPeersOutOfSync,
unrecentHeader: unrecentHeader,
subjectiveInit: subjectiveInit,
syncLoopDurationHist: syncLoopDurationHist,
syncLoopRunningInst: syncLoopRunningInst,
requestRangeTimeHist: requestRangeTimeHist,
blockTime: blockTime,
subjectiveHeadInst: subjectiveHead,
}

m.syncerReg, err = meter.RegisterCallback(m.observeMetrics, m.subjectiveHeadInst, m.syncLoopRunningInst)
if err != nil {
return nil, err
}

return m, nil
}

// recordTotalSynced records the total amount of synced headers.
func (m *metrics) recordTotalSynced(totalSynced int) {
func (m *metrics) observeMetrics(_ context.Context, obs metric.Observer) error {
obs.ObserveInt64(m.subjectiveHeadInst, m.subjectiveHead.Load())
obs.ObserveInt64(m.syncLoopRunningInst, m.syncLoopActive.Load())
return nil
}

func (m *metrics) syncStarted(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.syncStartedTs = time.Now()
m.syncLoopStarted.Add(ctx, 1)
m.syncLoopActive.Store(1)
})
}

func (m *metrics) syncFinished(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.syncLoopActive.Store(0)
m.syncLoopDurationHist.Record(ctx, time.Since(m.syncStartedTs).Seconds())
})
}

func (m *metrics) unrecentHead(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.unrecentHeader.Add(ctx, 1)
})
}

func (m *metrics) trustedPeersOutOufSync(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.trustedPeersOutOfSync.Add(ctx, 1)
})
}

func (m *metrics) subjectiveInitialization(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.subjectiveInit.Add(ctx, 1)
})
}

func (m *metrics) updateGetRangeRequestInfo(ctx context.Context, amount int, failed bool) {
m.observe(ctx, func(ctx context.Context) {
m.requestRangeTimeHist.Record(ctx, time.Since(m.requestRangeStartTs).Seconds(),
metric.WithAttributes(
attribute.Int("headers amount", amount),
attribute.Bool("request failed", failed),
))
})
}

func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestamp time.Time) {
m.observe(ctx, func(ctx context.Context) {
m.subjectiveHead.Store(int64(height))

if !m.prevHeader.IsZero() {
m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds())
}
})
}

func (m *metrics) rangeRequestStart() {
if m == nil {
return
}
m.requestRangeStartTs = time.Now()
}

func (m *metrics) rangeRequestStop() {
if m == nil {
return
}
m.requestRangeStartTs = time.Time{}
}

m.totalSynced.Add(int64(totalSynced))
func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}
observeFn(ctx)
}

func (m *metrics) Close() error {
if m == nil {
return nil
}
return m.syncerReg.Unregister()
}
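
Note that all instruments above are created from the global otel.Meter("header/sync"), so they only produce data once an SDK meter provider is installed globally. A minimal wiring sketch follows (not part of this commit; the stdout exporter and the 10-second interval are illustrative assumptions):

```go
package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	ctx := context.Background()

	// Print collected metrics to stdout; any metric.Exporter works here.
	exporter, err := stdoutmetric.New()
	if err != nil {
		log.Fatal(err)
	}

	// Periodically collect and export every 10 seconds.
	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(
			sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(10*time.Second)),
		),
	)
	defer func() { _ = provider.Shutdown(ctx) }()

	// otel.Meter("header/sync") inside the syncer resolves against this
	// global provider, so the instruments registered above start reporting.
	otel.SetMeterProvider(provider)

	// ... construct and start the Syncer with metrics enabled here ...
}
```

Any other reader works the same way (for example a Prometheus exporter); the syncer code itself does not care which one is registered.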
15 changes: 9 additions & 6 deletions sync/sync.go
@@ -109,7 +109,7 @@ func (s *Syncer[H]) Start(ctx context.Context) error {
// Stop stops Syncer.
func (s *Syncer[H]) Stop(context.Context) error {
s.cancel()
return nil
return s.metrics.Close()
}

// SyncWait blocks until ongoing sync is done.
@@ -158,7 +158,7 @@ func (s *Syncer[H]) State() State {

head, err := s.store.Head(s.ctx)
if err == nil {
state.Height = uint64(head.Height())
state.Height = head.Height()
} else if state.Error == "" {
// don't ignore the error if we can show it in the state
state.Error = err.Error()
Expand All @@ -180,7 +180,9 @@ func (s *Syncer[H]) syncLoop() {
for {
select {
case <-s.triggerSync:
s.metrics.syncStarted(s.ctx)
s.sync(s.ctx)
s.metrics.syncFinished(s.ctx)
case <-s.ctx.Done():
return
}
@@ -239,8 +241,8 @@ func (s *Syncer[H]) sync(ctx context.Context) {
func (s *Syncer[H]) doSync(ctx context.Context, fromHead, toHead H) (err error) {
s.stateLk.Lock()
s.state.ID++
s.state.FromHeight = uint64(fromHead.Height()) + 1
s.state.ToHeight = uint64(toHead.Height())
s.state.FromHeight = fromHead.Height() + 1
s.state.ToHeight = toHead.Height()
s.state.FromHash = fromHead.Hash()
s.state.ToHash = toHead.Hash()
s.state.Start = time.Now()
@@ -315,7 +317,10 @@ func (s *Syncer[H]) requestHeaders(
}

to := fromHead.Height() + size + 1
s.metrics.rangeRequestStart()
headers, err := s.getter.GetRangeByHeight(ctx, fromHead, to)
s.metrics.updateGetRangeRequestInfo(s.ctx, int(size)/100, err != nil)
s.metrics.rangeRequestStop()
if err != nil {
return err
}
@@ -338,7 +343,5 @@ func (s *Syncer[H]) storeHeaders(ctx context.Context, headers ...H) error {
if err != nil {
return err
}

s.metrics.recordTotalSynced(len(headers))
return nil
}
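
All of the new call sites in sync.go invoke the metrics methods unconditionally. This is safe because *metrics uses nil-receiver guards (if m == nil { return }), so a Syncer constructed without metrics simply holds a nil pointer and every call degrades to a no-op. A standalone sketch of the pattern (type and method bodies here are illustrative, not the package's API):

```go
package main

import (
	"context"
	"fmt"
)

// metrics mirrors the nil-receiver guard used in sync/metrics.go:
// a nil *metrics is a valid "metrics disabled" value.
type metrics struct{}

func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) {
	if m == nil {
		return // metrics disabled: every recording call is a no-op
	}
	if ctx.Err() != nil {
		ctx = context.Background() // keep recording even if the caller's ctx is done
	}
	observeFn(ctx)
}

func (m *metrics) syncStarted(ctx context.Context) {
	m.observe(ctx, func(ctx context.Context) {
		fmt.Println("sync started") // the real code records counters/histograms here
	})
}

func main() {
	var disabled *metrics                      // metrics never initialized
	disabled.syncStarted(context.Background()) // safe: does nothing

	enabled := &metrics{}
	enabled.syncStarted(context.Background()) // prints "sync started"
}
```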
5 changes: 5 additions & 0 deletions sync/sync_head.go
@@ -39,6 +39,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
// if we can't get it - give what we have
reqCtx, cancel := context.WithTimeout(ctx, time.Second*2) // TODO(@vgonkivs): make timeout configurable
defer cancel()
s.metrics.unrecentHead(s.ctx)
netHead, err := s.getter.Head(reqCtx, header.WithTrustedHead[H](sbjHead))
if err != nil {
log.Warnw("failed to get recent head, returning current subjective", "sbjHead", sbjHead.Height(), "err", err)
@@ -85,10 +86,12 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
return s.subjectiveHead(ctx)
}
defer s.getter.Unlock()

trustHead, err := s.getter.Head(ctx)
if err != nil {
return trustHead, err
}
s.metrics.subjectiveInitialization(s.ctx)
// and set it as the new subjective head without validation,
// or, in other words, do 'automatic subjective initialization'
// NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack
@@ -103,6 +106,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
log.Warnw("subjective initialization with an old header", "height", trustHead.Height())
}
log.Warn("trusted peer is out of sync")
s.metrics.trustedPeersOutOufSync(s.ctx)
return trustHead, nil
}

@@ -121,6 +125,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
"hash", netHead.Hash().String(),
"err", err)
}
s.metrics.newSubjectiveHead(s.ctx, netHead.Height(), netHead.Time())

storeHead, err := s.store.Head(ctx)
if err == nil && storeHead.Height() >= netHead.Height() {
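
The counters added in sync_head.go (unrecent head, subjective initialization, trusted peers out of sync) can be inspected without a real exporter by pairing the meter provider with a manual reader. A rough sketch of such a check (the harness below is an assumption, not code from this repository):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"go.opentelemetry.io/otel"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	ctx := context.Background()

	// A ManualReader collects on demand instead of on a timer,
	// which is convenient for assertions in tests.
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	otel.SetMeterProvider(provider)

	// Simulate the syncer recording through the same meter name.
	meter := otel.Meter("header/sync")
	counter, err := meter.Int64Counter("hdr_sync_loop_started_counter")
	if err != nil {
		log.Fatal(err)
	}
	counter.Add(ctx, 1)

	// Collect and list everything recorded so far.
	var rm metricdata.ResourceMetrics
	if err := reader.Collect(ctx, &rm); err != nil {
		log.Fatal(err)
	}
	for _, scope := range rm.ScopeMetrics {
		for _, m := range scope.Metrics {
			fmt.Println(scope.Scope.Name, m.Name)
		}
	}
}
```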