Skip to content

Commit

Permalink
feat: support response header (#1292)
Browse files Browse the repository at this point in the history
* feat: support response header via ExtendAttribute
* chore: update http pass through header
* chore: update ExtendAttribute in cdn downloader

Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma committed May 16, 2022
1 parent d601f95 commit 8f26e32
Show file tree
Hide file tree
Showing 40 changed files with 1,064 additions and 418 deletions.
23 changes: 16 additions & 7 deletions cdn/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (css *Server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest,
span.RecordError(err)
return err
}
var (
extendAttributeSend bool
)
for piece := range pieceChan {
pieceSeed := &cdnsystem.PieceSeed{
PeerId: peerID,
Expand All @@ -122,6 +125,11 @@ func (css *Server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest,
BeginTime: piece.BeginDownloadTime,
EndTime: piece.EndDownloadTime,
}
// only send extend attribute once
if !extendAttributeSend {
extendAttributeSend = true
pieceSeed.ExtendAttribute = registeredTask.ExtendAttribute
}
psc <- pieceSeed
jsonPiece, err := json.Marshal(pieceSeed)
if err != nil {
Expand Down Expand Up @@ -232,13 +240,14 @@ func (css *Server) GetPieceTasks(ctx context.Context, req *base.PieceTaskRequest
pieceMd5Sign = digestutils.Sha256(pieceMd5s...)
}
pp := &base.PiecePacket{
TaskId: req.TaskId,
DstPid: req.DstPid,
DstAddr: fmt.Sprintf("%s:%d", css.config.AdvertiseIP, css.config.DownloadPort),
PieceInfos: pieceInfos,
TotalPiece: seedTask.TotalPieceCount,
ContentLength: seedTask.SourceFileLength,
PieceMd5Sign: pieceMd5Sign,
TaskId: req.TaskId,
DstPid: req.DstPid,
DstAddr: fmt.Sprintf("%s:%d", css.config.AdvertiseIP, css.config.DownloadPort),
PieceInfos: pieceInfos,
TotalPiece: seedTask.TotalPieceCount,
ContentLength: seedTask.SourceFileLength,
PieceMd5Sign: pieceMd5Sign,
ExtendAttribute: seedTask.ExtendAttribute,
}
span.SetAttributes(constants.AttributePiecePacketResult.String(pp.String()))
return pp, nil
Expand Down
31 changes: 24 additions & 7 deletions cdn/supervisor/cdn/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,56 @@ import (
"github.com/pkg/errors"

"d7y.io/dragonfly/v2/cdn/supervisor/task"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/pkg/util/rangeutils"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
)

func (cm *manager) download(ctx context.Context, seedTask *task.SeedTask, breakPoint int64) (io.ReadCloser, error) {
func (cm *manager) download(ctx context.Context, seedTask *task.SeedTask, breakPoint int64) (io.ReadCloser, *base.ExtendAttribute, error) {
var err error
breakRange := seedTask.Range
if breakPoint > 0 {
// todo replace task.SourceFileLength with totalSourceFileLength to get BreakRange
breakRange, err = getBreakRange(breakPoint, seedTask.Range, seedTask.SourceFileLength)
if err != nil {
return nil, errors.Wrapf(err, "calculate the breakRange")
return nil, nil, errors.Wrapf(err, "calculate the breakRange")
}
}
seedTask.Log().Infof("start downloading %s at range %s with header %s", seedTask.RawURL, breakRange, seedTask.Header)
downloadRequest, err := source.NewRequestWithContext(ctx, seedTask.RawURL, seedTask.Header)
if err != nil {
return nil, errors.Wrap(err, "create download request")
return nil, nil, errors.Wrap(err, "create download request")
}
if !stringutils.IsBlank(breakRange) {
downloadRequest.Header.Add(source.Range, breakRange)
}
response, err := source.Download(downloadRequest)
if err != nil {
return nil, err
return nil, nil, err
}
err = response.Validate()
if err != nil {
return nil, nil, err
}
var (
exa *base.ExtendAttribute
)
if len(response.Header) > 0 {
var header = map[string]string{}
for k, v := range response.Header {
if len(v) > 0 {
header[k] = response.Header.Get(k)
}
}
exa = &base.ExtendAttribute{Header: header}
}
// update Expire info
cm.updateExpireInfo(seedTask.ID, map[string]string{
cm.updateResponseMetadata(seedTask.ID, map[string]string{
source.LastModified: response.Header.Get(source.LastModified),
source.ETag: response.Header.Get(source.ETag),
})
return response.Body, err
}, exa)
return response.Body, exa, err
}

func getBreakRange(breakPoint int64, taskRange string, fileTotalLength int64) (string, error) {
Expand Down
16 changes: 10 additions & 6 deletions cdn/supervisor/cdn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/ratelimiter/limitreader"
"d7y.io/dragonfly/v2/pkg/ratelimiter/ratelimiter"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/server"
"d7y.io/dragonfly/v2/pkg/synclock"
"d7y.io/dragonfly/v2/pkg/util/digestutils"
Expand Down Expand Up @@ -151,7 +152,7 @@ func (cm *manager) doTrigger(ctx context.Context, seedTask *task.SeedTask) (*tas
if detectResult.BreakPoint == -1 {
seedTask.Log().Infof("cache full hit on local")
return getUpdateTaskInfo(seedTask, task.StatusSuccess, detectResult.FileMetadata.SourceRealDigest, detectResult.FileMetadata.PieceMd5Sign,
detectResult.FileMetadata.SourceFileLen, detectResult.FileMetadata.CdnFileLength, detectResult.FileMetadata.TotalPieceCount), nil
detectResult.FileMetadata.SourceFileLen, detectResult.FileMetadata.CdnFileLength, detectResult.FileMetadata.TotalPieceCount, detectResult.FileMetadata.ExtendAttribute), nil
}

start := time.Now()
Expand All @@ -160,13 +161,15 @@ func (cm *manager) doTrigger(ctx context.Context, seedTask *task.SeedTask) (*tas
ctx, downloadSpan = tracer.Start(ctx, constants.SpanDownloadSource)
downloadSpan.End()
server.StatSeedStart(seedTask.ID, seedTask.RawURL)
respBody, err := cm.download(ctx, seedTask, detectResult.BreakPoint)
respBody, extendAttribute, err := cm.download(ctx, seedTask, detectResult.BreakPoint)
// download fail
if err != nil {
downloadSpan.RecordError(err)
server.StatSeedFinish(seedTask.ID, seedTask.RawURL, false, err, start, time.Now(), 0, 0)
return nil, errors.Wrap(err, "download task file data")
}
seedTask.ExtendAttribute = extendAttribute

defer respBody.Close()
reader := limitreader.NewLimitReaderWithLimiterAndDigest(respBody, cm.limiter, fileDigest, digestutils.Algorithms[digestType])

Expand All @@ -185,7 +188,7 @@ func (cm *manager) doTrigger(ctx context.Context, seedTask *task.SeedTask) (*tas
return nil, err
}
return getUpdateTaskInfo(seedTask, task.StatusSuccess, downloadMetadata.sourceRealDigest, downloadMetadata.pieceMd5Sign,
downloadMetadata.realSourceFileLength, downloadMetadata.realCdnFileLength, downloadMetadata.totalPieceCount), nil
downloadMetadata.realSourceFileLength, downloadMetadata.realCdnFileLength, downloadMetadata.totalPieceCount, extendAttribute), nil
}

func (cm *manager) Delete(taskID string) error {
Expand Down Expand Up @@ -242,8 +245,8 @@ func (cm *manager) handleCDNResult(seedTask *task.SeedTask, downloadMetadata *do
return nil
}

func (cm *manager) updateExpireInfo(taskID string, expireInfo map[string]string) {
if err := cm.metadataManager.updateExpireInfo(taskID, expireInfo); err != nil {
func (cm *manager) updateResponseMetadata(taskID string, expireInfo map[string]string, exa *base.ExtendAttribute) {
if err := cm.metadataManager.updateResponseMetadata(taskID, expireInfo, exa); err != nil {
logger.WithTaskID(taskID).Errorf("failed to update expireInfo(%s): %v", expireInfo, err)
}
logger.WithTaskID(taskID).Debugf("success to update metadata expireInfo(%s)", expireInfo)
Expand All @@ -261,13 +264,14 @@ func getUpdateTaskInfoWithStatusOnly(seedTask *task.SeedTask, cdnStatus string)
}

func getUpdateTaskInfo(seedTask *task.SeedTask, cdnStatus, realMD5, pieceMd5Sign string, sourceFileLength, cdnFileLength int64,
totalPieceCount int32) *task.SeedTask {
totalPieceCount int32, exa *base.ExtendAttribute) *task.SeedTask {
cloneTask := seedTask.Clone()
cloneTask.SourceFileLength = sourceFileLength
cloneTask.CdnFileLength = cdnFileLength
cloneTask.CdnStatus = cdnStatus
cloneTask.TotalPieceCount = totalPieceCount
cloneTask.SourceRealDigest = realMD5
cloneTask.PieceMd5Sign = pieceMd5Sign
cloneTask.ExtendAttribute = exa
return cloneTask
}
8 changes: 8 additions & 0 deletions cdn/supervisor/cdn/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func (suite *CDNManagerTestSuite) TestTriggerCDN() {
SourceRealDigest: "md5:f1e2488bba4d1267948d9e2f7008571c",
PieceMd5Sign: "bb138842f338fff90af737e4a6b2c6f8e2a7031ca9d5900bc9b646f6406d890f",
Pieces: new(sync.Map),
ExtendAttribute: &base.ExtendAttribute{Header: map[string]string{
"X-Dragonfly-Last-Modified": "Sun, 06 Jun 2021 12:52:30 GMT",
"X-Dragonfly-Etag": "etag",
}},
},
},
{
Expand Down Expand Up @@ -195,6 +199,10 @@ func (suite *CDNManagerTestSuite) TestTriggerCDN() {
SourceRealDigest: "sha256:b9907b9a5ba2b0223868c201b9addfe2ec1da1b90325d57c34f192966b0a68c5",
PieceMd5Sign: "bb138842f338fff90af737e4a6b2c6f8e2a7031ca9d5900bc9b646f6406d890f",
Pieces: new(sync.Map),
ExtendAttribute: &base.ExtendAttribute{Header: map[string]string{
"X-Dragonfly-Last-Modified": "Sun, 06 Jun 2021 12:52:30 GMT",
"X-Dragonfly-Etag": "etag",
}},
},
},
}
Expand Down
4 changes: 3 additions & 1 deletion cdn/supervisor/cdn/metadata_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage"
"d7y.io/dragonfly/v2/cdn/supervisor/task"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/synclock"
"d7y.io/dragonfly/v2/pkg/util/digestutils"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
Expand Down Expand Up @@ -90,7 +91,7 @@ func (mm *metadataManager) updateAccessTime(taskID string, accessTime int64) err
return mm.storage.WriteFileMetadata(taskID, originMetadata)
}

func (mm *metadataManager) updateExpireInfo(taskID string, expireInfo map[string]string) error {
func (mm *metadataManager) updateResponseMetadata(taskID string, expireInfo map[string]string, exa *base.ExtendAttribute) error {
mm.cacheLocker.Lock(taskID, false)
defer mm.cacheLocker.UnLock(taskID, false)

Expand All @@ -100,6 +101,7 @@ func (mm *metadataManager) updateExpireInfo(taskID string, expireInfo map[string
}

originMetadata.ExpireInfo = expireInfo
originMetadata.ExtendAttribute = exa

return mm.storage.WriteFileMetadata(taskID, originMetadata)
}
Expand Down
36 changes: 19 additions & 17 deletions cdn/supervisor/cdn/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"d7y.io/dragonfly/v2/cdn/storedriver"
"d7y.io/dragonfly/v2/cdn/supervisor/task"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/util/rangeutils"
)

Expand Down Expand Up @@ -81,23 +82,24 @@ type Manager interface {

// FileMetadata meta data of task
type FileMetadata struct {
TaskID string `json:"taskID"`
TaskURL string `json:"taskURL"`
PieceSize int32 `json:"pieceSize"`
SourceFileLen int64 `json:"sourceFileLen"`
AccessTime int64 `json:"accessTime"`
Interval int64 `json:"interval"`
CdnFileLength int64 `json:"cdnFileLength"`
Digest string `json:"digest"`
SourceRealDigest string `json:"sourceRealDigest"`
Tag string `json:"tag"`
ExpireInfo map[string]string `json:"expireInfo"`
Finish bool `json:"finish"`
Success bool `json:"success"`
TotalPieceCount int32 `json:"totalPieceCount"`
PieceMd5Sign string `json:"pieceMd5Sign"`
Range string `json:"range"`
Filter string `json:"filter"`
TaskID string `json:"taskID"`
TaskURL string `json:"taskURL"`
PieceSize int32 `json:"pieceSize"`
SourceFileLen int64 `json:"sourceFileLen"`
AccessTime int64 `json:"accessTime"`
Interval int64 `json:"interval"`
CdnFileLength int64 `json:"cdnFileLength"`
Digest string `json:"digest"`
SourceRealDigest string `json:"sourceRealDigest"`
Tag string `json:"tag"`
ExpireInfo map[string]string `json:"expireInfo"`
Finish bool `json:"finish"`
Success bool `json:"success"`
TotalPieceCount int32 `json:"totalPieceCount"`
PieceMd5Sign string `json:"pieceMd5Sign"`
Range string `json:"range"`
Filter string `json:"filter"`
ExtendAttribute *base.ExtendAttribute `json:"extendAttribute"`
}

// PieceMetaRecord meta data of piece
Expand Down
4 changes: 3 additions & 1 deletion cdn/supervisor/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type SeedTask struct {
// Pieces pieces of task
Pieces *sync.Map `json:"-"` // map[uint32]*PieceInfo

ExtendAttribute *base.ExtendAttribute `json:"extendAttribute"`

logger *logger.SugaredLoggerOnWith
}

Expand Down Expand Up @@ -216,5 +218,5 @@ const (
)

func IsEqual(task1, task2 SeedTask) bool {
return cmp.Equal(task1, task2, cmpopts.IgnoreFields(SeedTask{}, "Pieces"), cmpopts.IgnoreUnexported(SeedTask{}))
return cmp.Equal(task1, task2, cmpopts.IgnoreFields(SeedTask{}, "Pieces"), cmpopts.IgnoreUnexported(SeedTask{}, base.ExtendAttribute{}))
}
36 changes: 36 additions & 0 deletions client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
"d7y.io/dragonfly/v2/pkg/source"
"d7y.io/dragonfly/v2/pkg/util/digestutils"
)

Expand Down Expand Up @@ -98,6 +99,7 @@ type peerTaskConductor struct {
contentLength *atomic.Int64
completedLength *atomic.Int64
usedTraffic *atomic.Uint64
header atomic.Value

broker *pieceBroker

Expand Down Expand Up @@ -287,6 +289,7 @@ func (pt *peerTaskConductor) register() error {
pt.Infof("register task success, SizeScope: %s", base.SizeScope_name[int32(result.SizeScope)])
}

var header map[string]string
if !needBackSource {
sizeScope = result.SizeScope
switch result.SizeScope {
Expand All @@ -297,6 +300,9 @@ func (pt *peerTaskConductor) register() error {
if piece, ok := result.DirectPiece.(*scheduler.RegisterResult_SinglePiece); ok {
singlePiece = piece.SinglePiece
}
if result.ExtendAttribute != nil {
header = result.ExtendAttribute.Header
}
case base.SizeScope_TINY:
pt.span.SetAttributes(config.AttributePeerTaskSizeScope.String("tiny"))
if piece, ok := result.DirectPiece.(*scheduler.RegisterResult_PieceContent); ok {
Expand All @@ -312,6 +318,9 @@ func (pt *peerTaskConductor) register() error {
pt.cancel(base.Code_SchedError, err.Error())
return err
}
if result.ExtendAttribute != nil {
header = result.ExtendAttribute.Header
}
}
}

Expand All @@ -328,6 +337,10 @@ func (pt *peerTaskConductor) register() error {
pt.singlePiece = singlePiece
pt.tinyData = tinyData
pt.needBackSource = atomic.NewBool(needBackSource)

if len(header) > 0 {
pt.SetHeader(header)
}
return nil
}

Expand Down Expand Up @@ -394,6 +407,22 @@ func (pt *peerTaskConductor) GetPieceMd5Sign() string {
return pt.digest.Load()
}

func (pt *peerTaskConductor) SetHeader(header map[string]string) {
var hdr = &source.Header{}
for k, v := range header {
hdr.Set(k, v)
}
pt.header.Store(hdr)
}

func (pt *peerTaskConductor) GetHeader() *source.Header {
hdr := pt.header.Load()
if hdr != nil {
return hdr.(*source.Header)
}
return nil
}

func (pt *peerTaskConductor) Context() context.Context {
return pt.ctx
}
Expand Down Expand Up @@ -916,6 +945,12 @@ func (pt *peerTaskConductor) updateMetadata(piecePacket *base.PiecePacket) {
pt.Debugf("update content length: %d", piecePacket.ContentLength)
}

if piecePacket.ExtendAttribute != nil && len(piecePacket.ExtendAttribute.Header) > 0 && pt.GetHeader() == nil {
metadataChanged = true
pt.SetHeader(piecePacket.ExtendAttribute.Header)
pt.Debugf("update response header: %#v", piecePacket.ExtendAttribute.Header)
}

if metadataChanged {
err := pt.UpdateStorage()
if err != nil {
Expand Down Expand Up @@ -1362,6 +1397,7 @@ func (pt *peerTaskConductor) UpdateStorage() error {
ContentLength: pt.GetContentLength(),
TotalPieces: pt.GetTotalPieces(),
PieceMd5Sign: pt.GetPieceMd5Sign(),
Header: pt.GetHeader(),
})
if err != nil {
pt.Log().Errorf("update task to storage manager failed: %s", err)
Expand Down

0 comments on commit 8f26e32

Please sign in to comment.