diff --git a/metrics.go b/metrics.go
deleted file mode 100644
index 81f59f56..00000000
--- a/metrics.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package header
-
-import (
-	"context"
-	"time"
-
-	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/metric"
-)
-
-var meter = otel.Meter("header")
-
-// WithMetrics enables Otel metrics to monitor head and total amount of synced headers.
-func WithMetrics[H Header[H]](store Store[H]) error {
-	headC, _ := meter.Int64ObservableCounter(
-		"head",
-		metric.WithDescription("Subjective head of the node"),
-	)
-
-	callback := func(ctx context.Context, observer metric.Observer) error {
-		// add timeout to limit the time it takes to get the head
-		// in case there is a deadlock
-		ctx, cancel := context.WithTimeout(ctx, time.Second)
-		defer cancel()
-
-		head, err := store.Head(ctx)
-		if err != nil {
-			observer.ObserveInt64(headC, 0,
-				metric.WithAttributes(
-					attribute.String("err", err.Error())))
-		} else {
-			observer.ObserveInt64(headC, int64(head.Height()))
-		}
-		return nil
-	}
-	_, err := meter.RegisterCallback(callback, headC)
-	return err
-}
diff --git a/store/metrics.go b/store/metrics.go
new file mode 100644
index 00000000..e5f14211
--- /dev/null
+++ b/store/metrics.go
@@ -0,0 +1,119 @@
+package store
+
+import (
+	"context"
+	"sync/atomic"
+	"time"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+)
+
+var meter = otel.Meter("header/store")
+
+type metrics struct {
+	headHeight     atomic.Int64
+	headHeightInst metric.Int64ObservableGauge
+	headHeightReg  metric.Registration
+
+	flushTimeInst metric.Float64Histogram
+	readTimeInst  metric.Float64Histogram
+
+	writesQueueBlockedInst metric.Int64Counter
+}
+
+func newMetrics() (m *metrics, err error) {
+	m = new(metrics)
+	m.headHeightInst, err = meter.Int64ObservableGauge(
+		"hdr_store_head_height_gauge",
+		metric.WithDescription("current header store head height(subjective height)"),
+	)
+	if err != nil {
+		return nil, err
+	}
+	m.headHeightReg, err = meter.RegisterCallback(m.observeHeight, m.headHeightInst)
+	if err != nil {
+		return nil, err
+	}
+	m.flushTimeInst, err = meter.Float64Histogram(
+		"hdr_store_flush_time_hist",
+		metric.WithDescription("header store flush time in seconds"),
+	)
+	if err != nil {
+		return nil, err
+	}
+	m.readTimeInst, err = meter.Float64Histogram(
+		"hdr_store_read_time_hist",
+		metric.WithDescription("header store single header read time from datastore in seconds and ignoring cache"),
+	)
+	if err != nil {
+		return nil, err
+	}
+	m.writesQueueBlockedInst, err = meter.Int64Counter(
+		"hdr_store_writes_blocked_counter",
+		metric.WithDescription("header store writes blocked counter"),
+	)
+	if err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+func (m *metrics) newHead(height uint64) {
+	m.observe(context.Background(), func(ctx context.Context) {
+		m.headHeight.Store(int64(height))
+	})
+}
+
+func (m *metrics) observeHeight(_ context.Context, obs metric.Observer) error {
+	obs.ObserveInt64(m.headHeightInst, m.headHeight.Load())
+	return nil
+}
+
+func (m *metrics) flush(ctx context.Context, duration time.Duration, amount int, failed bool) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.flushTimeInst.Record(ctx,
+			duration.Seconds(),
+			metric.WithAttributes(
+				attribute.Int("amount", amount/100), // divide by 100 to reduce cardinality
+				attribute.Bool("failed", failed),
+			),
+		)
+	})
+}
+
+func (m *metrics) readSingle(ctx context.Context, duration time.Duration, failed bool) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.readTimeInst.Record(ctx,
+			duration.Seconds(),
+			metric.WithAttributes(attribute.Bool("failed", failed)),
+		)
+	})
+}
+
+func (m *metrics) writesQueueBlocked(ctx context.Context) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.writesQueueBlockedInst.Add(ctx, 1)
+	})
+}
+
+func (m *metrics) observe(ctx context.Context, f func(context.Context)) {
+	if m == nil {
+		return
+	}
+
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	f(ctx)
+}
+
+func (m *metrics) Close() error {
+	if m == nil {
+		return nil
+	}
+
+	return m.headHeightReg.Unregister()
+}
diff --git a/store/options.go b/store/options.go
index 8633a1f8..b0c01d62 100644
--- a/store/options.go
+++ b/store/options.go
@@ -18,13 +18,16 @@ type Parameters struct {
 	// IndexCacheSize defines the maximum amount of entries in the Height to Hash index cache.
 	IndexCacheSize int
 
-	// WriteBatchSize defines the size of the batched header write.
+	// WriteBatchSize defines the size of the batched header flush.
 	// Headers are written in batches not to thrash the underlying Datastore with writes.
 	WriteBatchSize int
 
 	// storePrefix defines the prefix used to wrap the store
 	// OPTIONAL
 	storePrefix datastore.Key
+
+	// metrics is a flag that enables metrics collection
+	metrics bool
 }
 
 // DefaultParameters returns the default params to configure the store.
@@ -51,6 +54,13 @@ func (p *Parameters) Validate() error {
 	return nil
 }
 
+// WithMetrics enables metrics on the Store.
+func WithMetrics() Option {
+	return func(p *Parameters) {
+		p.metrics = true
+	}
+}
+
 // WithStoreCacheSize is a functional option that configures the
 // `StoreCacheSize` parameter.
 func WithStoreCacheSize(size int) Option {
diff --git a/store/store.go b/store/store.go
index f8f88f31..0cee3b4b 100644
--- a/store/store.go
+++ b/store/store.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"sync/atomic"
+	"time"
 
 	lru "github.com/hashicorp/golang-lru"
 	"github.com/ipfs/go-datastore"
@@ -31,6 +32,8 @@ type Store[H header.Header[H]] struct {
 	ds datastore.Batching
 	// adaptive replacement cache of headers
 	cache *lru.ARCCache
+	// metrics collection instance
+	metrics *metrics
 
 	// header heights management
 	//
@@ -102,15 +105,24 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
 		return nil, fmt.Errorf("failed to create height indexer: %w", err)
 	}
 
+	var metrics *metrics
+	if params.metrics {
+		metrics, err = newMetrics()
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	return &Store[H]{
-		Params:      params,
 		ds:          wrappedStore,
+		cache:       cache,
+		metrics:     metrics,
+		heightIndex: index,
 		heightSub:   newHeightSub[H](),
 		writes:      make(chan []H, 16),
 		writesDn:    make(chan struct{}),
-		cache:       cache,
-		heightIndex: index,
 		pending:     newBatch[H](params.WriteBatchSize),
+		Params:      params,
 	}, nil
 }
 
@@ -141,9 +153,14 @@ func (s *Store[H]) Stop(ctx context.Context) error {
 	default:
 	}
 	// signal to prevent further writes to Store
-	s.writes <- nil
 	select {
-	case <-s.writesDn: // wait till it is done writing
+	case s.writes <- nil:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+	// wait till it is done writing
+	select {
+	case <-s.writesDn:
 	case <-ctx.Done():
 		return ctx.Err()
 	}
@@ -151,7 +168,7 @@ func (s *Store[H]) Stop(ctx context.Context) error {
 	// cleanup caches
 	s.cache.Purge()
 	s.heightIndex.cache.Purge()
-	return nil
+	return s.metrics.Close()
 }
 
 func (s *Store[H]) Height() uint64 {
@@ -172,7 +189,7 @@ func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, erro
 	case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound):
 		return zero, header.ErrNoHead
 	case err == nil:
-		s.heightSub.SetHeight(uint64(head.Height()))
+		s.heightSub.SetHeight(head.Height())
 		log.Infow("loaded head", "height", head.Height(), "hash", head.Hash())
 		return head, nil
 	}
@@ -188,12 +205,8 @@ func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
 		return h, nil
 	}
 
-	b, err := s.ds.Get(ctx, datastore.NewKey(hash.String()))
+	b, err := s.get(ctx, hash)
 	if err != nil {
-		if errors.Is(err, datastore.ErrNotFound) {
-			return zero, header.ErrNotFound
-		}
-
 		return zero, err
 	}
 
@@ -356,15 +369,27 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
 		verified, head = append(verified, h), h
 	}
 
+	onWrite := func() {
+		newHead := verified[len(verified)-1]
+		s.writeHead.Store(&newHead)
+		log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
+		s.metrics.newHead(newHead.Height())
+	}
+
 	// queue headers to be written on disk
 	select {
 	case s.writes <- verified:
-		ln := len(verified)
-		s.writeHead.Store(&verified[ln-1])
-		wh := *s.writeHead.Load()
-		log.Infow("new head", "height", wh.Height(), "hash", wh.Hash())
 		// we return an error here after writing,
 		// as there might be an invalid header in between of a given range
+		onWrite()
+		return err
+	default:
+		s.metrics.writesQueueBlocked(ctx)
+	}
+	// if the writes queue is full, we block until it is not
+	select {
+	case s.writes <- verified:
+		onWrite()
 		return err
 	case <-s.writesDn:
 		return errStoppedStore
@@ -393,13 +418,17 @@ func (s *Store[H]) flushLoop() {
 			continue
 		}
 
-		err := s.flush(ctx, s.pending.GetAll()...)
+		startTime := time.Now()
+		toFlush := s.pending.GetAll()
+		err := s.flush(ctx, toFlush...)
 		if err != nil {
+			from, to := toFlush[0].Height(), toFlush[len(toFlush)-1].Height()
 			// TODO(@Wondertan): Should this be a fatal error case with os.Exit?
-			from, to := uint64(headers[0].Height()), uint64(headers[len(headers)-1].Height())
 			log.Errorw("writing header batch", "from", from, "to", to)
+			s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), true)
 			continue
 		}
+		s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
 
 		// reset pending
 		s.pending.Reset()
@@ -472,3 +501,18 @@ func (s *Store[H]) readHead(ctx context.Context) (H, error) {
 
 	return s.Get(ctx, head)
 }
+
+func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
+	startTime := time.Now()
+	data, err := s.ds.Get(ctx, datastore.NewKey(hash.String()))
+	if err != nil {
+		s.metrics.readSingle(ctx, time.Since(startTime), true)
+		if errors.Is(err, datastore.ErrNotFound) {
+			return nil, header.ErrNotFound
+		}
+		return nil, err
+	}
+
+	s.metrics.readSingle(ctx, time.Since(startTime), false)
+	return data, nil
+}
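For reviewers, a minimal in-package sketch (hypothetical, not part of this diff) of how the new option composes: `WithMetrics` only flips the unexported `Parameters.metrics` flag, and `newStore` then decides whether to build the OpenTelemetry instruments from `store/metrics.go`. The example file name and the zero-value `Parameters` are illustrative assumptions; callers would normally start from `DefaultParameters()`.

```go
// store/metrics_option_example_test.go — hypothetical sketch for illustration only.
package store

import "fmt"

// ExampleWithMetrics demonstrates that WithMetrics is a plain functional option:
// it sets Parameters.metrics, which newStore later checks before calling
// newMetrics() to register the OTel gauge, histograms, and counter.
func ExampleWithMetrics() {
	var p Parameters // zero value used here purely for the sketch
	WithMetrics()(&p)
	fmt.Println(p.metrics)
	// Output: true
}
```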