Skip to content

Commit

Permalink
chore: optimize peer task success logic
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma committed Aug 30, 2021
1 parent 45e92e8 commit 57e9f83
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 32 deletions.
50 changes: 26 additions & 24 deletions client/daemon/peer/peertask_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ type peerTask struct {

// done channel will be close when peer task is finished
done chan struct{}
// peerTaskDone will be true after peer task done
peerTaskDone bool
// success will be true after peer task done
success bool
// span stands open telemetry trace span
span trace.Span

// same actions must be done only once, like close done channel and so on
once sync.Once

// failedPieceCh will hold all pieces which download failed,
// those pieces will be retry later
// those pieces will be retried later
failedPieceCh chan int32
// failedReason will be set when peer task failed
failedReason string
Expand Down Expand Up @@ -226,26 +226,28 @@ loop:
break loop
}
if err != nil {
if !pt.peerTaskDone {
pt.failedCode = dfcodes.UnknownError
if de, ok := err.(*dferrors.DfError); ok {
if de.Code == dfcodes.SchedNeedBackSource {
pt.needBackSource = true
close(pt.peerPacketReady)
return
}
pt.failedCode = de.Code
pt.failedReason = de.Message
pt.Errorf("receive peer packet failed: %s", pt.failedReason)
} else {
pt.Errorf("receive peer packet failed: %s", err)
}
if !firstSpanDone {
firstPeerSpan.RecordError(err)
// when success, context will be cancelled, check if pt.success is true
if pt.success {
return
}
pt.failedCode = dfcodes.UnknownError
if de, ok := err.(*dferrors.DfError); ok {
if de.Code == dfcodes.SchedNeedBackSource {
pt.needBackSource = true
close(pt.peerPacketReady)
return
}
break loop
pt.failedCode = de.Code
pt.failedReason = de.Message
pt.Errorf("receive peer packet failed: %s", pt.failedReason)
} else {
pt.Errorf("receive peer packet failed: %s", err)
}
pt.cancel()
if !firstSpanDone {
firstPeerSpan.RecordError(err)
}
break loop
}

logger.Debugf("receive peerPacket %v for peer %s", peerPacket, pt.peerID)
Expand Down Expand Up @@ -390,7 +392,7 @@ loop:
break loop
case <-pt.ctx.Done():
pt.Debugf("context done due to %s", pt.ctx.Err())
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
Expand Down Expand Up @@ -528,7 +530,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
pt.Infof("peer task done, stop wait available peer packet")
case <-pt.ctx.Done():
pt.Debugf("context done due to %s", pt.ctx.Err())
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
Expand Down Expand Up @@ -582,7 +584,7 @@ func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceReque
pt.Warnf("peer task done, but still some piece request not process")
case <-pt.ctx.Done():
pt.Warnf("context done due to %s", pt.ctx.Err())
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
Expand All @@ -602,7 +604,7 @@ func (pt *peerTask) waitFailedPiece() (int32, bool) {
pt.Infof("peer task done, stop wait failed piece")
return -1, false
case <-pt.ctx.Done():
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
Expand Down
17 changes: 10 additions & 7 deletions client/daemon/peer/peertask_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,10 @@ func (pt *filePeerTask) finish() error {
pt.Debugf("finish end piece result sent")

var (
success = true
code = dfcodes.Success
message = "Success"
success = true
code = dfcodes.Success
message = "Success"
progressDone bool
)

// callback to store data to output
Expand All @@ -322,7 +323,7 @@ func (pt *filePeerTask) finish() error {
CompletedLength: pt.completedLength.Load(),
PeerTaskDone: true,
DoneCallback: func() {
pt.peerTaskDone = true
progressDone = true
close(pt.progressStopCh)
},
}
Expand All @@ -341,13 +342,14 @@ func (pt *filePeerTask) finish() error {
case <-pt.progressStopCh:
pt.Infof("progress stopped")
case <-pt.ctx.Done():
if pt.peerTaskDone {
if progressDone {
pt.Debugf("progress stopped and context done")
} else {
pt.Warnf("wait progress stopped failed, context done, but progress not stopped")
}
}
pt.Debugf("finished: close channel")
pt.success = true
close(pt.done)
pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
pt.span.End()
Expand All @@ -365,6 +367,7 @@ func (pt *filePeerTask) cleanUnfinished() {
scheduler.NewEndPieceResult(pt.taskID, pt.peerID, pt.readyPieces.Settled()))
pt.Debugf("clean up end piece result sent")

var progressDone bool
pg := &FilePeerTaskProgress{
State: &ProgressState{
Success: false,
Expand All @@ -377,7 +380,7 @@ func (pt *filePeerTask) cleanUnfinished() {
CompletedLength: pt.completedLength.Load(),
PeerTaskDone: true,
DoneCallback: func() {
pt.peerTaskDone = true
progressDone = true
close(pt.progressStopCh)
},
}
Expand All @@ -396,7 +399,7 @@ func (pt *filePeerTask) cleanUnfinished() {
case <-pt.progressStopCh:
pt.Infof("progress stopped")
case <-pt.ctx.Done():
if pt.peerTaskDone {
if progressDone {
pt.Debugf("progress stopped and context done")
} else {
pt.Warnf("wait progress stopped failed, context done, but progress not stopped")
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/peer/peertask_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,9 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin
func (s *streamPeerTask) finish() error {
// send last progress
s.once.Do(func() {
s.success = true
// let stream return immediately
close(s.streamDone)
s.peerTaskDone = true
// send EOF piece result to scheduler
_ = s.peerPacketStream.Send(
scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled()))
Expand Down

0 comments on commit 57e9f83

Please sign in to comment.