feat(store)!: metrics for Store #129

Merged 2 commits on Nov 6, 2023
39 changes: 0 additions & 39 deletions metrics.go

This file was deleted.

119 changes: 119 additions & 0 deletions store/metrics.go
@@ -0,0 +1,119 @@
package store

import (
    "context"
    "sync/atomic"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("header/store")

type metrics struct {
    headHeight     atomic.Int64
    headHeightInst metric.Int64ObservableGauge
    headHeightReg  metric.Registration

    flushTimeInst metric.Float64Histogram
    readTimeInst  metric.Float64Histogram

    writesQueueBlockedInst metric.Int64Counter
}

func newMetrics() (m *metrics, err error) {
    m = new(metrics)
    m.headHeightInst, err = meter.Int64ObservableGauge(
        "hdr_store_head_height_gauge",
        metric.WithDescription("current header store head height(subjective height)"),
    )
    if err != nil {
        return nil, err
    }
    m.headHeightReg, err = meter.RegisterCallback(m.observeHeight, m.headHeightInst)
    if err != nil {
        return nil, err
    }
    m.flushTimeInst, err = meter.Float64Histogram(
        "hdr_store_flush_time_hist",
        metric.WithDescription("header store flush time in seconds"),
    )
    if err != nil {
        return nil, err
    }
    m.readTimeInst, err = meter.Float64Histogram(
        "hdr_store_read_time_hist",
        metric.WithDescription("header store single header read time from datastore in seconds and ignoring cache"),
    )
    if err != nil {
        return nil, err
    }
    m.writesQueueBlockedInst, err = meter.Int64Counter(
        "hdr_store_writes_blocked_counter",
        metric.WithDescription("header store writes blocked counter"),
    )
    if err != nil {
        return nil, err
    }
    return m, nil
}

func (m *metrics) newHead(height uint64) {
    m.observe(context.Background(), func(ctx context.Context) {
        m.headHeight.Store(int64(height))
    })
}

func (m *metrics) observeHeight(_ context.Context, obs metric.Observer) error {
    obs.ObserveInt64(m.headHeightInst, m.headHeight.Load())
    return nil
}

func (m *metrics) flush(ctx context.Context, duration time.Duration, amount int, failed bool) {
    m.observe(ctx, func(ctx context.Context) {
        m.flushTimeInst.Record(ctx,
            duration.Seconds(),
            metric.WithAttributes(
                attribute.Int("amount", amount/100), // divide by 100 to reduce cardinality
                attribute.Bool("failed", failed),
            ),
        )
    })
}

func (m *metrics) readSingle(ctx context.Context, duration time.Duration, failed bool) {
    m.observe(ctx, func(ctx context.Context) {
        m.readTimeInst.Record(ctx,
            duration.Seconds(),
            metric.WithAttributes(attribute.Bool("failed", failed)),
        )
    })
}

func (m *metrics) writesQueueBlocked(ctx context.Context) {
    m.observe(ctx, func(ctx context.Context) {
        m.writesQueueBlockedInst.Add(ctx, 1)
    })
}

func (m *metrics) observe(ctx context.Context, f func(context.Context)) {
    if m == nil {
        return
    }

    if ctx.Err() != nil {
        ctx = context.Background()
    }

    f(ctx)
}

func (m *metrics) Close() error {
    if m == nil {
        return nil
    }

    return m.headHeightReg.Unregister()
}
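Note (not part of the diff): these instruments are registered on the global otel.Meter, so they only produce output once the host application installs a metric MeterProvider. A minimal wiring sketch, assuming the OpenTelemetry Go SDK with its stdout exporter (the setupMetrics name is illustrative, not from this PR):

    package main

    import (
        "context"
        "time"

        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    )

    // setupMetrics installs a global MeterProvider that periodically
    // exports all recorded instruments, including the header/store ones.
    func setupMetrics() (shutdown func(context.Context) error, err error) {
        exp, err := stdoutmetric.New()
        if err != nil {
            return nil, err
        }
        provider := sdkmetric.NewMeterProvider(
            sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exp, sdkmetric.WithInterval(10*time.Second))),
        )
        otel.SetMeterProvider(provider)
        return provider.Shutdown, nil
    }

In production a Prometheus or OTLP exporter would typically replace the stdout one; the store code in this PR is agnostic to that choice.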
12 changes: 11 additions & 1 deletion store/options.go
@@ -18,13 +18,16 @@ type Parameters struct {
    // IndexCacheSize defines the maximum amount of entries in the Height to Hash index cache.
    IndexCacheSize int

-   // WriteBatchSize defines the size of the batched header write.
+   // WriteBatchSize defines the size of the batched header flush.
    // Headers are written in batches not to thrash the underlying Datastore with writes.
    WriteBatchSize int

    // storePrefix defines the prefix used to wrap the store
    // OPTIONAL
    storePrefix datastore.Key
+
+   // metrics is a flag that enables metrics collection
+   metrics bool
}

// DefaultParameters returns the default params to configure the store.
@@ -51,6 +54,13 @@ func (p *Parameters) Validate() error {
    return nil
}

+ // WithMetrics enables metrics on the Store.
+ func WithMetrics() Option {
+     return func(p *Parameters) {
+         p.metrics = true
+     }
+ }

// WithStoreCacheSize is a functional option that configures the
// `StoreCacheSize` parameter.
func WithStoreCacheSize(size int) Option {
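Note (not part of the diff): metrics are opt-in and are wired at construction time through the functional option above. A rough usage sketch, assuming the package's exported NewStore constructor, an existing datastore.Batching named ds, and a placeholder header type MyHeader (all three names are illustrative, not taken from this PR):

    // Build a header store with metrics collection enabled.
    s, err := store.NewStore[MyHeader](ds, store.WithMetrics())
    if err != nil {
        return err
    }
    defer s.Stop(ctx) // on shutdown: waits for queued writes and unregisters the head gauge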
80 changes: 62 additions & 18 deletions store/store.go
@@ -5,6 +5,7 @@ import (
    "errors"
    "fmt"
    "sync/atomic"
+   "time"

    lru "github.com/hashicorp/golang-lru"
    "github.com/ipfs/go-datastore"
@@ -31,6 +32,8 @@ type Store[H header.Header[H]] struct {
    ds datastore.Batching
    // adaptive replacement cache of headers
    cache *lru.ARCCache
+   // metrics collection instance
+   metrics *metrics

    // header heights management
    //
@@ -102,15 +105,24 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
        return nil, fmt.Errorf("failed to create height indexer: %w", err)
    }

+   var metrics *metrics
+   if params.metrics {
+       metrics, err = newMetrics()
+       if err != nil {
+           return nil, err
+       }
+   }
+
    return &Store[H]{
+       Params:      params,
        ds:          wrappedStore,
+       cache:       cache,
+       metrics:     metrics,
+       heightIndex: index,
        heightSub:   newHeightSub[H](),
        writes:      make(chan []H, 16),
        writesDn:    make(chan struct{}),
-       cache:       cache,
-       heightIndex: index,
        pending:     newBatch[H](params.WriteBatchSize),
-       Params:      params,
    }, nil
}

@@ -141,17 +153,22 @@ func (s *Store[H]) Stop(ctx context.Context) error {
    default:
    }
    // signal to prevent further writes to Store
-   s.writes <- nil
    select {
-   case <-s.writesDn: // wait till it is done writing
+   case s.writes <- nil:
    case <-ctx.Done():
        return ctx.Err()
    }
+   // wait till it is done writing
+   select {
+   case <-s.writesDn:
+   case <-ctx.Done():
+       return ctx.Err()
+   }

    // cleanup caches
    s.cache.Purge()
    s.heightIndex.cache.Purge()
-   return nil
+   return s.metrics.Close()
}

func (s *Store[H]) Height() uint64 {
@@ -172,7 +189,7 @@ func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, erro
    case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound):
        return zero, header.ErrNoHead
    case err == nil:
-       s.heightSub.SetHeight(uint64(head.Height()))
+       s.heightSub.SetHeight(head.Height())
        log.Infow("loaded head", "height", head.Height(), "hash", head.Hash())
        return head, nil
    }
@@ -188,12 +205,8 @@ func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
        return h, nil
    }

-   b, err := s.ds.Get(ctx, datastore.NewKey(hash.String()))
+   b, err := s.get(ctx, hash)
    if err != nil {
-       if errors.Is(err, datastore.ErrNotFound) {
-           return zero, header.ErrNotFound
-       }
-
        return zero, err
    }

@@ -356,15 +369,27 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
        verified, head = append(verified, h), h
    }

+   onWrite := func() {
+       newHead := verified[len(verified)-1]
+       s.writeHead.Store(&newHead)
+       log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
+       s.metrics.newHead(newHead.Height())
+   }
+
    // queue headers to be written on disk
    select {
    case s.writes <- verified:
-       ln := len(verified)
-       s.writeHead.Store(&verified[ln-1])
-       wh := *s.writeHead.Load()
-       log.Infow("new head", "height", wh.Height(), "hash", wh.Hash())
        // we return an error here after writing,
        // as there might be an invalid header in between of a given range
+       onWrite()
        return err
+   default:
+       s.metrics.writesQueueBlocked(ctx)
+   }
+   // if the writes queue is full, we block until it is not
+   select {
+   case s.writes <- verified:
+       onWrite()
+       return err
    case <-s.writesDn:
        return errStoppedStore
@@ -393,13 +418,17 @@ func (s *Store[H]) flushLoop() {
            continue
        }

-       err := s.flush(ctx, s.pending.GetAll()...)
+       startTime := time.Now()
+       toFlush := s.pending.GetAll()
+       err := s.flush(ctx, toFlush...)
        if err != nil {
+           from, to := toFlush[0].Height(), toFlush[len(toFlush)-1].Height()
            // TODO(@Wondertan): Should this be a fatal error case with os.Exit?
-           from, to := uint64(headers[0].Height()), uint64(headers[len(headers)-1].Height())
            log.Errorw("writing header batch", "from", from, "to", to)
+           s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), true)
            continue
        }
+       s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
        // reset pending
        s.pending.Reset()

@@ -472,3 +501,18 @@ func (s *Store[H]) readHead(ctx context.Context) (H, error) {

    return s.Get(ctx, head)
}

+ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
+     startTime := time.Now()
+     data, err := s.ds.Get(ctx, datastore.NewKey(hash.String()))
+     if err != nil {
+         s.metrics.readSingle(ctx, time.Since(startTime), true)
+         if errors.Is(err, datastore.ErrNotFound) {
+             return nil, header.ErrNotFound
+         }
+         return nil, err
+     }
+
+     s.metrics.readSingle(ctx, time.Since(startTime), false)
+     return data, nil
+ }
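A closing note on the design (not part of the diff): Store calls s.metrics unconditionally throughout, which is safe because every recording method funnels through observe, and observe returns immediately on a nil receiver. A Store built without WithMetrics therefore carries a nil *metrics, and every metrics call is a cheap no-op. A standalone illustration of that nil-receiver pattern (the recorder type is made up for this sketch):

    package main

    import "fmt"

    // recorder mimics the guard in metrics.observe: its methods are
    // safe to call even when the receiver pointer is nil.
    type recorder struct {
        count int
    }

    func (r *recorder) observe(f func()) {
        if r == nil {
            return // metrics disabled: drop the observation
        }
        f()
    }

    func (r *recorder) inc() {
        r.observe(func() { r.count++ })
    }

    func main() {
        var disabled *recorder // nil, as when WithMetrics is not used
        disabled.inc()         // no-op, no panic

        enabled := &recorder{}
        enabled.inc()
        fmt.Println(enabled.count) // prints 1
    }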