Merge branch 'main-celestia' into discovery_metrics
# Conflicts:
#	share/p2p/metrics.go
walldiss committed May 11, 2023
2 parents bff848e + a8d02a2 commit d170d2b
Showing 11 changed files with 80 additions and 67 deletions.
9 changes: 9 additions & 0 deletions das/metrics.go
@@ -149,6 +149,9 @@ func (m *metrics) observeSample(
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}
m.sampleTime.Record(ctx, sampleTime.Seconds(),
attribute.Bool(failedLabel, err != nil),
attribute.Int(headerWidthLabel, len(h.DAH.RowsRoots)),
@@ -169,6 +172,9 @@ func (m *metrics) observeGetHeader(ctx context.Context, d time.Duration) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}
m.getHeaderTime.Record(ctx, d.Seconds())
}

@@ -177,5 +183,8 @@ func (m *metrics) observeNewHead(ctx context.Context) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}
m.newHead.Add(ctx, 1)
}
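
The same three-line guard recurs in every observe* method this commit touches: observations are recorded against the caller's context, falling back to a fresh background context when that context is already canceled, so measurements taken during teardown are not dropped. A minimal standalone sketch of the pattern (a hypothetical helper; the commit inlines the check instead of factoring it out):

package main

import (
	"context"
	"fmt"
)

// observationCtx returns ctx unless it is already canceled or expired,
// in which case it falls back to context.Background() so a late metric
// observation is still recorded. Hypothetical helper, not from the commit.
func observationCtx(ctx context.Context) context.Context {
	if ctx.Err() != nil {
		return context.Background()
	}
	return ctx
}

func main() {
	canceled, cancel := context.WithCancel(context.Background())
	cancel()
	live := context.Background()
	fmt.Println(observationCtx(canceled) == canceled) // false: fell back to Background
	fmt.Println(observationCtx(live) == live)         // true: live context reused
}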
7 changes: 5 additions & 2 deletions share/availability/discovery/discovery.go
@@ -248,11 +248,14 @@ func (d *Discovery) discover(ctx context.Context) bool {
var wg errgroup.Group
// limit to minimize chances of overreaching the limit
wg.SetLimit(int(d.set.Limit()))
defer wg.Wait() //nolint:errcheck

// stop discovery when we are done
findCtx, findCancel := context.WithCancel(ctx)
defer findCancel()
defer func() {
// some workers could still be running; wait for them to finish before canceling findCtx
wg.Wait() //nolint:errcheck
findCancel()
}()

peers, err := d.disc.FindPeers(findCtx, rendezvousPoint)
if err != nil {
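
The discovery.go change fixes deferred-call ordering: defers run last-in, first-out, so the previous pair (defer wg.Wait() registered first, defer findCancel() registered second) canceled findCtx before waiting on the workers still using it. Merging both into one deferred closure enforces wait-then-cancel. A standalone sketch of the LIFO ordering (not part of the commit):

package main

import "fmt"

func main() {
	// Deferred calls run in reverse registration order, which is why
	// two separate defers would run findCancel before wg.Wait:
	defer fmt.Println("runs second: wg.Wait()    (registered first)")
	defer fmt.Println("runs first:  findCancel() (registered last)")
}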
24 changes: 12 additions & 12 deletions share/getters/shrex.go
@@ -29,7 +29,6 @@ const (
// serve getEDS request for block size 256
defaultMinRequestTimeout = time.Minute // should be >= shrexeds server write timeout
defaultMinAttemptsCount = 3
metricObservationTimeout = 100 * time.Millisecond
)

var meter = global.MeterProvider().Meter("shrex/getter")
@@ -39,22 +38,23 @@ type metrics struct {
ndAttempts syncint64.Histogram
}

func (m *metrics) recordEDSAttempt(attemptCount int, success bool) {
func (m *metrics) recordEDSAttempt(ctx context.Context, attemptCount int, success bool) {
if m == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), metricObservationTimeout)
defer cancel()
if ctx.Err() != nil {
ctx = context.Background()
}
m.edsAttempts.Record(ctx, int64(attemptCount), attribute.Bool("success", success))
}

func (m *metrics) recordNDAttempt(attemptCount int, success bool) {
func (m *metrics) recordNDAttempt(ctx context.Context, attemptCount int, success bool) {
if m == nil {
return
}

ctx, cancel := context.WithTimeout(context.Background(), metricObservationTimeout)
defer cancel()
if ctx.Err() != nil {
ctx = context.Background()
}
m.ndAttempts.Record(ctx, int64(attemptCount), attribute.Bool("success", success))
}

@@ -140,7 +140,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
"hash", root.String(),
"err", getErr,
"finished (s)", time.Since(start))
sg.metrics.recordEDSAttempt(attempt, false)
sg.metrics.recordEDSAttempt(ctx, attempt, false)
return nil, fmt.Errorf("getter/shrex: %w", err)
}

@@ -151,7 +151,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
switch {
case getErr == nil:
setStatus(peers.ResultSynced)
sg.metrics.recordEDSAttempt(attempt, true)
sg.metrics.recordEDSAttempt(ctx, attempt, true)
return eds, nil
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
@@ -198,7 +198,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
"hash", root.String(),
"err", getErr,
"finished (s)", time.Since(start))
sg.metrics.recordNDAttempt(attempt, false)
sg.metrics.recordNDAttempt(ctx, attempt, false)
return nil, fmt.Errorf("getter/shrex: %w", err)
}

@@ -209,7 +209,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
switch {
case getErr == nil:
setStatus(peers.ResultNoop)
sg.metrics.recordNDAttempt(attempt, true)
sg.metrics.recordNDAttempt(ctx, attempt, true)
return nd, nil
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
11 changes: 4 additions & 7 deletions share/p2p/metrics.go
@@ -3,7 +3,6 @@ package p2p
import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/global"
@@ -14,8 +13,6 @@ import (

var meter = global.MeterProvider().Meter("shrex/eds")

var observationTimeout = 100 * time.Millisecond

type status string

const (
@@ -32,13 +29,13 @@ type Metrics struct {

// ObserveRequests increments the total number of requests sent with the given status as an
// attribute.
func (m *Metrics) ObserveRequests(count int64, status status) {
func (m *Metrics) ObserveRequests(ctx context.Context, count int64, status status) {
if m == nil {
return
}

ctx, cancel := context.WithTimeout(context.Background(), observationTimeout)
defer cancel()
if ctx.Err() != nil {
ctx = context.Background()
}
m.totalRequestCounter.Add(ctx, count, attribute.String("status", string(status)))
}

11 changes: 6 additions & 5 deletions share/p2p/peers/manager.go
@@ -189,29 +189,30 @@ func (m *Manager) Peer(
p.remove(peerID)
return m.Peer(ctx, datahash)
}
return m.newPeer(datahash, peerID, sourceShrexSub, p.len(), 0)
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0)
}

// if no peer for datahash is currently available, try to use full node
// obtained from discovery
peerID, ok = m.fullNodes.tryGet()
if ok {
return m.newPeer(datahash, peerID, sourceFullNodes, m.fullNodes.len(), 0)
return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), 0)
}

// no peers are available right now, wait for the first one
start := time.Now()
select {
case peerID = <-p.next(ctx):
return m.newPeer(datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
case peerID = <-m.fullNodes.next(ctx):
return m.newPeer(datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start))
return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start))
case <-ctx.Done():
return "", nil, ctx.Err()
}
}

func (m *Manager) newPeer(
ctx context.Context,
datahash share.DataHash,
peerID peer.ID,
source peerSource,
@@ -223,7 +224,7 @@ func (m *Manager) newPeer(
"source", source,
"pool_size", poolSize,
"wait (s)", waitTime)
m.metrics.observeGetPeer(source, poolSize, waitTime)
m.metrics.observeGetPeer(ctx, source, poolSize, waitTime)
return peerID, m.doneFunc(datahash, peerID, source), nil
}

2 changes: 1 addition & 1 deletion share/p2p/peers/manager_test.go
@@ -223,7 +223,7 @@ func TestManager(t *testing.T) {
stopManager(t, manager)
})

t.Run("get peer from discovery", func(t *testing.T) {
t.Run("mark pool synced", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
t.Cleanup(cancel)

20 changes: 10 additions & 10 deletions share/p2p/peers/metrics.go
@@ -18,8 +18,6 @@ import (
)

const (
observeTimeout = 100 * time.Millisecond

isInstantKey = "is_instant"
doneResultKey = "done_result"

@@ -171,12 +169,14 @@ func initMetrics(manager *Manager) (*metrics, error) {
return metrics, nil
}

func (m *metrics) observeGetPeer(source peerSource, poolSize int, waitTime time.Duration) {
func (m *metrics) observeGetPeer(ctx context.Context,
source peerSource, poolSize int, waitTime time.Duration) {
if m == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), observeTimeout)
defer cancel()
if ctx.Err() != nil {
ctx = context.Background()
}
m.getPeer.Add(ctx, 1,
attribute.String(sourceKey, string(source)),
attribute.Bool(isInstantKey, waitTime == 0))
@@ -196,9 +196,8 @@ func (m *metrics) observeDoneResult(source peerSource, result result) {
if m == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), observeTimeout)
defer cancel()

ctx := context.Background()
m.doneResult.Add(ctx, 1,
attribute.String(sourceKey, string(source)),
attribute.String(doneResultKey, string(result)))
@@ -224,10 +223,11 @@ func (m *metrics) validationObserver(validator shrexsub.ValidatorFn) shrexsub.Va
resStr = "unknown"
}

observeCtx, cancel := context.WithTimeout(context.Background(), observeTimeout)
defer cancel()
if ctx.Err() != nil {
ctx = context.Background()
}

m.validationResult.Add(observeCtx, 1,
m.validationResult.Add(ctx, 1,
attribute.String(validationResultKey, resStr))
return res
}
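
validationObserver above is a decorator: it wraps a shrexsub.ValidatorFn so every validation result is counted, applying the same canceled-context fallback before recording. A simplified, self-contained sketch of that wrapper shape (hypothetical types and names, not the actual shrexsub API, which returns a pubsub validation result rather than an error):

package main

import (
	"context"
	"fmt"
)

// validatorFn stands in for shrexsub.ValidatorFn; simplified signature.
type validatorFn func(ctx context.Context, msg string) error

// withValidationMetric wraps a validator so each call is observed. The
// record callback stands in for a metric counter's Add.
func withValidationMetric(v validatorFn, record func(ctx context.Context, failed bool)) validatorFn {
	return func(ctx context.Context, msg string) error {
		err := v(ctx, msg)
		if ctx.Err() != nil {
			// same fallback as in the commit: don't drop the observation
			// just because the request context is already canceled
			ctx = context.Background()
		}
		record(ctx, err != nil)
		return err
	}
}

func main() {
	v := withValidationMetric(
		func(ctx context.Context, msg string) error { return nil },
		func(ctx context.Context, failed bool) { fmt.Println("observed; failed =", failed) },
	)
	_ = v(context.Background(), "datahash")
}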
12 changes: 6 additions & 6 deletions share/p2p/shrexeds/client.go
@@ -56,15 +56,15 @@ func (c *Client) RequestEDS(
}
log.Debugw("client: eds request to peer failed", "peer", peer, "hash", dataHash.String(), "error", err)
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
c.metrics.ObserveRequests(1, p2p.StatusTimeout)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout)
return nil, err
}
// some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not
// unwrap to a ctx err
var ne net.Error
if errors.As(err, &ne) && ne.Timeout() {
if deadline, _ := ctx.Deadline(); deadline.Before(time.Now()) {
c.metrics.ObserveRequests(1, p2p.StatusTimeout)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout)
return nil, context.DeadlineExceeded
}
}
@@ -110,7 +110,7 @@ func (c *Client) doRequest(
if err != nil {
// server is overloaded and closed the stream
if errors.Is(err, io.EOF) {
c.metrics.ObserveRequests(1, p2p.StatusRateLimited)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited)
return nil, p2p.ErrNotFound
}
stream.Reset() //nolint:errcheck
@@ -124,18 +124,18 @@
if err != nil {
return nil, fmt.Errorf("failed to read eds from ods bytes: %w", err)
}
c.metrics.ObserveRequests(1, p2p.StatusSuccess)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess)
return eds, nil
case pb.Status_NOT_FOUND:
c.metrics.ObserveRequests(1, p2p.StatusNotFound)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
return nil, p2p.ErrNotFound
case pb.Status_INVALID:
log.Debug("client: invalid request")
fallthrough
case pb.Status_INTERNAL:
fallthrough
default:
c.metrics.ObserveRequests(1, p2p.StatusInternalErr)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusInternalErr)
return nil, p2p.ErrInvalidResponse
}
}
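
The timeout classification in RequestEDS (and, below, RequestND) handles transports such as yamux and mocknet whose timeout errors do not unwrap to a context error: it checks net.Error.Timeout() and whether the context deadline has already passed. A condensed, self-contained sketch of that check (hypothetical helper name; the clients inline this logic):

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"
)

// isCtxTimeout reports whether err should be treated as a context
// deadline hit: it either unwraps to a context error, or it is a
// net.Error timeout while ctx's deadline has already passed.
func isCtxTimeout(ctx context.Context, err error) bool {
	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
		return true
	}
	var ne net.Error
	if errors.As(err, &ne) && ne.Timeout() {
		if deadline, ok := ctx.Deadline(); ok && deadline.Before(time.Now()) {
			return true
		}
	}
	return false
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
	time.Sleep(time.Millisecond) // let the deadline pass
	fmt.Println(isCtxTimeout(ctx, context.DeadlineExceeded)) // true
}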
6 changes: 3 additions & 3 deletions share/p2p/shrexeds/server.go
@@ -65,7 +65,7 @@ func (s *Server) Stop(context.Context) error {
func (s *Server) observeRateLimitedRequests() {
numRateLimited := s.middleware.DrainCounter()
if numRateLimited > 0 {
s.metrics.ObserveRequests(numRateLimited, p2p.StatusRateLimited)
s.metrics.ObserveRequests(context.Background(), numRateLimited, p2p.StatusRateLimited)
}
}

@@ -103,7 +103,7 @@ func (s *Server) handleStream(stream network.Stream) {
status := p2p_pb.Status_OK
switch {
case errors.Is(err, eds.ErrNotFound):
s.metrics.ObserveRequests(1, p2p.StatusNotFound)
s.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
status = p2p_pb.Status_NOT_FOUND
case err != nil:
logger.Errorw("server: get CAR", "err", err)
@@ -134,7 +134,7 @@ func (s *Server) handleStream(stream network.Stream) {
return
}

s.metrics.ObserveRequests(1, p2p.StatusSuccess)
s.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess)
err = stream.Close()
if err != nil {
logger.Debugw("server: closing stream", "err", err)
14 changes: 7 additions & 7 deletions share/p2p/shrexnd/client.go
@@ -59,15 +59,15 @@ func (c *Client) RequestND(
return shares, err
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
c.metrics.ObserveRequests(1, p2p.StatusTimeout)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout)
return nil, err
}
// some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not
// unwrap to a ctx err
var ne net.Error
if errors.As(err, &ne) && ne.Timeout() {
if deadline, _ := ctx.Deadline(); deadline.Before(time.Now()) {
c.metrics.ObserveRequests(1, p2p.StatusTimeout)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout)
return nil, context.DeadlineExceeded
}
}
@@ -112,14 +112,14 @@ func (c *Client) doRequest(
if err != nil {
// server is overloaded and closed the stream
if errors.Is(err, io.EOF) {
c.metrics.ObserveRequests(1, p2p.StatusRateLimited)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited)
return nil, p2p.ErrNotFound
}
stream.Reset() //nolint:errcheck
return nil, fmt.Errorf("client-nd: reading response: %w", err)
}

if err = c.statusToErr(resp.Status); err != nil {
if err = c.statusToErr(ctx, resp.Status); err != nil {
return nil, fmt.Errorf("client-nd: response code is not OK: %w", err)
}

@@ -187,13 +187,13 @@ func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream)
}
}

func (c *Client) statusToErr(code pb.StatusCode) error {
func (c *Client) statusToErr(ctx context.Context, code pb.StatusCode) error {
switch code {
case pb.StatusCode_OK:
c.metrics.ObserveRequests(1, p2p.StatusSuccess)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess)
return nil
case pb.StatusCode_NOT_FOUND:
c.metrics.ObserveRequests(1, p2p.StatusNotFound)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
return p2p.ErrNotFound
case pb.StatusCode_INVALID:
log.Debug("client-nd: invalid request")
(diff for the remaining changed file not loaded)