Skip to content

Commit

Permalink
fix: seed peer did not send done seed result and no content length se…
Browse files Browse the repository at this point in the history
…nd (#1316)

fix: seed peer did not send done seed result
fix: unknown length back source seed did not send content length to other peers

Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma committed May 20, 2022
1 parent 4875d68 commit 53e2a26
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 108 deletions.
33 changes: 19 additions & 14 deletions client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func (pt *peerTaskConductor) backSource() {
}

ctx, span := tracer.Start(pt.ctx, config.SpanBackSource)
pt.contentLength.Store(-1)
pt.SetContentLength(-1)
err := pt.pieceManager.DownloadSource(ctx, pt, pt.request)
if err != nil {
pt.Errorf("download from source error: %s", err)
Expand Down Expand Up @@ -535,8 +535,8 @@ func (pt *peerTaskConductor) pullPiecesWithP2P() {
}

func (pt *peerTaskConductor) storeTinyPeerTask() {
l := int64(len(pt.tinyData.Content))
pt.SetContentLength(l)
contentLength := int64(len(pt.tinyData.Content))
pt.SetContentLength(contentLength)
pt.SetTotalPieces(1)
ctx := pt.ctx
var err error
Expand All @@ -547,7 +547,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
TaskID: pt.tinyData.TaskID,
},
DesiredLocation: "",
ContentLength: l,
ContentLength: contentLength,
TotalPieces: 1,
// TODO check digest
})
Expand All @@ -569,23 +569,23 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
Offset: 0,
Range: clientutil.Range{
Start: 0,
Length: l,
Length: contentLength,
},
Style: 0,
},
UnknownLength: false,
Reader: bytes.NewBuffer(pt.tinyData.Content),
GenPieceDigest: func(n int64) (int32, bool) {
return 1, true
GenMetadata: func(n int64) (int32, int64, bool) {
return 1, contentLength, true
},
})
if err != nil {
pt.Errorf("write tiny data storage failed: %s", err)
pt.cancel(base.Code_ClientError, err.Error())
return
}
if n != l {
pt.Errorf("write tiny data storage failed, want: %d, wrote: %d", l, n)
if n != contentLength {
pt.Errorf("write tiny data storage failed, want: %d, wrote: %d", contentLength, n)
pt.cancel(base.Code_ClientError, err.Error())
return
}
Expand All @@ -597,8 +597,8 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
return
}

pt.Debugf("store tiny data, len: %d", l)
pt.PublishPieceInfo(0, uint32(l))
pt.Debugf("store tiny data, len: %d", contentLength)
pt.PublishPieceInfo(0, uint32(contentLength))
}

func (pt *peerTaskConductor) receivePeerPacket(pieceRequestCh chan *DownloadPieceRequest) {
Expand Down Expand Up @@ -800,7 +800,7 @@ func (pt *peerTaskConductor) pullSinglePiece() {
ctx, span := tracer.Start(pt.ctx, fmt.Sprintf(config.SpanDownloadPiece, pt.singlePiece.PieceInfo.PieceNum))
span.SetAttributes(config.AttributePiece.Int(int(pt.singlePiece.PieceInfo.PieceNum)))

pt.contentLength.Store(int64(pt.singlePiece.PieceInfo.RangeSize))
pt.SetContentLength(int64(pt.singlePiece.PieceInfo.RangeSize))
pt.SetTotalPieces(1)
pt.SetPieceMd5Sign(digestutils.Sha256(pt.singlePiece.PieceInfo.PieceMd5))

Expand Down Expand Up @@ -1231,7 +1231,12 @@ func (pt *peerTaskConductor) waitLimit(ctx context.Context, request *DownloadPie
}

func (pt *peerTaskConductor) isCompleted() bool {
return pt.completedLength.Load() == pt.contentLength.Load()
if pt.completedLength.Load() == pt.GetContentLength() {
pt.Infof("completed content length: %d", pt.completedLength.Load())
return true
}

return false
}

// for legacy peers only
Expand Down Expand Up @@ -1468,7 +1473,7 @@ func (pt *peerTaskConductor) done() {
// send EOF piece result to scheduler
err := pt.sendPieceResult(
schedulerclient.NewEndOfPiece(pt.taskID, pt.peerID, pt.readyPieces.Settled()))
pt.Debugf("end piece result sent: %v, peer task finished", err)
pt.Debugf("peer task finished, end piece result sent result: %v", err)

err = pt.peerPacketStream.CloseSend()
pt.Debugf("close stream result: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/peer/peertask_piecetask_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer(
return piecePacket, false, nil
}
// need update metadata
if piecePacket.ContentLength > ptc.contentLength.Load() || piecePacket.TotalPiece > ptc.GetTotalPieces() {
if piecePacket.ContentLength > ptc.GetContentLength() || piecePacket.TotalPiece > ptc.GetTotalPieces() {
return piecePacket, false, nil
}
// invalid request num
Expand Down
96 changes: 39 additions & 57 deletions client/daemon/peer/piece_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
}

func (pm *pieceManager) processPieceFromSource(pt Task,
reader io.Reader, contentLength int64, pieceNum int32, pieceOffset uint64, pieceSize uint32, isLastPiece func(n int64) (int32, bool)) (
reader io.Reader, contentLength int64, pieceNum int32, pieceOffset uint64, pieceSize uint32,
isLastPiece func(n int64) (totalPieces int32, contentLength int64, ok bool)) (
result *DownloadPieceResult, md5 string, err error) {
result = &DownloadPieceResult{
Size: -1,
Expand Down Expand Up @@ -225,8 +226,8 @@ func (pm *pieceManager) processPieceFromSource(pt Task,
Length: int64(pieceSize),
},
},
Reader: reader,
GenPieceDigest: isLastPiece,
Reader: reader,
GenMetadata: isLastPiece,
})

result.FinishTime = time.Now().UnixNano()
Expand Down Expand Up @@ -307,17 +308,18 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc
// 2. save to storage
// handle resource which content length is unknown
if contentLength < 0 {
return pm.downloadUnknownLengthSource(ctx, pt, pieceSize, reader)
return pm.downloadUnknownLengthSource(pt, pieceSize, reader)
}

return pm.downloadKnownLengthSource(ctx, pt, contentLength, pieceSize, reader)
return pm.downloadKnownLengthSource(pt, contentLength, pieceSize, reader)
}

func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task, contentLength int64, pieceSize uint32, reader io.Reader) error {
func (pm *pieceManager) downloadKnownLengthSource(pt Task, contentLength int64, pieceSize uint32, reader io.Reader) error {
log := pt.Log()
maxPieceNum := util.ComputePieceNum(contentLength, pieceSize)
pt.SetContentLength(contentLength)
pt.SetTotalPieces(maxPieceNum)

maxPieceNum := util.ComputePieceNum(contentLength, pieceSize)
for pieceNum := int32(0); pieceNum < maxPieceNum; pieceNum++ {
size := pieceSize
offset := uint64(pieceNum) * uint64(pieceSize)
Expand All @@ -329,8 +331,8 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task,
log.Debugf("download piece %d", pieceNum)
result, md5, err := pm.processPieceFromSource(
pt, reader, contentLength, pieceNum, offset, size,
func(int64) (int32, bool) {
return maxPieceNum, pieceNum == maxPieceNum-1
func(int64) (int32, int64, bool) {
return maxPieceNum, contentLength, pieceNum == maxPieceNum-1
})
request := &DownloadPieceRequest{
TaskID: pt.GetTaskID(),
Expand All @@ -356,24 +358,6 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task,
return storage.ErrShortRead
}

if pieceNum == maxPieceNum-1 {
// last piece
err = pt.GetStorage().UpdateTask(ctx,
&storage.UpdateTaskRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
TaskID: pt.GetTaskID(),
},
ContentLength: contentLength,
TotalPieces: maxPieceNum,
})
pt.SetTotalPieces(maxPieceNum)
if err != nil {
log.Errorf("update task failed %s", err)
pt.ReportPieceResult(request, result, detectBackSourceError(err))
return err
}
}
pt.ReportPieceResult(request, result, nil)
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
}
Expand All @@ -382,25 +366,32 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task,
return nil
}

func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task, pieceSize uint32, reader io.Reader) error {
var contentLength int64 = -1
func (pm *pieceManager) downloadUnknownLengthSource(pt Task, pieceSize uint32, reader io.Reader) error {
var (
contentLength int64 = -1
totalPieces int32 = -1
)
log := pt.Log()
for pieceNum := int32(0); ; pieceNum++ {
size := pieceSize
offset := uint64(pieceNum) * uint64(pieceSize)
log.Debugf("download piece %d", pieceNum)
result, md5, err := pm.processPieceFromSource(
pt, reader, contentLength, pieceNum, offset, size,
func(n int64) (int32, bool) {
func(n int64) (int32, int64, bool) {
if n >= int64(pieceSize) {
return -1, false
return -1, -1, false
}
// content length is aligned at pieceSize
// when n == 0, need ignore current piece

// last piece, piece size maybe 0
contentLength = int64(pieceSize)*int64(pieceNum) + n
// when n == 0, content length is aligned at piece size, need ignore current piece
if n == 0 {
return pieceNum, true
totalPieces = pieceNum
} else {
totalPieces = pieceNum + 1
}
return pieceNum + 1, true
return totalPieces, contentLength, true
})
request := &DownloadPieceRequest{
TaskID: pt.GetTaskID(),
Expand All @@ -419,9 +410,11 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
log.Errorf("download piece %d error: %s", pieceNum, err)
return err
}

if result.Size == int64(size) {
pt.ReportPieceResult(request, result, nil)
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
log.Debugf("piece %d downloaded, size: %d", pieceNum, result.Size)
continue
} else if result.Size > int64(size) {
err = fmt.Errorf("piece %d size %d should not great than %d", pieceNum, result.Size, size)
Expand All @@ -430,33 +423,22 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
return err
}

// last piece, piece size maybe 0
contentLength = int64(pieceSize)*int64(pieceNum) + result.Size
pt.SetTotalPieces(util.ComputePieceNum(contentLength, pieceSize))
err = pt.GetStorage().UpdateTask(ctx,
&storage.UpdateTaskRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
TaskID: pt.GetTaskID(),
},
ContentLength: contentLength,
TotalPieces: pt.GetTotalPieces(),
})
if err != nil {
log.Errorf("update task failed %s", err)
pt.ReportPieceResult(request, result, detectBackSourceError(err))
return err
}
// content length is aligning at piece size
if result.Size == 0 {
pt.SetTotalPieces(totalPieces)
pt.SetContentLength(contentLength)
log.Debugf("final piece is %d", pieceNum-1)
break
}

pt.SetTotalPieces(totalPieces)
pt.SetContentLength(contentLength)
pt.ReportPieceResult(request, result, nil)
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
log.Debugf("final piece %d downloaded, size: %d", pieceNum, result.Size)
break
}

pt.SetContentLength(contentLength)
log.Infof("download from source ok")
return nil
}
Expand All @@ -471,7 +453,7 @@ func detectBackSourceError(err error) error {

func (pm *pieceManager) processPieceFromFile(ctx context.Context, ptm storage.PeerTaskMetadata,
tsd storage.TaskStorageDriver, r io.Reader, pieceNum int32, pieceOffset uint64,
pieceSize uint32, isLastPiece func(n int64) (int32, bool)) (int64, error) {
pieceSize uint32, isLastPiece func(n int64) (int32, int64, bool)) (int64, error) {
var (
n int64
reader = r
Expand All @@ -496,8 +478,8 @@ func (pm *pieceManager) processPieceFromFile(ctx context.Context, ptm storage.Pe
Length: int64(pieceSize),
},
},
Reader: reader,
GenPieceDigest: isLastPiece,
Reader: reader,
GenMetadata: isLastPiece,
})
if err != nil {
msg := fmt.Sprintf("put piece of task %s to storage failed, piece num: %d, wrote: %d, error: %s", ptm.TaskID, pieceNum, n, err)
Expand Down Expand Up @@ -531,7 +513,7 @@ func (pm *pieceManager) ImportFile(ctx context.Context, ptm storage.PeerTaskMeta
for pieceNum := int32(0); pieceNum < maxPieceNum; pieceNum++ {
size := pieceSize
offset := uint64(pieceNum) * uint64(pieceSize)
isLastPiece := func(int64) (int32, bool) { return maxPieceNum, pieceNum == maxPieceNum-1 }
isLastPiece := func(int64) (int32, int64, bool) { return maxPieceNum, contentLength, pieceNum == maxPieceNum-1 }
// calculate piece size for last piece
if contentLength > 0 && int64(offset)+int64(size) > contentLength {
size = uint32(contentLength - int64(offset))
Expand Down
4 changes: 2 additions & 2 deletions client/daemon/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ func (s *server) GetPieceTasks(ctx context.Context, request *base.PieceTaskReque
}, nil
}

logger.Debugf("receive get piece tasks request, task id: %s, src peer: %s, dst peer: %s, piece num: %d, limit: %d, length: %d",
request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit, len(p.PieceInfos))
logger.Debugf("receive get piece tasks request, task id: %s, src peer: %s, dst peer: %s, piece start num: %d, limit: %d, count: %d, total content length: %d",
request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit, len(p.PieceInfos), p.ContentLength)
p.DstAddr = s.uploadAddr
return p, nil
}
Expand Down
26 changes: 15 additions & 11 deletions client/daemon/rpcserver/seeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,18 @@ func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32) error {
s.attributeSent = true
}

// we must send done to scheduler
if len(pp.PieceInfos) == 0 {
ps := s.compositePieceSeed(pp, nil)
ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano())
s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000)
err = s.seedsServer.Send(&ps)
if err != nil {
s.Errorf("send reminding piece seeds error: %s", err.Error())
return err
}
}

for _, p := range pp.PieceInfos {
if p.PieceNum != desired {
s.Errorf("desired piece %d, not found", desired)
Expand Down Expand Up @@ -267,17 +279,9 @@ func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, fini

func (s *seedSynchronizer) compositePieceSeed(pp *base.PiecePacket, piece *base.PieceInfo) cdnsystem.PieceSeed {
return cdnsystem.PieceSeed{
PeerId: s.seedTaskRequest.PeerId,
HostId: s.seedTaskRequest.PeerHost.Id,
PieceInfo: &base.PieceInfo{
PieceNum: piece.PieceNum,
RangeStart: piece.RangeStart,
RangeSize: piece.RangeSize,
PieceMd5: piece.PieceMd5,
PieceOffset: piece.PieceOffset,
PieceStyle: piece.PieceStyle,
DownloadCost: piece.DownloadCost,
},
PeerId: s.seedTaskRequest.PeerId,
HostId: s.seedTaskRequest.PeerHost.Id,
PieceInfo: piece,
ContentLength: pp.ContentLength,
TotalPieceCount: pp.TotalPiece,
BeginTime: uint64(s.startNanoSecond),
Expand Down

0 comments on commit 53e2a26

Please sign in to comment.