Optimize backsource logic and report peer result #589

Merged · 3 commits · Aug 30, 2021
80 changes: 54 additions & 26 deletions client/daemon/peer/peertask_base.go
@@ -104,16 +104,16 @@ type peerTask struct {

// done channel will be close when peer task is finished
done chan struct{}
// peerTaskDone will be true after peer task done
peerTaskDone bool
// success will be true after peer task done
success bool
// span stands open telemetry trace span
span trace.Span

// same actions must be done only once, like close done channel and so on
once sync.Once

// failedPieceCh will hold all pieces which download failed,
// those pieces will be retry later
// those pieces will be retried later
failedPieceCh chan int32
// failedReason will be set when peer task failed
failedReason string
@@ -226,6 +226,10 @@ loop:
break loop
}
if err != nil {
// when success, context will be cancelled, check if pt.success is true
if pt.success {
return
}
pt.failedCode = dfcodes.UnknownError
if de, ok := err.(*dferrors.DfError); ok {
if de.Code == dfcodes.SchedNeedBackSource {
@@ -388,7 +392,7 @@ loop:
break loop
case <-pt.ctx.Done():
pt.Debugf("context done due to %s", pt.ctx.Err())
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
@@ -488,20 +492,38 @@ func (pt *peerTask) waitFirstPeerPacket() bool {
pt.failedReason = err.Error()
}
pt.span.AddEvent(fmt.Sprintf("pulling pieces end due to %s", err))
case <-pt.peerPacketReady:
// preparePieceTasksByPeer func already send piece result with error
pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s",
time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
return true
case _, ok := <-pt.peerPacketReady:
if ok {
// preparePieceTasksByPeer func already send piece result with error
pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s",
time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
return true
}
// when schedule timeout, receivePeerPacket will close pt.peerPacketReady
if pt.schedulerOption.DisableAutoBackSource {
pt.failedReason = reasonBackSourceDisabled
err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
pt.span.RecordError(err)
pt.Errorf(err.Error())
} else {
pt.Warnf("start download from source due to dfcodes.SchedNeedBackSource")
pt.span.AddEvent("back source due to scheduler says need back source")
pt.needBackSource = true
pt.backSource()
}
case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration):
if pt.schedulerOption.DisableAutoBackSource {
pt.failedReason = reasonScheduleTimeout
pt.failedCode = dfcodes.ClientScheduleTimeout
logger.Errorf("%s, auto back source disabled", pt.failedReason)
err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
pt.span.RecordError(err)
pt.Errorf(err.Error())
} else {
pt.Warnf("start download from source due to %s", reasonScheduleTimeout)
pt.span.AddEvent("back source due to schedule timeout")
pt.needBackSource = true
pt.backSource()
}
pt.Errorf("start download from source due to %s", reasonScheduleTimeout)
pt.needBackSource = true
pt.backSource()
}
return false
}
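Note on the hunk above: the switch from `case <-pt.peerPacketReady:` to `case _, ok := <-pt.peerPacketReady:` relies on the fact that receiving from a closed channel returns immediately with `ok == false`, which is how the task tells "scheduler delivered a peer packet" apart from "receivePeerPacket closed the channel on timeout". A minimal standalone sketch of that pattern (names like `waitPacket` and `backSource` are illustrative, not the Dragonfly API):

```go
package main

import "fmt"

// waitPacket shows the receive-with-ok idiom: a value means a peer packet is
// ready; a closed channel (ok == false) means the scheduler gave up and the
// caller should fall back to downloading from the source.
func waitPacket(ready <-chan struct{}, backSource func()) bool {
	_, ok := <-ready
	if ok {
		return true // a real peer packet is available
	}
	backSource() // channel closed, e.g. schedule timed out
	return false
}

func main() {
	ready := make(chan struct{})
	close(ready) // simulate receivePeerPacket closing the channel on timeout
	got := waitPacket(ready, func() { fmt.Println("back to source") })
	fmt.Println("peer ready:", got)
}
```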
@@ -511,10 +533,10 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
select {
// when peer task without content length or total pieces count, match here
case <-pt.done:
pt.Infof("peer task done, stop get pieces from peer")
pt.Infof("peer task done, stop wait available peer packet")
case <-pt.ctx.Done():
pt.Debugf("context done due to %s", pt.ctx.Err())
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
@@ -529,23 +551,29 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
}
// when schedule timeout, receivePeerPacket will close pt.peerPacketReady
if pt.schedulerOption.DisableAutoBackSource {
pt.failedReason = reasonReScheduleTimeout
pt.failedCode = dfcodes.ClientScheduleTimeout
logger.Errorf("%s, auto back source disabled", pt.failedReason)
pt.failedReason = reasonBackSourceDisabled
err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
pt.span.RecordError(err)
pt.Errorf(err.Error())
} else {
pt.Errorf("start download from source due to dfcodes.SchedNeedBackSource")
pt.Warnf("start download from source due to dfcodes.SchedNeedBackSource")
pt.span.AddEvent("back source due to scheduler says need back source ")
pt.needBackSource = true
pt.backSource()
}
case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration):
if pt.schedulerOption.DisableAutoBackSource {
pt.failedReason = reasonReScheduleTimeout
pt.failedCode = dfcodes.ClientScheduleTimeout
logger.Errorf("%s, auto back source disabled", pt.failedReason)
err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
pt.span.RecordError(err)
pt.Errorf(err.Error())
} else {
pt.Warnf("start download from source due to %s", reasonReScheduleTimeout)
pt.span.AddEvent("back source due to schedule timeout")
pt.needBackSource = true
pt.backSource()
}
pt.Errorf("start download from source due to %s", reasonReScheduleTimeout)
pt.needBackSource = true
pt.backSource()
}
return -1, false
}
@@ -568,7 +596,7 @@ func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceReque
pt.Warnf("peer task done, but still some piece request not process")
case <-pt.ctx.Done():
pt.Warnf("context done due to %s", pt.ctx.Err())
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
Expand All @@ -585,10 +613,10 @@ func (pt *peerTask) waitFailedPiece() (int32, bool) {
// use no default branch select to wait failed piece or exit
select {
case <-pt.done:
pt.Infof("peer task done, stop get pieces from peer")
pt.Infof("peer task done, stop wait failed piece")
return -1, false
case <-pt.ctx.Done():
if !pt.peerTaskDone {
if !pt.success {
if pt.failedCode == failedCodeNotSet {
pt.failedReason = reasonContextCanceled
pt.failedCode = dfcodes.ClientContextCanceled
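The other thread running through peertask_base.go is the rename of `peerTaskDone` to `success` plus the early `if pt.success { return }` check: finishing a peer task also cancels its context, so a context error by itself must not be recorded as a failure. A small sketch of that guard, under assumed names (`task`, `handleRecvErr` are illustrative only):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type task struct {
	success bool
}

// handleRecvErr mirrors the pattern in the diff: a stream error only counts
// as a failure if the task has not already succeeded, because finish() sets
// success and then cancels the context, which tears the stream down.
func (t *task) handleRecvErr(err error) error {
	if err == nil || t.success {
		return nil
	}
	return fmt.Errorf("peer packet stream failed: %w", err)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	t := &task{success: true} // the task finished, then its context was cancelled
	cancel()
	fmt.Println(t.handleRecvErr(ctx.Err()))                       // <nil>
	fmt.Println((&task{}).handleRecvErr(errors.New("recv: EOF"))) // real failure
}
```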
17 changes: 10 additions & 7 deletions client/daemon/peer/peertask_file.go
@@ -296,9 +296,10 @@ func (pt *filePeerTask) finish() error {
pt.Debugf("finish end piece result sent")

var (
success = true
code = dfcodes.Success
message = "Success"
success = true
code = dfcodes.Success
message = "Success"
progressDone bool
)

// callback to store data to output
@@ -322,7 +323,7 @@ func (pt *filePeerTask) finish() error {
CompletedLength: pt.completedLength.Load(),
PeerTaskDone: true,
DoneCallback: func() {
pt.peerTaskDone = true
progressDone = true
close(pt.progressStopCh)
},
}
@@ -341,13 +342,14 @@ func (pt *filePeerTask) finish() error {
case <-pt.progressStopCh:
pt.Infof("progress stopped")
case <-pt.ctx.Done():
if pt.peerTaskDone {
if progressDone {
pt.Debugf("progress stopped and context done")
} else {
pt.Warnf("wait progress stopped failed, context done, but progress not stopped")
}
}
pt.Debugf("finished: close channel")
pt.success = true
close(pt.done)
pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
pt.span.End()
@@ -365,6 +367,7 @@ func (pt *filePeerTask) cleanUnfinished() {
scheduler.NewEndPieceResult(pt.taskID, pt.peerID, pt.readyPieces.Settled()))
pt.Debugf("clean up end piece result sent")

var progressDone bool
pg := &FilePeerTaskProgress{
State: &ProgressState{
Success: false,
@@ -377,7 +380,7 @@ func (pt *filePeerTask) cleanUnfinished() {
CompletedLength: pt.completedLength.Load(),
PeerTaskDone: true,
DoneCallback: func() {
pt.peerTaskDone = true
progressDone = true
close(pt.progressStopCh)
},
}
@@ -396,7 +399,7 @@ func (pt *filePeerTask) cleanUnfinished() {
case <-pt.progressStopCh:
pt.Infof("progress stopped")
case <-pt.ctx.Done():
if pt.peerTaskDone {
if progressDone {
pt.Debugf("progress stopped and context done")
} else {
pt.Warnf("wait progress stopped failed, context done, but progress not stopped")
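In peertask_file.go the shared `pt.peerTaskDone` field is replaced by a `progressDone` local captured by the `DoneCallback` closure, so draining the last progress event no longer mutates task-wide state. A rough, self-contained sketch of that closure pattern (the `progress` type and `finish` helper here are illustrative):

```go
package main

import "fmt"

type progress struct {
	DoneCallback func()
}

// finish keeps its own progressDone flag; the consumer signals completion by
// invoking the callback, which flips the local flag and closes stop.
func finish(send func(progress)) {
	var progressDone bool
	stop := make(chan struct{})
	p := progress{DoneCallback: func() {
		progressDone = true
		close(stop)
	}}
	send(p)
	<-stop
	fmt.Println("progress done:", progressDone)
}

func main() {
	finish(func(p progress) {
		// the consumer calls the callback once the final progress is handled
		p.DoneCallback()
	})
}
```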
5 changes: 3 additions & 2 deletions client/daemon/peer/peertask_file_callback.go
@@ -17,6 +17,7 @@
package peer

import (
"context"
"time"

"d7y.io/dragonfly/v2/client/daemon/storage"
@@ -91,7 +92,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
return e
}
p.ptm.PeerTaskDone(p.req.PeerId)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
@@ -116,7 +117,7 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
p.ptm.PeerTaskDone(p.req.PeerId)
var end = time.Now()
pt.Log().Errorf("file peer task failed, code: %d, reason: %s", code, reason)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
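The callback change swaps `p.pt.ctx` for `context.Background()` when reporting the peer result: by the time `Done` or `Fail` runs, the task context may already be cancelled, and an RPC issued on it would fail with `context.Canceled` before the scheduler ever sees the result. A minimal sketch of that effect (the PR uses a plain `context.Background()`; the timeout and the `reportPeerResult` stand-in below are assumptions for illustration):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// reportPeerResult stands in for schedulerClient.ReportPeerResult and simply
// surfaces the context state the report would be issued under.
func reportPeerResult(ctx context.Context) error {
	return ctx.Err()
}

func main() {
	taskCtx, cancel := context.WithCancel(context.Background())
	cancel() // the task already finished and its context was cancelled

	fmt.Println("with task ctx:", reportPeerResult(taskCtx)) // context canceled

	reportCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
	defer stop()
	fmt.Println("with fresh ctx:", reportPeerResult(reportCtx)) // <nil>
}
```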
15 changes: 10 additions & 5 deletions client/daemon/peer/peertask_stream.go
@@ -49,6 +49,7 @@ type StreamPeerTask interface {

type streamPeerTask struct {
peerTask
streamDone chan struct{}
successPieceCh chan int32
}

@@ -149,6 +150,8 @@ func newStreamPeerTask(ctx context.Context,
limiter = rate.NewLimiter(perPeerRateLimit, int(perPeerRateLimit))
}
pt := &streamPeerTask{
successPieceCh: make(chan int32),
streamDone: make(chan struct{}),
peerTask: peerTask{
ctx: ctx,
host: host,
@@ -177,7 +180,6 @@ func newStreamPeerTask(ctx context.Context,
usedTraffic: atomic.NewInt64(0),
SugaredLoggerOnWith: logger.With("peer", request.PeerId, "task", result.TaskId, "component", "streamPeerTask"),
},
successPieceCh: make(chan int32),
}
// bind func that base peer task did not implement
pt.backSourceFunc = pt.backSource
@@ -334,9 +336,9 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin
s.Errorf("CloseWithError failed: %s", err)
}
return
case <-s.done:
case <-s.streamDone:
for {
// all data is wrote to local storage, and all data is wrote to pipe write
// all data wrote to local storage, and all data wrote to pipe write
if s.readyPieces.Settled() == desired {
pw.Close()
return
@@ -369,17 +371,19 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin
func (s *streamPeerTask) finish() error {
// send last progress
s.once.Do(func() {
s.success = true
// let stream return immediately
close(s.streamDone)
// send EOF piece result to scheduler
_ = s.peerPacketStream.Send(
scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled()))
s.Debugf("end piece result sent, peer task finished")
close(s.done)
//close(s.successPieceCh)
if err := s.callback.Done(s); err != nil {
s.span.RecordError(err)
s.Errorf("done callback error: %s", err)
}
s.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
close(s.done)
})
return nil
}
@@ -391,6 +395,7 @@ func (s *streamPeerTask) cleanUnfinished() {
_ = s.peerPacketStream.Send(
scheduler.NewEndPieceResult(s.taskID, s.peerID, s.readyPieces.Settled()))
s.Errorf("end piece result sent, peer task failed")
close(s.streamDone)
close(s.done)
//close(s.successPieceCh)
if err := s.callback.Fail(s, s.failedCode, s.failedReason); err != nil {
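The stream task now keeps two channels: `streamDone` is closed first inside `finish()` so the HTTP reader can drain and return right away, and `done` is closed only after the completion callback has run. A rough sketch of that ordering, with illustrative names (`streamTask`, `doneCallback`):

```go
package main

import (
	"fmt"
	"sync"
)

type streamTask struct {
	once       sync.Once
	streamDone chan struct{}
	done       chan struct{}
}

// finish closes streamDone before running the callback, so the stream reader
// is unblocked immediately, and closes done only once the callback completes.
func (s *streamTask) finish(doneCallback func() error) {
	s.once.Do(func() {
		close(s.streamDone)
		if err := doneCallback(); err != nil {
			fmt.Println("done callback error:", err)
		}
		close(s.done)
	})
}

func main() {
	s := &streamTask{streamDone: make(chan struct{}), done: make(chan struct{})}
	go s.finish(func() error { return nil })
	<-s.streamDone
	fmt.Println("stream can return")
	<-s.done
	fmt.Println("peer task done")
}
```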
5 changes: 3 additions & 2 deletions client/daemon/peer/peertask_stream_callback.go
@@ -17,6 +17,7 @@
package peer

import (
"context"
"time"

"d7y.io/dragonfly/v2/client/daemon/storage"
@@ -89,7 +90,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
return e
}
p.ptm.PeerTaskDone(p.req.PeerId)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,
@@ -114,7 +115,7 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er
p.ptm.PeerTaskDone(p.req.PeerId)
var end = time.Now()
pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", code, reason)
err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{
err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{
TaskId: pt.GetTaskID(),
PeerId: pt.GetPeerID(),
SrcIp: p.ptm.host.Ip,