Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
bugfix: retry multi times if failed to report pieces
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <starnopg@gmail.com>
  • Loading branch information
starnop committed Jan 16, 2020
1 parent 72929df commit 5d248de
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
4 changes: 3 additions & 1 deletion dfget/core/api/supernode_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/dragonflyoss/Dragonfly/dfget/types"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/pkg/errors"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -114,10 +115,11 @@ func (api *supernodeAPI) ReportPiece(node string, req *types.ReportPieceRequest)
resp = new(types.BaseResponse)
if e = api.get(url, resp); e != nil {
logrus.Errorf("failed to report piece{taskid:%s,range:%s},err: %v", req.TaskID, req.PieceRange, e)
return nil, e
return nil, errors.Wrapf(e, "failed to report piece{taskid:%s,range:%s}", req.TaskID, req.PieceRange)
}
if resp.Code != constants.CodeGetPieceReport {
logrus.Errorf("failed to report piece{taskid:%s,range:%s} to supernode: api response code is %d not equal to %d", req.TaskID, req.PieceRange, resp.Code, constants.CodeGetPieceReport)
return nil, errors.Wrapf(e, "failed to report piece{taskid:%s,range:%s} to supernode: api response code is %d not equal to %d", req.TaskID, req.PieceRange, resp.Code, constants.CodeGetPieceReport)
}
return
}
Expand Down
4 changes: 2 additions & 2 deletions dfget/core/api/supernode_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ func (s *SupernodeAPITestSuite) TestSupernodeAPI_ReportPiece(c *check.C) {
TaskID: "sssss",
PieceRange: "0-11",
}
s.mock.GetFunc = s.mock.CreateGetFunc(200, []byte(`{"Code":700}`), nil)
s.mock.GetFunc = s.mock.CreateGetFunc(200, []byte(`{"Code":611}`), nil)
r, e := s.api.ReportPiece(localhost, req)
c.Check(e, check.IsNil)
c.Check(r.Code, check.Equals, 700)
c.Check(r.Code, check.Equals, 611)
}

func (s *SupernodeAPITestSuite) TestSupernodeAPI_ServiceDown(c *check.C) {
Expand Down
28 changes: 26 additions & 2 deletions dfget/core/downloader/p2p_downloader/client_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bufio"
"context"
"io"
"math/rand"
"os"
"time"

Expand Down Expand Up @@ -202,12 +203,35 @@ func startSyncWriter(q queue.Queue) queue.Queue {
}

func (cw *ClientWriter) sendSuccessPiece(piece *Piece, cost time.Duration) {
cw.api.ReportPiece(piece.SuperNode, &types.ReportPieceRequest{
reportPieceRequest := &types.ReportPieceRequest{
TaskID: piece.TaskID,
Cid: cw.cfg.RV.Cid,
DstCid: piece.DstCid,
PieceRange: piece.Range,
})
}

var retry = 0
var maxRetryTime = 3
for {
if retry >= maxRetryTime {
logrus.Errorf("failed to report piece to supernode with request(%+v) even after retrying max retry time", reportPieceRequest)
break
}

_, err := cw.api.ReportPiece(piece.SuperNode, reportPieceRequest)
if err == nil {
if retry > 0 {
logrus.Warnf("success to report piece with request(%+v) after retrying (%d) times", reportPieceRequest, retry)
}
break
}

sleepTime := time.Duration(rand.Intn(500)+50) * time.Millisecond
logrus.Warnf("failed to report piece to supernode with request(%+v) for (%d) times and will retry after sleep %.3fs", reportPieceRequest, retry, sleepTime.Seconds())
time.Sleep(sleepTime)
retry++
}

if cost.Seconds() > 2.0 {
logrus.Infof(
"async writer and report suc from dst:%s... cost:%.3f for range:%s",
Expand Down

0 comments on commit 5d248de

Please sign in to comment.