Skip to content

Commit

Permalink
fix: flight and context
Browse files Browse the repository at this point in the history
  • Loading branch information
metacertain committed Jun 11, 2021
1 parent 7b958f9 commit f934c85
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin
flightRoute = addr.String() + originSuffix
}

v, _, err := s.singleflight.Do(ctx, flightRoute, func(ctx context.Context) (interface{}, error) {
// topCtx is passing the tracing span to the first singleflight call
topCtx := ctx

v, _, err := s.singleflight.Do(ctx, flightRoute, func(ctx context.Context) (interface{}, error) {
maxPeers := 1
if origin {
maxPeers = maxSelects
Expand All @@ -135,12 +137,22 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin

if peerAttempt < maxSelects {

span, _, ctx := s.tracer.StartSpanFromContext(context.Background(), "retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: addr.String()})
// create a new context without cancelation but
// set the tracing span to the new context from the context of the first caller
ctx := tracing.WithContext(context.Background(), tracing.FromContext(topCtx))

// get the tracing span
span, _, ctx := s.tracer.StartSpanFromContext(ctx, "retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: addr.String()})
defer span.Finish()

peerAttempt++
s.metrics.PeerRequestCounter.Inc()
go func() {

// cancel the goroutine just with the timeout
ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout)
defer cancel()

chunk, peer, requested, err := s.retrieveChunk(ctx, addr, sp)
resultC <- retrievalResult{
chunk: chunk,
Expand Down Expand Up @@ -275,7 +287,6 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, sp *ski

defer func() {
if err != nil {
fmt.Println("V", err)
_ = stream.Reset()
} else {
go stream.FullClose()
Expand Down

0 comments on commit f934c85

Please sign in to comment.