Skip to content

Commit

Permalink
Feature: prefetch ranged requests (#1053)
Browse files Browse the repository at this point in the history
1. implement prefetch ranged requests
2. optimize exact http code in transport
3. simplify reuse peer task logic
4. reuse peer task for ranged request size error
5. fix data race for peer task storage

Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma committed Feb 10, 2022
1 parent 9f6fd21 commit fed4f56
Show file tree
Hide file tree
Showing 21 changed files with 418 additions and 103 deletions.
4 changes: 4 additions & 0 deletions client/clientutil/range.go
Expand Up @@ -29,6 +29,10 @@ type Range struct {
Start, Length int64
}

func (r Range) String() string {
return fmt.Sprintf("bytes=%d-%d", r.Start, r.Start+r.Length-1)
}

// ErrNoOverlap is returned by ParseRange if first-byte-pos of
// all of the byte-range-spec values is greater than the content size.
var ErrNoOverlap = errors.New("invalid range: failed to overlap")
Expand Down
1 change: 1 addition & 0 deletions client/config/constants_otel.go
Expand Up @@ -25,6 +25,7 @@ const (
AttributePeerID = attribute.Key("d7y.peer.id")
AttributeTargetPeerID = attribute.Key("d7y.peer.target.id")
AttributeReusePeerID = attribute.Key("d7y.peer.reuse.id")
AttributeReuseRange = attribute.Key("d7y.peer.reuse.range")
AttributeTargetPeerAddr = attribute.Key("d7y.peer.target.addr")
AttributeMainPeer = attribute.Key("d7y.peer.task.main_peer")
AttributePeerPacketCode = attribute.Key("d7y.peer.packet.code")
Expand Down
1 change: 1 addition & 0 deletions client/config/headers.go
Expand Up @@ -20,6 +20,7 @@ const (
HeaderDragonflyFilter = "X-Dragonfly-Filter"
HeaderDragonflyPeer = "X-Dragonfly-Peer"
HeaderDragonflyTask = "X-Dragonfly-Task"
HeaderDragonflyRange = "X-Dragonfly-Range"
HeaderDragonflyBiz = "X-Dragonfly-Biz"
// HeaderDragonflyRegistry is used for dynamic registry mirrors
HeaderDragonflyRegistry = "X-Dragonfly-Registry"
Expand Down
1 change: 1 addition & 0 deletions client/config/peerhost.go
Expand Up @@ -175,6 +175,7 @@ type DownloadOption struct {
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
TransportOption *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
Prefetch bool `mapstructure:"prefetch" yaml:"prefetch"`
}

type TransportOption struct {
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/daemon.go
Expand Up @@ -169,7 +169,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
return nil, err
}
peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler,
opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.Prefetch, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
if err != nil {
return nil, err
}
Expand Down
21 changes: 14 additions & 7 deletions client/daemon/peer/peertask_conductor.go
Expand Up @@ -414,7 +414,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
pt.SetTotalPieces(1)
ctx := pt.ctx
var err error
pt.storage, err = pt.peerTaskManager.storageManager.RegisterTask(ctx,
storageDriver, err := pt.peerTaskManager.storageManager.RegisterTask(ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.tinyData.PeerID,
Expand All @@ -424,12 +424,13 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
TotalPieces: 1,
// TODO check digest
})
pt.storage = storageDriver
if err != nil {
logger.Errorf("register tiny data storage failed: %s", err)
pt.cancel(base.Code_ClientError, err.Error())
return
}
n, err := pt.storage.WritePiece(ctx,
n, err := pt.GetStorage().WritePiece(ctx,
&storage.WritePieceRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.tinyData.PeerID,
Expand Down Expand Up @@ -653,7 +654,7 @@ func (pt *peerTaskConductor) pullSinglePiece() {
}

request := &DownloadPieceRequest{
storage: pt.storage,
storage: pt.GetStorage(),
piece: pt.singlePiece.PieceInfo,
log: pt.Log(),
TaskID: pt.GetTaskID(),
Expand Down Expand Up @@ -892,7 +893,7 @@ func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadP
pt.requestedPieces.Set(piece.PieceNum)
}
req := &DownloadPieceRequest{
storage: pt.storage,
storage: pt.GetStorage(),
piece: piece,
log: pt.Log(),
TaskID: pt.GetTaskID(),
Expand Down Expand Up @@ -1106,6 +1107,12 @@ func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, res
}

func (pt *peerTaskConductor) InitStorage() (err error) {
pt.lock.Lock()
defer pt.lock.Unlock()
// check storage for partial back source cases.
if pt.storage != nil {
return nil
}
// prepare storage
pt.storage, err = pt.storageManager.RegisterTask(pt.ctx,
storage.RegisterTaskRequest{
Expand All @@ -1125,7 +1132,7 @@ func (pt *peerTaskConductor) InitStorage() (err error) {

func (pt *peerTaskConductor) UpdateStorage() error {
// update storage
err := pt.storage.UpdateTask(pt.ctx,
err := pt.GetStorage().UpdateTask(pt.ctx,
&storage.UpdateTaskRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
Expand Down Expand Up @@ -1278,7 +1285,7 @@ func (pt *peerTaskConductor) fail() {

// Validate stores metadata and validates digest
func (pt *peerTaskConductor) Validate() error {
err := pt.storage.Store(pt.ctx,
err := pt.GetStorage().Store(pt.ctx,
&storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.peerID,
Expand All @@ -1295,7 +1302,7 @@ func (pt *peerTaskConductor) Validate() error {
if !pt.peerTaskManager.calculateDigest {
return nil
}
err = pt.storage.ValidateDigest(
err = pt.GetStorage().ValidateDigest(
&storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
TaskID: pt.GetTaskID(),
Expand Down
4 changes: 4 additions & 0 deletions client/daemon/peer/peertask_file.go
Expand Up @@ -88,6 +88,10 @@ func (ptm *peerTaskManager) newFileTask(
if err != nil {
return nil, nil, err
}
// prefetch parent request
if ptm.enablePrefetch && request.UrlMeta.Range != "" {
go ptm.prefetch(&request.PeerTaskRequest)
}
ctx, span := tracer.Start(ctx, config.SpanFileTask, trace.WithSpanKind(trace.SpanKindClient))

pt := &fileTask{
Expand Down
48 changes: 44 additions & 4 deletions client/daemon/peer/peertask_manager.go
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"sync"

"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/time/rate"
Expand All @@ -29,6 +30,8 @@ import (
"d7y.io/dragonfly/v2/client/daemon/metrics"
"d7y.io/dragonfly/v2/client/daemon/storage"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
)
Expand Down Expand Up @@ -108,10 +111,10 @@ type peerTaskManager struct {

perPeerRateLimit rate.Limit

// enableMultiplex indicates reusing completed peer task storage
// currently, only check completed peer task after register to scheduler
// TODO multiplex the running peer task
// enableMultiplex indicates to reuse the data of completed peer tasks
enableMultiplex bool
// enablePrefetch indicates to prefetch the whole files of ranged requests
enablePrefetch bool

calculateDigest bool

Expand All @@ -126,6 +129,7 @@ func NewPeerTaskManager(
schedulerOption config.SchedulerOption,
perPeerRateLimit rate.Limit,
multiplex bool,
prefetch bool,
calculateDigest bool,
getPiecesMaxRetry int) (TaskManager, error) {

Expand All @@ -139,6 +143,7 @@ func NewPeerTaskManager(
schedulerOption: schedulerOption,
perPeerRateLimit: perPeerRateLimit,
enableMultiplex: multiplex,
enablePrefetch: prefetch,
calculateDigest: calculateDigest,
getPiecesMaxRetry: getPiecesMaxRetry,
}
Expand Down Expand Up @@ -198,6 +203,41 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
return ptc, true, nil
}

func (ptm *peerTaskManager) prefetch(request *scheduler.PeerTaskRequest) {
req := &scheduler.PeerTaskRequest{
Url: request.Url,
PeerId: request.PeerId,
PeerHost: ptm.host,
HostLoad: request.HostLoad,
IsMigrating: request.IsMigrating,
UrlMeta: &base.UrlMeta{
Digest: request.UrlMeta.Digest,
Tag: request.UrlMeta.Tag,
Filter: request.UrlMeta.Filter,
Header: map[string]string{},
},
}
for k, v := range request.UrlMeta.Header {
if k == headers.Range {
continue
}
req.UrlMeta.Header[k] = v
}
taskID := idgen.TaskID(req.Url, req.UrlMeta)
req.PeerId = idgen.PeerID(req.PeerHost.Ip)

var limit = rate.Inf
if ptm.perPeerRateLimit > 0 {
limit = ptm.perPeerRateLimit
}

logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId)
prefetch, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit)
if err != nil {
logger.Errorf("prefetch peer task %s/%s error: %s", prefetch.taskID, prefetch.peerID, err)
}
}

func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, *TinyData, error) {
if ptm.enableMultiplex {
progress, ok := ptm.tryReuseFilePeerTask(ctx, req)
Expand Down Expand Up @@ -235,7 +275,7 @@ func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTask
}

if ptm.enableMultiplex {
r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, peerTaskRequest)
r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, req)
if ok {
metrics.PeerTaskCacheHitCount.Add(1)
return r, attr, nil
Expand Down

0 comments on commit fed4f56

Please sign in to comment.