Skip to content

Commit

Permalink
feat(shrex): metrics (#2095)
Browse files Browse the repository at this point in the history
Closes #2143 
Adds metrics to ShrexGetter, and ShrexEDS/ShrexND Client and Server
(including shrex middleware)
  • Loading branch information
distractedm1nd committed May 3, 2023
1 parent 1329f8b commit 4d31ad4
Show file tree
Hide file tree
Showing 14 changed files with 311 additions and 28 deletions.
21 changes: 16 additions & 5 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,30 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
fx.Invoke(modheader.WithMetrics),
)

samplingMetrics := fx.Options(
fx.Invoke(das.WithMetrics),
fx.Invoke(share.WithPeerManagerMetrics),
fx.Invoke(share.WithShrexClientMetrics),
fx.Invoke(share.WithShrexGetterMetrics),
)

var opts fx.Option
switch nodeType {
case node.Full, node.Light:
case node.Full:
opts = fx.Options(
baseComponents,
fx.Invoke(share.WithShrexServerMetrics),
samplingMetrics,
)
case node.Light:
opts = fx.Options(
baseComponents,
fx.Invoke(das.WithMetrics),
fx.Invoke(share.WithPeerManagerMetrics),
// add more monitoring here
samplingMetrics,
)
case node.Bridge:
opts = fx.Options(
baseComponents,
// add more monitoring here
fx.Invoke(share.WithShrexServerMetrics),
)
default:
panic("invalid node type")
Expand Down
1 change: 0 additions & 1 deletion nodebuilder/share/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

type Config struct {
// UseShareExchange is a flag toggling the usage of shrex protocols for blocksync.
// NOTE: This config variable only has an effect on full and bridge nodes.
UseShareExchange bool
// ShrExEDSParams sets shrexeds client and server configuration parameters
ShrExEDSParams *shrexeds.Parameters
Expand Down
25 changes: 25 additions & 0 deletions nodebuilder/share/opts.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,36 @@
package share

import (
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
)

// WithPeerManagerMetrics turns on metrics for the given peer manager and
// reports any error from doing so. It is intended to be handed to fx.Invoke
// rather than called directly.
func WithPeerManagerMetrics(m *peers.Manager) error {
	err := m.WithMetrics()
	return err
}

// WithShrexClientMetrics enables metrics on the shrex/eds client first and
// then on the shrex/nd client. If the eds client fails, its error is returned
// and the nd client is left untouched.
func WithShrexClientMetrics(edsClient *shrexeds.Client, ndClient *shrexnd.Client) error {
	if err := edsClient.WithMetrics(); err != nil {
		return err
	}
	return ndClient.WithMetrics()
}

// WithShrexServerMetrics enables metrics on the shrex/eds server first and
// then on the shrex/nd server. If the eds server fails, its error is returned
// and the nd server is left untouched.
func WithShrexServerMetrics(edsServer *shrexeds.Server, ndServer *shrexnd.Server) error {
	if err := edsServer.WithMetrics(); err != nil {
		return err
	}
	return ndServer.WithMetrics()
}

// WithShrexGetterMetrics enables metrics on the given ShrexGetter and reports
// any error from doing so.
func WithShrexGetterMetrics(sg *getters.ShrexGetter) error {
	if err := sg.WithMetrics(); err != nil {
		return err
	}
	return nil
}
65 changes: 65 additions & 0 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"fmt"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"

"github.com/celestiaorg/nmt/namespace"
"github.com/celestiaorg/rsmt2d"

Expand All @@ -23,8 +29,61 @@ const (
// serve getEDS request for block size 256
defaultMinRequestTimeout = time.Minute // should be >= shrexeds server write timeout
defaultMinAttemptsCount = 3
metricObservationTimeout = 100 * time.Millisecond
)

var meter = global.MeterProvider().Meter("shrex/getter")

// metrics groups the histograms that ShrexGetter records into. It is created
// by ShrexGetter.WithMetrics; its record methods are no-ops on a nil receiver.
type metrics struct {
// edsAttempts records the attempt count of each shrex/eds request.
edsAttempts syncint64.Histogram
// ndAttempts records the attempt count of each shrex/nd request.
ndAttempts syncint64.Histogram
}

// recordEDSAttempt adds attemptCount to the shrex/eds attempts histogram,
// labelled with a "success" attribute. It does nothing on a nil receiver, so
// it can be called whether or not metrics were enabled.
func (m *metrics) recordEDSAttempt(attemptCount int, success bool) {
	if m == nil {
		return
	}

	// Bound the observation with its own short-lived context.
	recordCtx, done := context.WithTimeout(context.Background(), metricObservationTimeout)
	defer done()

	outcome := attribute.Bool("success", success)
	m.edsAttempts.Record(recordCtx, int64(attemptCount), outcome)
}

// recordNDAttempt adds attemptCount to the shrex/nd attempts histogram,
// labelled with a "success" attribute. It does nothing on a nil receiver, so
// it can be called whether or not metrics were enabled.
func (m *metrics) recordNDAttempt(attemptCount int, success bool) {
	if m == nil {
		return
	}

	// Bound the observation with its own short-lived context.
	recordCtx, done := context.WithTimeout(context.Background(), metricObservationTimeout)
	defer done()

	outcome := attribute.Bool("success", success)
	m.ndAttempts.Record(recordCtx, int64(attemptCount), outcome)
}

// WithMetrics creates the per-request attempt histograms for shrex/eds and
// shrex/nd and attaches them to the getter. If either histogram cannot be
// created, the error is returned and the getter's metrics are left unchanged.
func (sg *ShrexGetter) WithMetrics() error {
	var (
		collected metrics
		err       error
	)

	collected.edsAttempts, err = meter.SyncInt64().Histogram(
		"getters_shrex_eds_attempts_per_request",
		instrument.WithUnit(unit.Dimensionless),
		instrument.WithDescription("Number of attempts per shrex/eds request"),
	)
	if err != nil {
		return err
	}

	collected.ndAttempts, err = meter.SyncInt64().Histogram(
		"getters_shrex_nd_attempts_per_request",
		instrument.WithUnit(unit.Dimensionless),
		instrument.WithDescription("Number of attempts per shrex/nd request"),
	)
	if err != nil {
		return err
	}

	// Publish only once both instruments exist.
	sg.metrics = &collected
	return nil
}

// ShrexGetter is a share.Getter that uses the shrex/eds and shrex/nd protocol to retrieve shares.
type ShrexGetter struct {
edsClient *shrexeds.Client
Expand All @@ -37,6 +96,8 @@ type ShrexGetter struct {
// minAttemptsCount will be used to split request timeout into multiple attempts. It will allow to
// attempt multiple peers in scope of one request before context timeout is reached
minAttemptsCount int

metrics *metrics
}

func NewShrexGetter(edsClient *shrexeds.Client, ndClient *shrexnd.Client, peerManager *peers.Manager) *ShrexGetter {
Expand Down Expand Up @@ -79,6 +140,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
"hash", root.String(),
"err", getErr,
"finished (s)", time.Since(start))
sg.metrics.recordEDSAttempt(attempt, false)
return nil, fmt.Errorf("getter/shrex: %w", err)
}

Expand All @@ -89,6 +151,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
switch {
case getErr == nil:
setStatus(peers.ResultSynced)
sg.metrics.recordEDSAttempt(attempt, true)
return eds, nil
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
Expand Down Expand Up @@ -135,6 +198,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
"hash", root.String(),
"err", getErr,
"finished (s)", time.Since(start))
sg.metrics.recordNDAttempt(attempt, false)
return nil, fmt.Errorf("getter/shrex: %w", err)
}

Expand All @@ -145,6 +209,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
switch {
case getErr == nil:
setStatus(peers.ResultNoop)
sg.metrics.recordNDAttempt(attempt, true)
return nd, nil
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
Expand Down
72 changes: 72 additions & 0 deletions share/p2p/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package p2p

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
)

var meter = global.MeterProvider().Meter("shrex/eds")

var observationTimeout = 100 * time.Millisecond

// status is the outcome label of a counted request; ObserveRequests records it
// as the value of the "status" attribute.
type status string

// Outcome labels accepted by ObserveRequests.
const (
StatusInternalErr status = "internal_err"
StatusNotFound status = "not_found"
StatusTimeout status = "timeout"
StatusSuccess status = "success"
StatusRateLimited status = "rate_limited"
)

// Metrics holds the counter created by InitClientMetrics or InitServerMetrics.
// A nil *Metrics is usable: ObserveRequests returns immediately on it.
type Metrics struct {
// totalRequestCounter is incremented by ObserveRequests, with the outcome
// attached as a "status" attribute.
totalRequestCounter syncint64.Counter
}

// ObserveRequests adds count to the total request counter, attaching the given
// status as the "status" attribute. Calling it on a nil *Metrics is a no-op.
func (m *Metrics) ObserveRequests(count int64, status status) {
	if m == nil {
		return
	}

	// Bound the observation with its own short-lived context.
	observeCtx, done := context.WithTimeout(context.Background(), observationTimeout)
	defer done()

	label := attribute.String("status", string(status))
	m.totalRequestCounter.Add(observeCtx, count, label)
}

// InitClientMetrics creates the client-side request counter for the given
// shrex protocol and returns it wrapped in a Metrics. The protocol string is
// interpolated into both the instrument name and its description. On failure
// it returns a nil Metrics together with the error.
func InitClientMetrics(protocol string) (*Metrics, error) {
	name := fmt.Sprintf("shrex_%s_client_total_requests", protocol)
	description := fmt.Sprintf("Total count of sent shrex/%s requests", protocol)

	counter, err := meter.SyncInt64().Counter(
		name,
		instrument.WithUnit(unit.Dimensionless),
		instrument.WithDescription(description),
	)
	if err != nil {
		return nil, err
	}

	return &Metrics{totalRequestCounter: counter}, nil
}

// InitServerMetrics creates the server-side response counter for the given
// shrex protocol and returns it wrapped in a Metrics. The protocol string is
// interpolated into both the instrument name and its description. On failure
// it returns a nil Metrics together with the error.
func InitServerMetrics(protocol string) (*Metrics, error) {
	name := fmt.Sprintf("shrex_%s_server_total_responses", protocol)
	description := fmt.Sprintf("Total count of sent shrex/%s responses", protocol)

	counter, err := meter.SyncInt64().Counter(
		name,
		instrument.WithUnit(unit.Dimensionless),
		instrument.WithDescription(description),
	)
	if err != nil {
		return nil, err
	}

	return &Metrics{totalRequestCounter: counter}, nil
}
33 changes: 26 additions & 7 deletions share/p2p/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,40 @@ import (

var log = logging.Logger("shrex/middleware")

func RateLimitMiddleware(inner network.StreamHandler, concurrencyLimit int) network.StreamHandler {
var parallelRequests int64
limit := int64(concurrencyLimit)
// Middleware carries the state used to rate limit stream handlers: the
// configured concurrency limit, the number of in-flight requests, and a
// running count of requests that were rejected. Create it with NewMiddleware.
type Middleware struct {
// concurrencyLimit is the maximum number of requests that can be processed at once.
concurrencyLimit int64
// parallelRequests is the number of requests currently being processed.
parallelRequests atomic.Int64
// numRateLimited is the number of requests that were rate limited.
// It is reset to zero each time DrainCounter is called.
numRateLimited atomic.Int64
}

// NewMiddleware returns a Middleware with the given concurrency limit and all
// of its counters at zero.
func NewMiddleware(concurrencyLimit int) *Middleware {
	mw := new(Middleware)
	mw.concurrencyLimit = int64(concurrencyLimit)
	return mw
}

// DrainCounter atomically reads the number of rate-limited requests counted
// since the last drain and resets that counter to 0.
func (m *Middleware) DrainCounter() int64 {
	drained := m.numRateLimited.Swap(0)
	return drained
}

func (m *Middleware) RateLimitHandler(handler network.StreamHandler) network.StreamHandler {
return func(stream network.Stream) {
current := atomic.AddInt64(&parallelRequests, 1)
defer atomic.AddInt64(&parallelRequests, -1)
current := m.parallelRequests.Add(1)
defer m.parallelRequests.Add(-1)

if current > limit {
if current > m.concurrencyLimit {
m.numRateLimited.Add(1)
log.Debug("concurrency limit reached")
err := stream.Close()
if err != nil {
log.Debugw("server: closing stream", "err", err)
}
return
}
inner(stream)
handler(stream)
}
}
10 changes: 9 additions & 1 deletion share/p2p/shrexeds/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
type Client struct {
params *Parameters
protocolID protocol.ID
host host.Host

host host.Host
metrics *p2p.Metrics
}

// NewClient creates a new ShrEx/EDS client.
Expand All @@ -53,14 +54,17 @@ func (c *Client) RequestEDS(
if err == nil {
return eds, nil
}
log.Debugw("client: eds request to peer failed", "peer", peer, "hash", dataHash.String(), "error", err)
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
c.metrics.ObserveRequests(1, p2p.StatusTimeout)
return nil, ctx.Err()
}
// some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not
// unwrap to a ctx err
var ne net.Error
if errors.As(err, &ne) && ne.Timeout() {
if deadline, _ := ctx.Deadline(); deadline.Before(time.Now()) {
c.metrics.ObserveRequests(1, p2p.StatusTimeout)
return nil, context.DeadlineExceeded
}
}
Expand Down Expand Up @@ -106,6 +110,7 @@ func (c *Client) doRequest(
if err != nil {
// server is overloaded and closed the stream
if errors.Is(err, io.EOF) {
c.metrics.ObserveRequests(1, p2p.StatusRateLimited)
return nil, p2p.ErrNotFound
}
stream.Reset() //nolint:errcheck
Expand All @@ -119,15 +124,18 @@ func (c *Client) doRequest(
if err != nil {
return nil, fmt.Errorf("failed to read eds from ods bytes: %w", err)
}
c.metrics.ObserveRequests(1, p2p.StatusSuccess)
return eds, nil
case pb.Status_NOT_FOUND:
c.metrics.ObserveRequests(1, p2p.StatusNotFound)
return nil, p2p.ErrNotFound
case pb.Status_INVALID:
log.Debug("client: invalid request")
fallthrough
case pb.Status_INTERNAL:
fallthrough
default:
c.metrics.ObserveRequests(1, p2p.StatusInternalErr)
return nil, p2p.ErrInvalidResponse
}
}
Expand Down
3 changes: 2 additions & 1 deletion share/p2p/shrexeds/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func TestExchange_RequestEDS(t *testing.T) {
t.Fatal("timeout")
}
}
middleware := p2p.NewMiddleware(rateLimit)
server.host.SetStreamHandler(server.protocolID,
p2p.RateLimitMiddleware(mockHandler, rateLimit))
middleware.RateLimitHandler(mockHandler))

// take server concurrency slots with blocked requests
for i := 0; i < rateLimit; i++ {
Expand Down
Loading

0 comments on commit 4d31ad4

Please sign in to comment.