Skip to content

Commit

Permalink
apply suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Dec 15, 2023
1 parent 121111d commit 81d3d2b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 37 deletions.
70 changes: 36 additions & 34 deletions sync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,23 @@ import (
var meter = otel.Meter("header/sync")

type metrics struct {
ctx context.Context

totalSynced atomic.Int64
totalSyncedGauge metric.Float64ObservableGauge

syncLoopStarted metric.Int64Counter
trustedPeersOutOfSync metric.Int64Counter
laggingHeadersStart metric.Int64Counter

subjectiveHead atomic.Int64
headerTimestamp metric.Float64Histogram
subjectiveHead atomic.Int64
blockTime metric.Float64Histogram

headerReceived time.Time
prevHeader time.Time

headersThreshHold time.Duration
headersThreshold time.Duration
}

func newMetrics(headersThreshHold time.Duration) (*metrics, error) {
func newMetrics(headersThreshold time.Duration) (*metrics, error) {
totalSynced, err := meter.Float64ObservableGauge(
"total_synced_headers",
metric.WithDescription("total synced headers"),
Expand All @@ -39,40 +37,53 @@ func newMetrics(headersThreshHold time.Duration) (*metrics, error) {
return nil, err
}

syncLoopStarted, err := meter.Int64Counter("sync_loop_started", metric.WithDescription("sync loop started"))
syncLoopStarted, err := meter.Int64Counter(
"sync_loop_started",
metric.WithDescription("sync loop started"),
)
if err != nil {
return nil, err
}

trustedPeersOutOfSync, err := meter.Int64Counter("tr_peers_out_of_sync", metric.WithDescription("trusted peers out of sync"))
trustedPeersOutOfSync, err := meter.Int64Counter(
"tr_peers_out_of_sync",
metric.WithDescription("trusted peers out of sync"),
)
if err != nil {
return nil, err
}

laggingHeadersStart, err := meter.Int64Counter("sync_lagging_hdr_start", metric.WithDescription("lagging header start"))
laggingHeadersStart, err := meter.Int64Counter(
"sync_lagging_hdr_start",
metric.WithDescription("lagging header start"),
)
if err != nil {
return nil, err
}

subjectiveHead, err := meter.Int64ObservableGauge("sync_subjective_head", metric.WithDescription("subjective head height"))
subjectiveHead, err := meter.Int64ObservableGauge(
"sync_subjective_head",
metric.WithDescription("subjective head height"),
)
if err != nil {
return nil, err
}

headerTimestamp, err := meter.Float64Histogram("sync_subjective_head_ts",
metric.WithDescription("subjective_head_timestamp"))
blockTime, err := meter.Float64Histogram(
"sync_actual_blockTime_ts",
metric.WithDescription("subjective_head_timestamp"),
)
if err != nil {
return nil, err
}

m := &metrics{
ctx: context.Background(),
totalSyncedGauge: totalSynced,
syncLoopStarted: syncLoopStarted,
trustedPeersOutOfSync: trustedPeersOutOfSync,
laggingHeadersStart: laggingHeadersStart,
headerTimestamp: headerTimestamp,
headersThreshHold: headersThreshHold,
blockTime: blockTime,
headersThreshold: headersThreshold,
}

callback := func(ctx context.Context, observer metric.Observer) error {
Expand All @@ -99,39 +110,30 @@ func (m *metrics) recordSyncLoopStarted() {
if m == nil {
return
}
m.syncLoopStarted.Add(m.ctx, 1)
m.syncLoopStarted.Add(context.Background(), 1)
}

func (m *metrics) recordTrustedPeersOutOfSync() {
if m == nil {
return
}
m.trustedPeersOutOfSync.Add(m.ctx, 1)
m.trustedPeersOutOfSync.Add(context.Background(), 1)
}

func (m *metrics) observeNewHead(height int64) {
func (m *metrics) observeNewSubjectiveHead(height int64, timestamp time.Time) {
if m == nil {
return
}
m.subjectiveHead.Store(height)
}

func (m *metrics) observeLaggingHeader() {
if m == nil {
return
}
if time.Since(m.headerReceived) > m.headersThreshHold {
m.laggingHeadersStart.Add(m.ctx, 1)
}
m.headerReceived = time.Now()
}

func (m *metrics) observeHeaderTimestamp(timestamp time.Time) {
if m == nil {
return
}
ctx := context.Background()
if !m.prevHeader.IsZero() {
m.headerTimestamp.Record(m.ctx, timestamp.Sub(m.prevHeader).Seconds())
m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds())
}
m.prevHeader = timestamp

if time.Since(m.headerReceived) > m.headersThreshold {
m.laggingHeadersStart.Add(ctx, 1)
}
m.headerReceived = time.Now()
}
4 changes: 1 addition & 3 deletions sync/sync_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
s.pending.Add(netHead)
s.wantSync()
log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash())
s.metrics.observeNewHead(int64(netHead.Height()))
s.metrics.observeHeaderTimestamp(netHead.Time())
s.metrics.observeLaggingHeader()
s.metrics.observeNewSubjectiveHead(int64(netHead.Height()), netHead.Time())
}

// incomingNetworkHead processes new potential network headers.
Expand Down

0 comments on commit 81d3d2b

Please sign in to comment.