Skip to content

Commit

Permalink
feat: reattempt at overdraft
Browse files Browse the repository at this point in the history
  • Loading branch information
metacertain committed Jun 11, 2021
1 parent 2899d34 commit d77bf0d
Showing 1 changed file with 62 additions and 22 deletions.
84 changes: 62 additions & 22 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,21 +454,66 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e

func (s *Service) CheckAvailableChunk(ctx context.Context, addr swarm.Address) (err error) {

sp := newSkipPeers()
lastTime := time.Now().Unix()

selectAttempt := 0
requestAttempt := 0

for requestAttempt < 5 {
requestAttempt++
for selectAttempt < 8 {
selectAttempt++
attempted, err := s.checkAvailableChunk(ctx, addr, sp)
if err == nil && attempted {
return nil
}
}

timeNow := time.Now().Unix()
if timeNow > lastTime {
lastTime = timeNow
selectAttempt = 0
sp.Reset()
} else {
select {
case <-time.After(600 * time.Millisecond):
}
}

}

return storage.ErrNotFound
}

func (s *Service) checkAvailableChunk(ctx context.Context, addr swarm.Address, sp *skipPeers) (attempt bool, err error) {

ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout)
defer cancel()
peer, err := s.farthestPeer(addr)

peer, err := s.farthestPeer(addr, sp.All())
if err != nil {
return fmt.Errorf("get farthest for address %s, allow upstream %v: %w", addr.String(), err)
return false, fmt.Errorf("get farthest for address %s, allow upstream %v: %w", addr.String(), err)
}

// compute the peer's price for this chunk for price header
chunkPrice := s.pricer.PeerPrice(peer, addr)

// Reserve to see whether we can request the chunk
err = s.accounting.Reserve(ctx, peer, chunkPrice)
if err != nil {
sp.AddOverdraft(peer)
return false, err
}
defer s.accounting.Release(peer, chunkPrice)

sp.Add(peer)

s.logger.Tracef("retrieval: requesting chunk %s from peer %s", addr, peer)
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
s.metrics.TotalErrors.Inc()
return fmt.Errorf("new stream: %w", err)
return false, fmt.Errorf("new stream: %w", err)
}

defer func() {
Expand All @@ -478,53 +523,48 @@ func (s *Service) CheckAvailableChunk(ctx context.Context, addr swarm.Address) (
go stream.FullClose()
}
}()

// Reserve to see whether we can request the chunk
err = s.accounting.Reserve(ctx, peer, chunkPrice)
if err != nil {
return err
}
defer s.accounting.Release(peer, chunkPrice)

w, r := protobuf.NewWriterAndReader(stream)
if err := w.WriteMsgWithContext(ctx, &pb.Request{
Addr: addr.Bytes(),
}); err != nil {
return fmt.Errorf("write request: %w peer %s", err, peer.String())
return false, fmt.Errorf("write request: %w peer %s", err, peer.String())
}

var d pb.Delivery
if err := r.ReadMsgWithContext(ctx, &d); err != nil {
return fmt.Errorf("read delivery: %w peer %s", err, peer.String())
return true, fmt.Errorf("read delivery: %w peer %s", err, peer.String())
}

stamp := new(postage.Stamp)
err = stamp.UnmarshalBinary(d.Stamp)
if err != nil {
return fmt.Errorf("stamp unmarshal: %w", err)
return true, fmt.Errorf("stamp unmarshal: %w", err)
}

chunk := swarm.NewChunk(addr, d.Data).WithStamp(stamp)
if !cac.Valid(chunk) {
if !soc.Valid(chunk) {
s.metrics.InvalidChunkRetrieved.Inc()
s.metrics.TotalErrors.Inc()
return swarm.ErrInvalidChunk
return true, swarm.ErrInvalidChunk
}
}

// credit the peer after successful delivery
err = s.accounting.Credit(peer, chunkPrice)
if err != nil {
return err
}
s.metrics.ChunkPrice.Observe(float64(chunkPrice))
_ = s.accounting.Credit(peer, chunkPrice)

return nil
return true, nil
}

func (s *Service) farthestPeer(addr swarm.Address) (swarm.Address, error) {
func (s *Service) farthestPeer(addr swarm.Address, skipPeers []swarm.Address) (swarm.Address, error) {
farthest := swarm.Address{}
err := s.peerSuggester.EachPeerRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
for _, a := range skipPeers {
if a.Equal(peer) {
return false, false, nil
}
}

if farthest.IsZero() {
farthest = peer
return false, false, nil
Expand Down

0 comments on commit d77bf0d

Please sign in to comment.