Skip to content

Commit

Permalink
fix: retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
metacertain committed May 19, 2021
1 parent 7e9c557 commit 58d2295
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 32 deletions.
72 changes: 40 additions & 32 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ type Interface interface {
RetrieveChunk(ctx context.Context, addr swarm.Address, origin bool) (chunk swarm.Chunk, err error)
}

type retrievalResult struct {
chunk swarm.Chunk
peer swarm.Address
err error
retrieved bool
}

type Service struct {
addr swarm.Address
streamer p2p.Streamer
Expand Down Expand Up @@ -95,13 +102,13 @@ const (
func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin bool) (swarm.Chunk, error) {
s.metrics.RequestCounter.Inc()

maxPeers := 1
v, err, _ := s.singleflight.Do(addr.String(), func() (interface{}, error) {

if origin {
maxPeers = 8
}
maxPeers := 1
if origin {
maxPeers = 8
}

v, err, _ := s.singleflight.Do(addr.String(), func() (interface{}, error) {
span, logger, ctx := s.tracer.StartSpanFromContext(ctx, "retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: addr.String()})
defer span.Finish()

Expand All @@ -111,35 +118,25 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin
defer ticker.Stop()

var (
peerAttempt int
requestAttempt int
peersResults int
resultC = make(chan swarm.Chunk, maxPeers)
errC = make(chan error, maxPeers)
peerAttempt int
peersResults int
resultC = make(chan retrievalResult, maxPeers)
)

for {
if requestAttempt < maxPeers {
requestAttempt := 0
for requestAttempt < 3 {
if peerAttempt < maxPeers {
peerAttempt++

s.metrics.PeerRequestCounter.Inc()

go func() {
chunk, peer, requested, err := s.retrieveChunk(ctx, addr, sp)
if err != nil {
if requested {
requestAttempt++
}

if !peer.IsZero() {
logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, peer, err)
}

errC <- err
}

if chunk != nil {
resultC <- chunk
resultC <- retrievalResult{
chunk: chunk,
peer: peer,
err: err,
retrieved: requested,
}
}()
} else {
Expand All @@ -149,26 +146,37 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin
select {
case <-ticker.C:
// break
case chunk := <-resultC:
return chunk, nil
case <-errC:
peersResults++
case res := <-resultC:
if res.retrieved {
if res.err != nil {
if !res.peer.IsZero() {
logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, res.peer, res.err)
}
peersResults++
} else {
return res.chunk, nil
}
}
case <-ctx.Done():
logger.Tracef("retrieval: failed to get chunk %s: %v", addr, ctx.Err())
return nil, fmt.Errorf("retrieval: %w", ctx.Err())
}

// all results received
if peersResults >= maxPeers && requestAttempt >= maxPeers {
if peersResults >= maxPeers {
logger.Tracef("retrieval: failed to get chunk %s", addr)
return nil, storage.ErrNotFound
}

if peerAttempt >= maxPeers {
// wait and reset skiplist
requestAttempt++
peerAttempt = 0
sp = newSkipPeers()
}
}

return nil, storage.ErrNotFound

})
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/retrieval/retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ func TestRetrievePreemptiveRetry(t *testing.T) {
}
},
),
streamtest.WithBaseAddr(clientAddress),
)

client := retrieval.New(clientAddress, nil, recorder, peerSuggesterFn(peers...), logger, accountingmock.NewAccounting(), pricerMock, nil)
Expand Down

0 comments on commit 58d2295

Please sign in to comment.