Optimize backsource logic and report peer result (#589)
* chore: optimize backsource logic and report peer result

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* chore: optimize peer task success logic

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* chore: add back source trace

Signed-off-by: Jim Ma <majinjing3@gmail.com>
jim3ma committed Aug 30, 2021
1 parent 1d038fa commit c955317
Showing 5 changed files with 80 additions and 42 deletions.
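The core of this change is the rename of `peerTaskDone` to `success`, plus an early check of `pt.success` whenever the task context is cancelled: as the new comment notes, a successful finish also cancels the context, so cancellation alone must not be reported as a failure. A minimal, self-contained sketch of that pattern (illustrative names only, not the actual Dragonfly types):

```go
package main

import (
	"context"
	"fmt"
)

type task struct {
	success      bool
	failedReason string
}

// run finishes the work, marks the task successful, and then cancels the
// task context, mirroring how a finished peer task tears itself down.
func (t *task) run(cancel context.CancelFunc) {
	t.success = true
	cancel()
}

// watch reacts to context cancellation; only a task that did not succeed
// records a failure reason.
func (t *task) watch(ctx context.Context) {
	<-ctx.Done()
	if !t.success {
		t.failedReason = "context canceled"
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	t := &task{}
	t.run(cancel)
	t.watch(ctx)
	fmt.Printf("success=%v failedReason=%q\n", t.success, t.failedReason)
}
```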
80 changes: 54 additions & 26 deletions client/daemon/peer/peertask_base.go
@@ -104,16 +104,16 @@ type peerTask struct {

// done channel will be close when peer task is finished
done chan struct{}
// peerTaskDone will be true after peer task done
peerTaskDone bool
// success will be true after peer task done
success bool
// span stands open telemetry trace span
span trace.Span

// same actions must be done only once, like close done channel and so on
once sync.Once

// failedPieceCh will hold all pieces which download failed,
// those pieces will be retry later
// those pieces will be retried later
failedPieceCh chan int32
// failedReason will be set when peer task failed
failedReason string
@@ -226,6 +226,10 @@ loop:
break loop
}
if err != nil {
// when success, context will be cancelled, check if pt.success is true
if pt.success {
return
}
pt.failedCode = dfcodes.UnknownError
if de, ok := err.(*dferrors.DfError); ok {
if de.Code == dfcodes.SchedNeedBackSource {
@@ -388,7 +392,7 @@ loop:
break loop
case <-pt.ctx.Done():
pt.Debugf("context done due to %s", pt.ctx.Err())
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
@@ -488,20 +492,38 @@ func (pt *peerTask) waitFirstPeerPacket() bool {
pt.failedReason = err.Error()
}
pt.span.AddEvent(fmt.Sprintf("pulling pieces end due to %s", err))
case <-pt.peerPacketReady:
// preparePieceTasksByPeer func already send piece result with error
pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s",
time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
return true
case _, ok := <-pt.peerPacketReady:
if ok {
// preparePieceTasksByPeer func already send piece result with error
pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s",
time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
return true
}
// when schedule timeout, receivePeerPacket will close pt.peerPacketReady
if pt.schedulerOption.DisableAutoBackSource {
pt.failedReason = reasonBackSourceDisabled
err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
pt.span.RecordError(err)
pt.Errorf(err.Error())
} else {
pt.Warnf("start download from source due to dfcodes.SchedNeedBackSource")
pt.span.AddEvent("back source due to scheduler says need back source")
pt.needBackSource = true
pt.backSource()
}
case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration):
if pt.schedulerOption.DisableAutoBackSource {
pt.failedReason = reasonScheduleTimeout
pt.failedCode = dfcodes.ClientScheduleTimeout
logger.Errorf("%s, auto back source disabled", pt.failedReason)
err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
pt.span.RecordError(err)
pt.Errorf(err.Error())
} else {
pt.Warnf("start download from source due to %s", reasonScheduleTimeout)
pt.span.AddEvent("back source due to schedule timeout")
pt.needBackSource = true
pt.backSource()
}
pt.Errorf("start download from source due to %s", reasonScheduleTimeout)
pt.needBackSource = true
pt.backSource()
}
return false
}
@@ -511,10 +533,10 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
select {
// when peer task without content length or total pieces count, match here
case <-pt.done:
pt.Infof("peer task done, stop get pieces from peer")
pt.Infof("peer task done, stop wait available peer packet")
case <-pt.ctx.Done():
pt.Debugf("context done due to %s", pt.ctx.Err())
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
@@ -529,23 +551,29 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
}
// when schedule timeout, receivePeerPacket will close pt.peerPacketReady
if pt.schedulerOption.DisableAutoBackSource {
pt.failedReason = reasonReScheduleTimeout
pt.failedCode = dfcodes.ClientScheduleTimeout
logger.Errorf("%s, auto back source disabled", pt.failedReason)
pt.failedReason = reasonBackSourceDisabled
err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
pt.span.RecordError(err)
pt.Errorf(err.Error())
} else {
pt.Errorf("start download from source due to dfcodes.SchedNeedBackSource")
pt.Warnf("start download from source due to dfcodes.SchedNeedBackSource")
pt.span.AddEvent("back source due to scheduler says need back source ")
pt.needBackSource = true
pt.backSource()
}
case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration):
if pt.schedulerOption.DisableAutoBackSource {
pt.failedReason = reasonReScheduleTimeout
pt.failedCode = dfcodes.ClientScheduleTimeout
logger.Errorf("%s, auto back source disabled", pt.failedReason)
err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
pt.span.RecordError(err)
pt.Errorf(err.Error())
} else {
pt.Warnf("start download from source due to %s", reasonReScheduleTimeout)
pt.span.AddEvent("back source due to schedule timeout")
pt.needBackSource = true
pt.backSource()
}
pt.Errorf("start download from source due to %s", reasonReScheduleTimeout)
pt.needBackSource = true
pt.backSource()
}
return -1, false
}
@@ -568,7 +596,7 @@ func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceReque
pt.Warnf("peer task done, but still some piece request not process")
case <-pt.ctx.Done():
pt.Warnf("context done due to %s", pt.ctx.Err())
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
@@ -585,10 +613,10 @@ func (pt *peerTask) waitFailedPiece() (int32, bool) {
// use no default branch select to wait failed piece or exit
select {
case <-pt.done:
pt.Infof("peer task done, stop get pieces from peer")
pt.Infof("peer task done, stop wait failed piece")
return -1, false
case <-pt.ctx.Done():
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
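`waitFirstPeerPacket` and `waitAvailablePeerPacket` now use the two-value receive `_, ok := <-pt.peerPacketReady`: a value means a new peer client is ready, while a closed channel signals that `receivePeerPacket` wants the task to fall back to the source, and the `time.After` branch still covers a scheduler that never answers. A minimal sketch of that pattern with illustrative names, not the real peer task:

```go
package main

import (
	"fmt"
	"time"
)

// waitReady mimics the shape of waitFirstPeerPacket: a delivered value means a
// peer is ready, a closed channel means "back to source", and a timeout means
// the scheduler never answered in time.
func waitReady(ready <-chan struct{}, timeout time.Duration) string {
	select {
	case _, ok := <-ready:
		if ok {
			return "peer ready, download from peers"
		}
		// Closed without a value: the receiving loop told us to back-source.
		return "channel closed, back to source"
	case <-time.After(timeout):
		return "schedule timeout, back to source"
	}
}

func main() {
	ready := make(chan struct{})
	close(ready) // simulate the packet receiver closing the channel
	fmt.Println(waitReady(ready, time.Second))
}
```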
17 changes: 10 additions & 7 deletions client/daemon/peer/peertask_file.go
@@ -296,9 +296,10 @@ func (pt *filePeerTask) finish() error {
pt.Debugf("finish end piece result sent")

var (
success = true
code = dfcodes.Success
message = "Success"
success = true
code = dfcodes.Success
message = "Success"
progressDone bool
)

// callback to store data to output
@@ -322,7 +323,7 @@
CompletedLength: pt.completedLength.Load(),
PeerTaskDone: true,
DoneCallback: func() {
pt.peerTaskDone = true
progressDone = true
close(pt.progressStopCh)
},
}
@@ -341,13 +342,14 @@
case <-pt.progressStopCh:
pt.Infof("progress stopped")
case <-pt.ctx.Done():
if pt.peerTaskDone {
if progressDone {
pt.Debugf("progress stopped and context done")
} else {
pt.Warnf("wait progress stopped failed, context done, but progress not stopped")
}
}
pt.Debugf("finished: close channel")
pt.success = true
close(pt.done)
pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
pt.span.End()
@@ -365,6 +367,7 @@ func (pt *filePeerTask) cleanUnfinished() {
scheduler.NewEndPieceResult(pt.taskID, pt.peerID, pt.readyPieces.Settled()))
pt.Debugf("clean up end piece result sent")

var progressDone bool
pg := &FilePeerTaskProgress{
State: &ProgressState{
Success: false,
@@ -377,7 +380,7 @@
CompletedLength: pt.completedLength.Load(),
PeerTaskDone: true,
DoneCallback: func() {
pt.peerTaskDone = true
progressDone = true
close(pt.progressStopCh)
},
}
@@ -396,7 +399,7 @@
case <-pt.progressStopCh:
pt.Infof("progress stopped")
case <-pt.ctx.Done():
if pt.peerTaskDone {
if progressDone {
pt.Debugf("progress stopped and context done")
} else {
pt.Warnf("wait progress stopped failed, context done, but progress not stopped")
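In `finish()` and `cleanUnfinished()`, the shared `pt.peerTaskDone` field is replaced by a local `progressDone` flag captured by each `DoneCallback` closure, so each path tracks only whether its own final progress event was acknowledged. A stripped-down sketch of that closure-captured flag, with invented names rather than the real `FilePeerTaskProgress`:

```go
package main

import "fmt"

// progress carries a done callback, in the spirit of a progress event that the
// consumer acknowledges once it has stored the result.
type progress struct {
	done func()
}

// sendFinalProgress delivers the last progress event and waits until the
// consumer acknowledges it; the flag lives in this call, not in shared state.
func sendFinalProgress(deliver func(progress)) {
	var progressDone bool
	stop := make(chan struct{})

	deliver(progress{
		done: func() {
			progressDone = true
			close(stop)
		},
	})

	<-stop
	fmt.Println("final progress acknowledged:", progressDone)
}

func main() {
	sendFinalProgress(func(p progress) {
		p.done() // the consumer acknowledges the last progress event
	})
}
```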
5 changes: 3 additions & 2 deletions client/daemon/peer/peertask_file_callback.go
@@ -17,6 +17,7 @@
package peer

import (
"context"
"time"

"d7y.io/dragonfly/v2/client/daemon/storage"
@@ -91,7 +92,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
return e
}
p.ptm.PeerTaskDone(p.req.PeerId)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
@@ -116,7 +117,7 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
p.ptm.PeerTaskDone(p.req.PeerId)
var end = time.Now()
pt.Log().Errorf("file peer task failed, code: %d, reason: %s", code, reason)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
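In both callbacks, `ReportPeerResult` is now called with `context.Background()` instead of `p.pt.ctx`. A plausible reading of the change: by the time the result is reported, the task context may already be cancelled (a successful task cancels it), and a context-aware client would then drop the report. A hypothetical sketch, not the real scheduler client:

```go
package main

import (
	"context"
	"fmt"
)

// report stands in for a scheduler RPC: like most context-aware clients, it
// refuses to send once its context is already cancelled.
func report(ctx context.Context, result string) error {
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("report %q aborted: %w", result, err)
	}
	fmt.Printf("reported %q\n", result)
	return nil
}

func main() {
	taskCtx, cancel := context.WithCancel(context.Background())
	cancel() // the peer task has finished and its context is gone

	fmt.Println(report(taskCtx, "peer result"))               // dropped
	fmt.Println(report(context.Background(), "peer result"))  // delivered
}
```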
15 changes: 10 additions & 5 deletions client/daemon/peer/peertask_stream.go
@@ -49,6 +49,7 @@ type StreamPeerTask interface {

type streamPeerTask struct {
peerTask
streamDone chan struct{}
successPieceCh chan int32
}

@@ -149,6 +150,8 @@ func newStreamPeerTask(ctx context.Context,
limiter = rate.NewLimiter(perPeerRateLimit, int(perPeerRateLimit))
}
pt := &streamPeerTask{
successPieceCh: make(chan int32),
streamDone: make(chan struct{}),
peerTask: peerTask{
ctx: ctx,
host: host,
@@ -177,7 +180,6 @@ func newStreamPeerTask(ctx context.Context,
usedTraffic: atomic.NewInt64(0),
SugaredLoggerOnWith: logger.With("peer", request.PeerId, "task", result.TaskId, "component", "streamPeerTask"),
},
successPieceCh: make(chan int32),
}
// bind func that base peer task did not implement
pt.backSourceFunc = pt.backSource
@@ -334,9 +336,9 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin
s.Errorf("CloseWithError failed: %s", err)
}
return
case <-s.done:
case <-s.streamDone:
for {
// all data is wrote to local storage, and all data is wrote to pipe write
// all data wrote to local storage, and all data wrote to pipe write
if s.readyPieces.Settled() == desired {
pw.Close()
return
@@ -369,17 +371,19 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin
func (s *streamPeerTask) finish() error {
// send last progress
s.once.Do(func() {
s.success = true
// let stream return immediately
close(s.streamDone)
// send EOF piece result to scheduler
_ = s.peerPacketStream.Send(
scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled()))
s.Debugf("end piece result sent, peer task finished")
close(s.done)
//close(s.successPieceCh)
if err := s.callback.Done(s); err != nil {
s.span.RecordError(err)
s.Errorf("done callback error: %s", err)
}
s.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
close(s.done)
})
return nil
}
@@ -391,6 +395,7 @@ func (s *streamPeerTask) cleanUnfinished() {
_ = s.peerPacketStream.Send(
scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled()))
s.Errorf("end piece result sent, peer task failed")
close(s.streamDone)
close(s.done)
//close(s.successPieceCh)
if err := s.callback.Fail(s, s.failedCode, s.failedReason); err != nil {
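The new `streamDone` channel is closed first inside `finish()`, letting `Start`'s reader drain and close the pipe immediately, while `done` now closes only after `callback.Done` has reported the result. A minimal sketch of that two-channel ordering with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

type streamTask struct {
	once       sync.Once
	streamDone chan struct{}
	done       chan struct{}
}

// finish closes streamDone before running the completion callback, so a reader
// blocked on streamDone returns at once; done closes only after the callback.
func (s *streamTask) finish(callbackDone func()) {
	s.once.Do(func() {
		close(s.streamDone) // unblock the reader immediately
		callbackDone()      // e.g. report the peer result to the scheduler
		close(s.done)       // everything, including callbacks, is finished
	})
}

func main() {
	s := &streamTask{
		streamDone: make(chan struct{}),
		done:       make(chan struct{}),
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-s.streamDone
		fmt.Println("reader: stream complete, closing pipe writer")
	}()

	s.finish(func() { fmt.Println("callback: peer result reported") })
	<-s.done
	wg.Wait()
}
```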
5 changes: 3 additions & 2 deletions client/daemon/peer/peertask_stream_callback.go
@@ -17,6 +17,7 @@
package peer

import (
"context"
"time"

"d7y.io/dragonfly/v2/client/daemon/storage"
@@ -89,7 +90,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
return e
}
p.ptm.PeerTaskDone(p.req.PeerId)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
@@ -114,7 +115,7 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er
p.ptm.PeerTaskDone(p.req.PeerId)
var end = time.Now()
pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", code, reason)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
