From 8c755bc9d7122b45c42e04dd74ca911798845509 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 29 Jan 2024 13:21:53 +0200 Subject: [PATCH 01/10] feat(traces/session): cover session with traces --- p2p/session.go | 49 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/p2p/session.go b/p2p/session.go index 37f1e258..13074d32 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -9,11 +9,19 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/protocol" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/celestiaorg/go-header" p2p_pb "github.com/celestiaorg/go-header/p2p/pb" ) +var ( + tracerSession = otel.Tracer("header/p2p-session") +) + // errEmptyResponse means that server side closes the connection without sending at least 1 // response. var errEmptyResponse = errors.New("empty response") @@ -77,9 +85,15 @@ func newSession[H header.Header[H]]( func (s *session[H]) getRangeByHeight( ctx context.Context, from, amount, headersPerPeer uint64, -) ([]H, error) { +) (_ []H, err error) { log.Debugw("requesting headers", "from", from, "to", from+amount-1) // -1 need to exclude to+1 height + ctx, span := tracerSession.Start(ctx, "get-range-by-height", trace.WithAttributes( + attribute.Int64("from", int64(from)), + attribute.Int64("to", int64(from+amount-1)), + )) + defer span.End() + requests := prepareRequests(from, amount, headersPerPeer) result := make(chan []H, len(requests)) s.reqCh = make(chan *p2p_pb.HeaderRequest, len(requests)) @@ -94,8 +108,11 @@ LOOP: for { select { case <-s.ctx.Done(): - return nil, errors.New("header/p2p: exchange is closed") + err = errors.New("header/p2p: exchange is closed") + span.SetStatus(codes.Error, err.Error()) + return nil, err case <-ctx.Done(): + span.SetStatus(codes.Error, ctx.Err().Error()) return nil, ctx.Err() case res := <-result: headers = append(headers, res...) @@ -113,6 +130,7 @@ LOOP: "from", headers[0].Height(), "to", headers[len(headers)-1].Height(), ) + span.SetStatus(codes.Ok, "") return headers, nil } @@ -152,12 +170,23 @@ func (s *session[H]) doRequest( req *p2p_pb.HeaderRequest, headers chan []H, ) { + ctx, span := tracerSession.Start(ctx, "request-headers-from-peer", trace.WithAttributes( + attribute.String("peerID", stat.peerID.String()), + attribute.Int64("from", int64(req.GetOrigin())), + attribute.Int64("amount", int64(req.Amount)), + )) + + defer span.SetStatus(codes.Ok, "") + defer span.End() + ctx, cancel := context.WithTimeout(ctx, s.requestTimeout) defer cancel() r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req) s.metrics.response(ctx, size, duration, err) if err != nil { + span.AddEvent("error during range fetching", trace.WithAttributes( + attribute.String("error", err.Error()))) // we should not punish peer at this point and should try to parse responses, despite that error // was received. log.Debugw("requesting headers from peer failed", "peer", stat.peerID, "err", err) @@ -165,6 +194,8 @@ func (s *session[H]) doRequest( h, err := s.processResponses(r) if err != nil { + span.AddEvent("processing response failed", trace.WithAttributes( + attribute.String("error", err.Error()))) logFn := log.Errorw switch err { @@ -195,21 +226,23 @@ func (s *session[H]) doRequest( "requestedAmount", req.Amount, ) + remainingHeaders := req.Amount - uint64(len(h)) + + span.AddEvent("request succeed", trace.WithAttributes( + attribute.Int64("remaining headers", int64(remainingHeaders)))) + // update peer stats stat.updateStats(size, duration) - responseLn := uint64(len(h)) // ensure that we received the correct amount of headers. - if responseLn < req.Amount { - from := h[responseLn-1].Height() - amount := req.Amount - responseLn - + if remainingHeaders > 0 { + from := h[uint64(len(h))-1].Height() select { case <-s.ctx.Done(): return // create a new request with the remaining headers. // prepareRequests will return a slice with 1 element at this point - case s.reqCh <- prepareRequests(from+1, amount, req.Amount)[0]: + case s.reqCh <- prepareRequests(from+1, remainingHeaders, req.Amount)[0]: log.Debugw("sending additional request to get remaining headers") } } From e5afbeb88c59d2a80c50abaf46760265b7457864 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 1 Feb 2024 13:58:49 +0200 Subject: [PATCH 02/10] chore: cover exchange client with additional traces --- p2p/exchange.go | 75 +++++++++++++++++++++++++++++++++++++++++++++---- p2p/server.go | 8 +++--- 2 files changed, 74 insertions(+), 9 deletions(-) diff --git a/p2p/exchange.go b/p2p/exchange.go index ec424b47..b017d15a 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -14,12 +14,20 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/net/conngater" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/celestiaorg/go-header" p2p_pb "github.com/celestiaorg/go-header/p2p/pb" ) -var log = logging.Logger("header/p2p") +var ( + log = logging.Logger("header/p2p") + + tracerClient = otel.Tracer("header/p2p-client") +) // minHeadResponses is the minimum number of headers of the same height // received from peers to determine the network head. If all trusted peers @@ -113,6 +121,7 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error { // and return the highest one. func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (H, error) { log.Debug("requesting head") + ctx, span := tracerClient.Start(ctx, "head") reqCtx := ctx startTime := time.Now() @@ -157,8 +166,20 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( ) for _, from := range peers { go func(from peer.ID) { + span.AddEvent("sending request to peer", + trace.WithAttributes( + attribute.String("peerID", from.String()), + attribute.Bool("is trusted", reqParams.TrustedHead.IsZero()), + ), + ) headers, err := ex.request(reqCtx, from, headerReq) if err != nil { + span.AddEvent("request failed", + trace.WithAttributes( + attribute.String("peerID", from.String()), + attribute.String("error", err.Error())), + ) + log.Errorw("head request to peer failed", "peer", from, "err", err) headerRespCh <- zero return @@ -171,6 +192,13 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( if errors.As(err, &verErr) && verErr.SoftFailure { log.Debugw("received head from tracked peer that soft-failed verification", "tracked peer", from, "err", err) + + span.AddEvent("soft-failed verification header received", + trace.WithAttributes( + attribute.String("peerID", from.String()), + attribute.String("error", err.Error())), + ) + headerRespCh <- headers[0] return } @@ -180,10 +208,17 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( } logF("verifying head received from tracked peer", "tracked peer", from, "height", headers[0].Height(), "err", err) + span.AddEvent("verifying head received", + trace.WithAttributes( + attribute.String("peerID", from.String()), + attribute.Int64("height", int64(headers[0].Height())), + attribute.String("error", err.Error())), + ) headerRespCh <- zero return } } + span.AddEvent("request succeeded") // request ensures that the result slice will have at least one Header headerRespCh <- headers[0] }(from) @@ -206,11 +241,12 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( if errors.Is(ctx.Err(), context.DeadlineExceeded) { status = headStatusTimeout } - + span.SetStatus(codes.Error, fmt.Sprintf("head request %s", status)) ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, status) return zero, ctx.Err() case <-ex.ctx.Done(): ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusCanceled) + span.SetStatus(codes.Error, "exchange client stopped") return zero, ex.ctx.Err() } } @@ -218,10 +254,12 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( head, err := bestHead[H](headers) if err != nil { ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusNoHeaders) + span.SetStatus(codes.Error, headStatusNoHeaders) return zero, err } ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusOk) + span.SetStatus(codes.Ok, "") return head, nil } @@ -230,10 +268,16 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( // thereafter. func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { log.Debugw("requesting header", "height", height) + ctx, span := tracerClient.Start(ctx, "get-by-height", + trace.WithAttributes( + attribute.Int64("height", int64(height)), + )) var zero H // sanity check height if height == 0 { - return zero, fmt.Errorf("specified request height must be greater than 0") + err := fmt.Errorf("specified request height must be greater than 0") + span.SetStatus(codes.Error, err.Error()) + return zero, err } // create request req := &p2p_pb.HeaderRequest{ @@ -242,8 +286,10 @@ func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error } headers, err := ex.performRequest(ctx, req) if err != nil { + span.SetStatus(codes.Error, err.Error()) return zero, err } + span.SetStatus(codes.Ok, "") return headers[0], nil } @@ -254,19 +300,34 @@ func (ex *Exchange[H]) GetRangeByHeight( from H, to uint64, ) ([]H, error) { + ctx, span := tracerClient.Start(ctx, "get-range-by-height", + trace.WithAttributes( + attribute.Int64("from", int64(from.Height())), + attribute.Int64("to", int64(to)), + )) session := newSession[H]( ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from), ) defer session.close() // we request the next header height that we don't have: `fromHead`+1 amount := to - (from.Height() + 1) - return session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest) + result, err := session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + span.SetStatus(codes.Ok, "") + return result, nil } // Get performs a request for the Header by the given hash corresponding // to the RawHeader. Note that the Header must be verified thereafter. func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) { log.Debugw("requesting header", "hash", hash.String()) + ctx, span := tracerClient.Start(ctx, "get-by-hash", + trace.WithAttributes( + attribute.String("hash", hash.String()), + )) var zero H // create request req := &p2p_pb.HeaderRequest{ @@ -275,12 +336,16 @@ func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) { } headers, err := ex.performRequest(ctx, req) if err != nil { + span.SetStatus(codes.Error, err.Error()) return zero, err } if !bytes.Equal(headers[0].Hash(), hash) { - return zero, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash()) + err = fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash()) + span.SetStatus(codes.Error, err.Error()) + return zero, err } + span.SetStatus(codes.Ok, "") return headers[0], nil } diff --git a/p2p/server.go b/p2p/server.go index 327b9158..55ccbddc 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -20,7 +20,7 @@ import ( ) var ( - tracer = otel.Tracer("header/server") + tracerServ = otel.Tracer("header/server") ) // ExchangeServer represents the server-side component for @@ -173,7 +173,7 @@ func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) { log.Debugw("server: handling header request", "hash", header.Hash(hash).String()) ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout) defer cancel() - ctx, span := tracer.Start(ctx, "request-by-hash", trace.WithAttributes( + ctx, span := tracerServ.Start(ctx, "request-by-hash", trace.WithAttributes( attribute.String("hash", header.Hash(hash).String()), )) defer span.End() @@ -204,7 +204,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) { } startTime := time.Now() - ctx, span := tracer.Start(serv.ctx, "request-range", trace.WithAttributes( + ctx, span := tracerServ.Start(serv.ctx, "request-range", trace.WithAttributes( attribute.Int64("from", int64(from)), attribute.Int64("to", int64(to)))) defer span.End() @@ -273,7 +273,7 @@ func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) { log.Debug("server: handling head request") ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout) defer cancel() - ctx, span := tracer.Start(ctx, "request-head") + ctx, span := tracerServ.Start(ctx, "request-head") defer span.End() head, err := serv.store.Head(ctx) From 192ef48081f31dd45c3b87930697b26feabdddaf Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 1 Feb 2024 16:21:09 +0200 Subject: [PATCH 03/10] chore: add baggage --- p2p/exchange.go | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/p2p/exchange.go b/p2p/exchange.go index b017d15a..20b4cf74 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/net/conngater" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -122,6 +123,7 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error { func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (H, error) { log.Debug("requesting head") ctx, span := tracerClient.Start(ctx, "head") + defer span.End() reqCtx := ctx startTime := time.Now() @@ -166,20 +168,18 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( ) for _, from := range peers { go func(from peer.ID) { - span.AddEvent("sending request to peer", - trace.WithAttributes( - attribute.String("peerID", from.String()), - attribute.Bool("is trusted", reqParams.TrustedHead.IsZero()), - ), - ) + // can skip error handling here as it returns an error if sanity check fails. + // we can be sure that our strings are ok. + peerIDBaggage, _ := baggage.NewMember("peerID", from.String()) + b, _ := baggage.New(peerIDBaggage) + ctx = baggage.ContextWithBaggage(ctx, b) + _, newSpan := span.TracerProvider().Tracer("requesting peer").Start(ctx, "") + defer newSpan.End() + newSpan.AddEvent("sending request to peer") + headers, err := ex.request(reqCtx, from, headerReq) if err != nil { - span.AddEvent("request failed", - trace.WithAttributes( - attribute.String("peerID", from.String()), - attribute.String("error", err.Error())), - ) - + newSpan.AddEvent("request failed", trace.WithAttributes(attribute.String("error", err.Error()))) log.Errorw("head request to peer failed", "peer", from, "err", err) headerRespCh <- zero return @@ -193,9 +193,8 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( log.Debugw("received head from tracked peer that soft-failed verification", "tracked peer", from, "err", err) - span.AddEvent("soft-failed verification header received", + newSpan.AddEvent("soft-failed verification header received", trace.WithAttributes( - attribute.String("peerID", from.String()), attribute.String("error", err.Error())), ) @@ -208,9 +207,9 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( } logF("verifying head received from tracked peer", "tracked peer", from, "height", headers[0].Height(), "err", err) - span.AddEvent("verifying head received", + + newSpan.AddEvent("verifying head received", trace.WithAttributes( - attribute.String("peerID", from.String()), attribute.Int64("height", int64(headers[0].Height())), attribute.String("error", err.Error())), ) @@ -218,7 +217,7 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( return } } - span.AddEvent("request succeeded") + newSpan.AddEvent("request succeeded") // request ensures that the result slice will have at least one Header headerRespCh <- headers[0] }(from) @@ -272,6 +271,7 @@ func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error trace.WithAttributes( attribute.Int64("height", int64(height)), )) + defer span.End() var zero H // sanity check height if height == 0 { @@ -305,6 +305,7 @@ func (ex *Exchange[H]) GetRangeByHeight( attribute.Int64("from", int64(from.Height())), attribute.Int64("to", int64(to)), )) + defer span.End() session := newSession[H]( ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from), ) @@ -328,6 +329,7 @@ func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) { trace.WithAttributes( attribute.String("hash", hash.String()), )) + defer span.End() var zero H // create request req := &p2p_pb.HeaderRequest{ From 6e185c4bbdfed4f3dd40d43a252025fbe8ab7274 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 1 Feb 2024 16:28:53 +0200 Subject: [PATCH 04/10] fix race --- p2p/exchange.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/exchange.go b/p2p/exchange.go index 20b4cf74..1e8b5712 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -167,6 +167,7 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( headerRespCh = make(chan H, len(peers)) ) for _, from := range peers { + ctx := ctx go func(from peer.ID) { // can skip error handling here as it returns an error if sanity check fails. // we can be sure that our strings are ok. From 939a4f00da3853ac85ec07c482f04c48005c404e Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 1 Feb 2024 17:58:15 +0200 Subject: [PATCH 05/10] chore: change events to statuse --- p2p/exchange.go | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/p2p/exchange.go b/p2p/exchange.go index 1e8b5712..7dee31d2 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -176,11 +176,10 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( ctx = baggage.ContextWithBaggage(ctx, b) _, newSpan := span.TracerProvider().Tracer("requesting peer").Start(ctx, "") defer newSpan.End() - newSpan.AddEvent("sending request to peer") headers, err := ex.request(reqCtx, from, headerReq) if err != nil { - newSpan.AddEvent("request failed", trace.WithAttributes(attribute.String("error", err.Error()))) + newSpan.SetStatus(codes.Error, err.Error()) log.Errorw("head request to peer failed", "peer", from, "err", err) headerRespCh <- zero return @@ -193,12 +192,7 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( if errors.As(err, &verErr) && verErr.SoftFailure { log.Debugw("received head from tracked peer that soft-failed verification", "tracked peer", from, "err", err) - - newSpan.AddEvent("soft-failed verification header received", - trace.WithAttributes( - attribute.String("error", err.Error())), - ) - + newSpan.SetStatus(codes.Error, err.Error()) headerRespCh <- headers[0] return } @@ -208,17 +202,12 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( } logF("verifying head received from tracked peer", "tracked peer", from, "height", headers[0].Height(), "err", err) - - newSpan.AddEvent("verifying head received", - trace.WithAttributes( - attribute.Int64("height", int64(headers[0].Height())), - attribute.String("error", err.Error())), - ) + newSpan.SetStatus(codes.Error, err.Error()) headerRespCh <- zero return } } - newSpan.AddEvent("request succeeded") + newSpan.SetStatus(codes.Ok, "") // request ensures that the result slice will have at least one Header headerRespCh <- headers[0] }(from) From 9e9b6ae4e68d45a7d183bae70c4a54252ba04ca7 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 1 Feb 2024 18:07:49 +0200 Subject: [PATCH 06/10] create local ctx --- p2p/exchange.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/p2p/exchange.go b/p2p/exchange.go index 7dee31d2..4825546d 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -167,14 +167,13 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( headerRespCh = make(chan H, len(peers)) ) for _, from := range peers { - ctx := ctx go func(from peer.ID) { // can skip error handling here as it returns an error if sanity check fails. // we can be sure that our strings are ok. peerIDBaggage, _ := baggage.NewMember("peerID", from.String()) b, _ := baggage.New(peerIDBaggage) - ctx = baggage.ContextWithBaggage(ctx, b) - _, newSpan := span.TracerProvider().Tracer("requesting peer").Start(ctx, "") + baggageCtx := baggage.ContextWithBaggage(ctx, b) + _, newSpan := span.TracerProvider().Tracer("requesting peer").Start(baggageCtx, "") defer newSpan.End() headers, err := ex.request(reqCtx, from, headerReq) From 056f093f46124a11059e08043c8c296b1220e99c Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 1 Feb 2024 18:48:08 +0200 Subject: [PATCH 07/10] remove baggage --- p2p/exchange.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/p2p/exchange.go b/p2p/exchange.go index 4825546d..4c14e823 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -16,7 +16,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/net/conngater" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -168,12 +167,10 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( ) for _, from := range peers { go func(from peer.ID) { - // can skip error handling here as it returns an error if sanity check fails. - // we can be sure that our strings are ok. - peerIDBaggage, _ := baggage.NewMember("peerID", from.String()) - b, _ := baggage.New(peerIDBaggage) - baggageCtx := baggage.ContextWithBaggage(ctx, b) - _, newSpan := span.TracerProvider().Tracer("requesting peer").Start(baggageCtx, "") + _, newSpan := span.TracerProvider().Tracer("requesting peer").Start( + ctx, "", + trace.WithAttributes(attribute.String("peerID", from.String())), + ) defer newSpan.End() headers, err := ex.request(reqCtx, from, headerReq) From 9b565f05d323f71f2e11526e0f5f4cb5df21e3a2 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 1 Feb 2024 18:58:43 +0200 Subject: [PATCH 08/10] change events to statuses --- p2p/session.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/p2p/session.go b/p2p/session.go index 13074d32..58e85a97 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -175,8 +175,6 @@ func (s *session[H]) doRequest( attribute.Int64("from", int64(req.GetOrigin())), attribute.Int64("amount", int64(req.Amount)), )) - - defer span.SetStatus(codes.Ok, "") defer span.End() ctx, cancel := context.WithTimeout(ctx, s.requestTimeout) @@ -185,8 +183,7 @@ func (s *session[H]) doRequest( r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req) s.metrics.response(ctx, size, duration, err) if err != nil { - span.AddEvent("error during range fetching", trace.WithAttributes( - attribute.String("error", err.Error()))) + span.SetStatus(codes.Error, err.Error()) // we should not punish peer at this point and should try to parse responses, despite that error // was received. log.Debugw("requesting headers from peer failed", "peer", stat.peerID, "err", err) @@ -194,8 +191,7 @@ func (s *session[H]) doRequest( h, err := s.processResponses(r) if err != nil { - span.AddEvent("processing response failed", trace.WithAttributes( - attribute.String("error", err.Error()))) + span.SetStatus(codes.Error, err.Error()) logFn := log.Errorw switch err { @@ -228,8 +224,7 @@ func (s *session[H]) doRequest( remainingHeaders := req.Amount - uint64(len(h)) - span.AddEvent("request succeed", trace.WithAttributes( - attribute.Int64("remaining headers", int64(remainingHeaders)))) + span.SetStatus(codes.Ok, fmt.Sprintf("request succeed. remaining header %d", remainingHeaders)) // update peer stats stat.updateStats(size, duration) From 1892102299dc5b06dd497023fa3217d28342fbc4 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 1 Feb 2024 19:16:40 +0200 Subject: [PATCH 09/10] add event for remaining headers --- p2p/session.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/p2p/session.go b/p2p/session.go index 58e85a97..8c035bac 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -224,13 +224,17 @@ func (s *session[H]) doRequest( remainingHeaders := req.Amount - uint64(len(h)) - span.SetStatus(codes.Ok, fmt.Sprintf("request succeed. remaining header %d", remainingHeaders)) + span.SetStatus(codes.Ok, "") // update peer stats stat.updateStats(size, duration) // ensure that we received the correct amount of headers. if remainingHeaders > 0 { + span.AddEvent("remaining headers", trace.WithAttributes( + attribute.Int64("amount", int64(remainingHeaders))), + ) + from := h[uint64(len(h))-1].Height() select { case <-s.ctx.Done(): From cd263e18a7cf536b5a0367a1336c787f66fbbc33 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 1 Feb 2024 20:19:27 +0200 Subject: [PATCH 10/10] create child span --- p2p/exchange.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/exchange.go b/p2p/exchange.go index 4c14e823..13fdf43f 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -167,12 +167,12 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( ) for _, from := range peers { go func(from peer.ID) { - _, newSpan := span.TracerProvider().Tracer("requesting peer").Start( - ctx, "", + _, newSpan := tracerClient.Start( + ctx, "requesting peer", trace.WithAttributes(attribute.String("peerID", from.String())), ) defer newSpan.End() - + headers, err := ex.request(reqCtx, from, headerReq) if err != nil { newSpan.SetStatus(codes.Error, err.Error())