Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
bugfix: notify p2p downloader to pull next piece after reporting
Browse files Browse the repository at this point in the history
Signed-off-by: lowzj <zj3142063@gmail.com>
  • Loading branch information
lowzj committed Apr 27, 2020
1 parent a8ade02 commit dd73993
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
8 changes: 6 additions & 2 deletions dfget/core/downloader/p2p_downloader/client_stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type ClientStreamWriter struct {
// The downloader will put the piece into this queue after it downloaded a piece successfully.
// And clientWriter will poll values from this queue constantly and write to disk.
clientQueue queue.Queue

notifyQueue queue.Queue

// finish indicates whether the task written is completed.
finish chan struct{}

Expand Down Expand Up @@ -68,11 +71,12 @@ type ClientStreamWriter struct {
}

// NewClientStreamWriter creates and initialize a ClientStreamWriter instance.
func NewClientStreamWriter(clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter {
func NewClientStreamWriter(clientQueue, notifyQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter {
pr, pw := io.Pipe()
limitReader := limitreader.NewLimitReader(pr, int64(cfg.LocalLimit), cfg.Md5 != "")
clientWriter := &ClientStreamWriter{
clientQueue: clientQueue,
notifyQueue: notifyQueue,
pipeReader: pr,
pipeWriter: pw,
limitReader: limitReader,
Expand Down Expand Up @@ -139,7 +143,7 @@ func (csw *ClientStreamWriter) write(piece *Piece) error {

err := csw.writePieceToPipe(piece)
if err == nil {
go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(startTime))
go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(startTime), csw.notifyQueue)
}
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
copy(cases2, cases)

cfg := &config.Config{}
csw := NewClientStreamWriter(nil, nil, cfg)
csw := NewClientStreamWriter(nil, nil, nil, cfg)
go func() {
for _, v := range cases2 {
err := csw.writePieceToPipe(v.piece)
Expand Down
13 changes: 10 additions & 3 deletions dfget/core/downloader/p2p_downloader/client_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type ClientWriter struct {
// The downloader will put the piece into this queue after it downloaded a piece successfully.
// And clientWriter will poll values from this queue constantly and write to disk.
clientQueue queue.Queue
notifyQueue queue.Queue

// finish indicates whether the task written is completed.
finish chan struct{}

Expand Down Expand Up @@ -95,9 +97,11 @@ type ClientWriter struct {

// NewClientWriter creates and initialize a ClientWriter instance.
func NewClientWriter(clientFilePath, serviceFilePath string,
clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config, cdnSource apiTypes.CdnSource) PieceWriter {
clientQueue, notifyQueue queue.Queue,
api api.SupernodeAPI, cfg *config.Config, cdnSource apiTypes.CdnSource) PieceWriter {
clientWriter := &ClientWriter{
clientQueue: clientQueue,
notifyQueue: notifyQueue,
clientFilePath: clientFilePath,
serviceFilePath: serviceFilePath,
api: api,
Expand Down Expand Up @@ -219,7 +223,7 @@ func (cw *ClientWriter) write(piece *Piece) error {
cw.pieceIndex++
err := writePieceToFile(piece, cw.serviceFile, cw.cdnSource)
if err == nil {
go sendSuccessPiece(cw.api, cw.cfg.RV.Cid, piece, time.Since(startTime))
go sendSuccessPiece(cw.api, cw.cfg.RV.Cid, piece, time.Since(startTime), cw.notifyQueue)
}
return err
}
Expand Down Expand Up @@ -247,7 +251,7 @@ func startSyncWriter(q queue.Queue) queue.Queue {
return nil
}

func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.Duration) {
func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.Duration, notifyQueue queue.Queue) {
reportPieceRequest := &types.ReportPieceRequest{
TaskID: piece.TaskID,
Cid: cid,
Expand All @@ -265,6 +269,9 @@ func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.

_, err := api.ReportPiece(piece.SuperNode, reportPieceRequest)
if err == nil {
if notifyQueue != nil {
notifyQueue.Put("success")
}
if retry > 0 {
logrus.Warnf("success to report piece with request(%+v) after retrying (%d) times", reportPieceRequest, retry)
}
Expand Down
36 changes: 25 additions & 11 deletions dfget/core/downloader/p2p_downloader/p2p_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type P2PDownloader struct {
// And clientWriter will poll values from this queue constantly and write to disk.
clientQueue queue.Queue

// notifyQueue maintains a queue for notifying p2p downloader to pull next download tasks.
notifyQueue queue.Queue

// clientFilePath is the full path of the temp file.
clientFilePath string
// serviceFilePath is the full path of the temp service file which
Expand Down Expand Up @@ -150,6 +153,7 @@ func (p2p *P2PDownloader) init() {
p2p.queue.Put(NewPieceSimple(p2p.taskID, p2p.node, constants.TaskStatusStart, p2p.RegisterResult.CDNSource))

p2p.clientQueue = queue.NewQueue(p2p.cfg.ClientQueueSize)
p2p.notifyQueue = queue.NewQueue(p2p.cfg.ClientQueueSize)

p2p.clientFilePath = helper.GetTaskFile(p2p.taskFileName, p2p.cfg.RV.DataDir)
p2p.serviceFilePath = helper.GetServiceFile(p2p.taskFileName, p2p.cfg.RV.DataDir)
Expand All @@ -165,7 +169,9 @@ func (p2p *P2PDownloader) Run(ctx context.Context) error {
if p2p.streamMode {
return fmt.Errorf("streamMode enabled, should be disable")
}
clientWriter := NewClientWriter(p2p.clientFilePath, p2p.serviceFilePath, p2p.clientQueue, p2p.API, p2p.cfg, p2p.RegisterResult.CDNSource)
clientWriter := NewClientWriter(p2p.clientFilePath, p2p.serviceFilePath,
p2p.clientQueue, p2p.notifyQueue,
p2p.API, p2p.cfg, p2p.RegisterResult.CDNSource)
return p2p.run(ctx, clientWriter)
}

Expand All @@ -174,7 +180,7 @@ func (p2p *P2PDownloader) RunStream(ctx context.Context) (io.Reader, error) {
if !p2p.streamMode {
return nil, fmt.Errorf("streamMode disable, should be enabled")
}
clientStreamWriter := NewClientStreamWriter(p2p.clientQueue, p2p.API, p2p.cfg)
clientStreamWriter := NewClientStreamWriter(p2p.clientQueue, p2p.notifyQueue, p2p.API, p2p.cfg)
go func() {
err := p2p.run(ctx, clientStreamWriter)
if err != nil {
Expand Down Expand Up @@ -280,15 +286,9 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
break
}

sleepTime := time.Duration(rand.Intn(p2p.maxTimeout-p2p.minTimeout)+p2p.minTimeout) * time.Millisecond
logrus.Infof("pull piece task(%+v) result:%s and sleep %.3fs", item, res, sleepTime.Seconds())
time.Sleep(sleepTime)

// gradually increase the sleep time, up to [800-1600]
if p2p.minTimeout < 800 {
p2p.minTimeout *= 2
p2p.maxTimeout *= 2
}
actual, expected := p2p.sleepInterval()
logrus.Infof("pull piece task(%+v) result:%s and sleep actual:%.3fs expected:%.3fs",
item, res, actual.Seconds(), expected.Seconds())
}

// FIXME: try to abstract the judgement to make it more readable.
Expand All @@ -314,6 +314,20 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
return p2p.pullPieceTask(item)
}

func (p2p *P2PDownloader) sleepInterval() (actual, expected time.Duration) {
expected = time.Duration(rand.Intn(p2p.maxTimeout-p2p.minTimeout)+p2p.minTimeout) * time.Millisecond
start := time.Now()
p2p.notifyQueue.PollTimeout(expected)
actual = time.Now().Sub(start)

// gradually increase the sleep time, up to [800-1600]
if p2p.minTimeout < 800 {
p2p.minTimeout *= 2
p2p.maxTimeout *= 2
}
return actual, expected
}

// getPullRate gets download rate limit dynamically.
func (p2p *P2PDownloader) getPullRate(data *types.PullPieceTaskResponseContinueData) {
if time.Since(p2p.pullRateTime).Seconds() < 3 {
Expand Down

0 comments on commit dd73993

Please sign in to comment.