Skip to content

Commit

Permalink
fix(share/p2p/shrexeds): hotfix shrex contexts for bsr (#2192)
Browse files Browse the repository at this point in the history
Currently, shrex requests block for the full context length (2.5
minutes) before timing out, because we fail to:
1. Open streams
2. Read status from streams

This PR limits opening a stream to 5s, and reading the status from the
stream to 5s (both via the `ServerReadTimeout` config param). We also put
peers that return a context deadline exceeded (or canceled) error on
cooldown. It is a bad fix, and the correct way to do this can be discussed
once we can look deeper into the problems - but for now, we need to fix
the test network before it ends next week.

This fix does not appear to be enough to fully resolve the syncing stalls
or the slow syncing speed (the only solution that has done that so far is
a new `ResultDropPeer` in peerman), but it is simple and helps a bit.

Related: #2191
  • Loading branch information
distractedm1nd committed May 15, 2023
1 parent 955f184 commit e06aa7f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
4 changes: 4 additions & 0 deletions share/getters/shrex.go
Expand Up @@ -129,6 +129,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
)
for {
if ctx.Err() != nil {
sg.metrics.recordEDSAttempt(ctx, attempt, false)
return nil, ctx.Err()
}
attempt++
Expand All @@ -155,6 +156,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
return eds, nil
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
setStatus(peers.ResultCooldownPeer)
case errors.Is(getErr, p2p.ErrNotFound):
getErr = share.ErrNotFound
setStatus(peers.ResultCooldownPeer)
Expand Down Expand Up @@ -187,6 +189,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
)
for {
if ctx.Err() != nil {
sg.metrics.recordNDAttempt(ctx, attempt, false)
return nil, ctx.Err()
}
attempt++
Expand All @@ -213,6 +216,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
return nd, nil
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
setStatus(peers.ResultCooldownPeer)
case errors.Is(getErr, p2p.ErrNotFound):
getErr = share.ErrNotFound
setStatus(peers.ResultCooldownPeer)
Expand Down
14 changes: 11 additions & 3 deletions share/p2p/shrexeds/client.go
Expand Up @@ -83,7 +83,9 @@ func (c *Client) doRequest(
dataHash share.DataHash,
to peer.ID,
) (*rsmt2d.ExtendedDataSquare, error) {
stream, err := c.host.NewStream(ctx, to, c.protocolID)
streamOpenCtx, cancel := context.WithTimeout(ctx, c.params.ServerReadTimeout)
defer cancel()
stream, err := c.host.NewStream(streamOpenCtx, to, c.protocolID)
if err != nil {
return nil, fmt.Errorf("failed to open stream: %w", err)
}
Expand All @@ -106,11 +108,15 @@ func (c *Client) doRequest(

// read and parse status from peer
resp := new(pb.EDSResponse)
err = stream.SetReadDeadline(time.Now().Add(c.params.ServerReadTimeout))
if err != nil {
log.Debugw("client: failed to set read deadline for reading status", "err", err)
}
_, err = serde.Read(stream, resp)
if err != nil {
// server is overloaded and closed the stream
// server closes the stream after returning a non-successful status
if errors.Is(err, io.EOF) {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
return nil, p2p.ErrNotFound
}
stream.Reset() //nolint:errcheck
Expand All @@ -119,6 +125,8 @@ func (c *Client) doRequest(

switch resp.Status {
case pb.Status_OK:
// reset stream deadlines to original values, since read deadline was changed during status read
c.setStreamDeadlines(ctx, stream)
// use header and ODS bytes to construct EDS and verify it against dataHash
eds, err := eds.ReadEDS(ctx, stream, dataHash)
if err != nil {
Expand Down

0 comments on commit e06aa7f

Please sign in to comment.