Skip to content

Commit

Permalink
refactor: parse http range (#2071)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Jun 28, 2023
1 parent 1409e37 commit 79024c8
Show file tree
Hide file tree
Showing 33 changed files with 532 additions and 596 deletions.
37 changes: 8 additions & 29 deletions client/daemon/objectstorage/objectstorage.go
Expand Up @@ -21,7 +21,6 @@ package objectstorage
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
Expand All @@ -45,10 +44,10 @@ import (
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/peer"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/client/util"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/digest"
"d7y.io/dragonfly/v2/pkg/idgen"
nethttp "d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/pkg/objectstorage"
pkgstrings "d7y.io/dragonfly/v2/pkg/strings"
)
Expand Down Expand Up @@ -243,12 +242,11 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
}

var (
bucketName = params.ID
objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator))
filter = query.Filter
artifactRange *util.Range
ranges []util.Range
err error
bucketName = params.ID
objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator))
filter = query.Filter
rg nethttp.Range
err error
)

// Initialize filter field.
Expand Down Expand Up @@ -279,12 +277,11 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
// Parse http range header.
rangeHeader := ctx.GetHeader(headers.Range)
if len(rangeHeader) > 0 {
ranges, err = o.parseRangeHeader(rangeHeader)
rg, err = nethttp.ParseOneRange(rangeHeader, math.MaxInt64)
if err != nil {
ctx.JSON(http.StatusRequestedRangeNotSatisfiable, gin.H{"errors": err.Error()})
return
}
artifactRange = &ranges[0]

// Range header in dragonfly is without "bytes=".
urlMeta.Range = strings.TrimLeft(rangeHeader, "bytes=")
Expand All @@ -307,7 +304,7 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
reader, attr, err := o.peerTaskManager.StartStreamTask(ctx, &peer.StreamTaskRequest{
URL: signURL,
URLMeta: urlMeta,
Range: artifactRange,
Range: &rg,
PeerID: o.peerIDGenerator.PeerID(),
})
if err != nil {
Expand Down Expand Up @@ -641,21 +638,3 @@ func (o *objectStorage) client() (objectstorage.ObjectStorage, error) {

return client, nil
}

// parseRangeHeader uses to parse range http header for dragonfly.
func (o *objectStorage) parseRangeHeader(rangeHeader string) ([]util.Range, error) {
ranges, err := util.ParseRange(rangeHeader, math.MaxInt64)
if err != nil {
return nil, err
}

if len(ranges) > 1 {
return nil, errors.New("multiple range is not supported")
}

if len(ranges) == 0 {
return nil, errors.New("zero range is not supported")
}

return ranges, nil
}
8 changes: 4 additions & 4 deletions client/daemon/peer/peertask_conductor.go
Expand Up @@ -41,11 +41,11 @@ import (
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/metrics"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/client/util"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/digest"
"d7y.io/dragonfly/v2/pkg/idgen"
nethttp "d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/pkg/rpc/common"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
"d7y.io/dragonfly/v2/pkg/source"
Expand Down Expand Up @@ -151,7 +151,7 @@ type peerTaskConductor struct {

// subtask only
parent *peerTaskConductor
rg *util.Range
rg *nethttp.Range

sourceErrorStatus *status.Status
}
Expand All @@ -176,7 +176,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
request *schedulerv1.PeerTaskRequest,
limit rate.Limit,
parent *peerTaskConductor,
rg *util.Range,
rg *nethttp.Range,
seed bool) *peerTaskConductor {
// use a new context with span info
ctx = trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))
Expand Down Expand Up @@ -622,7 +622,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
Num: 0,
Md5: "",
Offset: 0,
Range: util.Range{
Range: nethttp.Range{
Start: 0,
Length: contentLength,
},
Expand Down
4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_file.go
Expand Up @@ -29,17 +29,17 @@ import (
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/metrics"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/client/util"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/net/http"
)

type FileTaskRequest struct {
schedulerv1.PeerTaskRequest
Output string
Limit float64
DisableBackSource bool
Range *util.Range
Range *http.Range
KeepOriginalOffset bool
}

Expand Down
10 changes: 5 additions & 5 deletions client/daemon/peer/peertask_manager.go
Expand Up @@ -36,10 +36,10 @@ import (

"d7y.io/dragonfly/v2/client/daemon/metrics"
"d7y.io/dragonfly/v2/client/daemon/storage"
clientutil "d7y.io/dragonfly/v2/client/util"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/util"
"d7y.io/dragonfly/v2/pkg/idgen"
nethttp "d7y.io/dragonfly/v2/pkg/net/http"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
)

Expand Down Expand Up @@ -163,7 +163,7 @@ func (ptm *peerTaskManager) getPeerTaskConductor(ctx context.Context,
request *schedulerv1.PeerTaskRequest,
limit rate.Limit,
parent *peerTaskConductor,
rg *clientutil.Range,
rg *nethttp.Range,
desiredLocation string,
seed bool) (*peerTaskConductor, error) {
var (
Expand Down Expand Up @@ -200,7 +200,7 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
request *schedulerv1.PeerTaskRequest,
limit rate.Limit,
parent *peerTaskConductor,
rg *clientutil.Range,
rg *nethttp.Range,
desiredLocation string,
seed bool) (*peerTaskConductor, bool, error) {
if ptc, ok := ptm.findPeerTaskConductor(taskID); ok {
Expand Down Expand Up @@ -241,7 +241,7 @@ func (ptm *peerTaskManager) createSplitedPeerTaskConductor(
request *schedulerv1.PeerTaskRequest,
limit rate.Limit,
parent *peerTaskConductor,
rg *clientutil.Range,
rg *nethttp.Range,
desiredLocation string,
seed bool) (*peerTaskConductor, bool, error) {
ptc := ptm.newPeerTaskConductor(ctx, request, limit, parent, rg, seed)
Expand All @@ -259,7 +259,7 @@ func (ptm *peerTaskManager) createSplitedPeerTaskConductor(
return ptc, true, nil
}

func (ptm *peerTaskManager) enabledPrefetch(rg *clientutil.Range) bool {
func (ptm *peerTaskManager) enabledPrefetch(rg *nethttp.Range) bool {
return ptm.Prefetch && rg != nil
}

Expand Down
21 changes: 11 additions & 10 deletions client/daemon/peer/peertask_manager_test.go
Expand Up @@ -58,6 +58,7 @@ import (
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/digest"
"d7y.io/dragonfly/v2/pkg/idgen"
nethttp "d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/pkg/rpc"
daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
Expand Down Expand Up @@ -337,7 +338,7 @@ type testSpec struct {
taskType int
name string
taskData []byte
httpRange *util.Range // only used in back source cases
httpRange *nethttp.Range // only used in back source cases
pieceParallelCount int32
pieceSize int
sizeScope commonv1.SizeScope
Expand All @@ -354,7 +355,7 @@ type testSpec struct {
backSource bool

mockPieceDownloader func(ctrl *gomock.Controller, taskData []byte, pieceSize int) PieceDownloader
mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient
mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient

cleanUp []func()
}
Expand Down Expand Up @@ -450,7 +451,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
url: "http://localhost/test/data",
sizeScope: commonv1.SizeScope_NORMAL,
mockPieceDownloader: nil,
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient {
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
sourceClient := sourcemocks.NewMockResourceClient(ctrl)
sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
func(request *source.Request) (int64, error) {
Expand All @@ -473,18 +474,18 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
backSource: true,
url: "http://localhost/test/data",
sizeScope: commonv1.SizeScope_NORMAL,
httpRange: &util.Range{
httpRange: &nethttp.Range{
Start: 0,
Length: 4096,
},
mockPieceDownloader: nil,
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient {
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
sourceClient := sourcemocks.NewMockResourceClient(ctrl)
sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
func(request *source.Request) (int64, error) {
assert := testifyassert.New(t)
if rg != nil {
rgs, err := util.ParseRange(request.Header.Get(headers.Range), math.MaxInt64)
rgs, err := nethttp.ParseRange(request.Header.Get(headers.Range), math.MaxInt64)
assert.Nil(err)
assert.Equal(1, len(rgs))
assert.Equal(rg.String(), rgs[0].String())
Expand All @@ -495,7 +496,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
func(request *source.Request) (*source.Response, error) {
assert := testifyassert.New(t)
if rg != nil {
rgs, err := util.ParseRange(request.Header.Get(headers.Range), math.MaxInt64)
rgs, err := nethttp.ParseRange(request.Header.Get(headers.Range), math.MaxInt64)
assert.Nil(err)
assert.Equal(1, len(rgs))
assert.Equal(rg.String(), rgs[0].String())
Expand All @@ -516,7 +517,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
url: "http://localhost/test/data",
sizeScope: commonv1.SizeScope_NORMAL,
mockPieceDownloader: nil,
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient {
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
sourceClient := sourcemocks.NewMockResourceClient(ctrl)
sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
func(request *source.Request) (int64, error) {
Expand All @@ -540,7 +541,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
url: "http://localhost/test/data",
sizeScope: commonv1.SizeScope_NORMAL,
mockPieceDownloader: nil,
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient {
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
sourceClient := sourcemocks.NewMockResourceClient(ctrl)
sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
func(request *source.Request) (int64, error) {
Expand Down Expand Up @@ -588,7 +589,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
url: "http://localhost/test/data",
sizeScope: commonv1.SizeScope_NORMAL,
mockPieceDownloader: nil,
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient {
mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
sourceClient := sourcemocks.NewMockResourceClient(ctrl)
sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
func(request *source.Request) (int64, error) {
Expand Down
10 changes: 5 additions & 5 deletions client/daemon/peer/peertask_reuse.go
Expand Up @@ -31,9 +31,9 @@ import (

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/client/util"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/net/http"
)

var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation
Expand All @@ -51,7 +51,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
taskID := idgen.TaskIDV1(request.Url, request.UrlMeta)
var (
reuse *storage.ReusePeerTask
reuseRange *util.Range // the range of parent peer task data to read
reuseRange *http.Range // the range of parent peer task data to read
log *logger.SugaredLoggerOnWith
length int64
err error
Expand Down Expand Up @@ -200,7 +200,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
}

func (ptm *peerTaskManager) storePartialFile(ctx context.Context, request *FileTaskRequest,
log *logger.SugaredLoggerOnWith, reuse *storage.ReusePeerTask, rg *util.Range) error {
log *logger.SugaredLoggerOnWith, reuse *storage.ReusePeerTask, rg *http.Range) error {
f, err := os.OpenFile(request.Output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
log.Errorf("open dest file error when reuse peer task: %s", err)
Expand Down Expand Up @@ -230,7 +230,7 @@ func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context,
taskID := idgen.TaskIDV1(request.URL, request.URLMeta)
var (
reuse *storage.ReusePeerTask
reuseRange *util.Range // the range of parent peer task data to read
reuseRange *http.Range // the range of parent peer task data to read
log *logger.SugaredLoggerOnWith
length int64
)
Expand Down Expand Up @@ -347,7 +347,7 @@ func (ptm *peerTaskManager) tryReuseSeedPeerTask(ctx context.Context,
taskID := idgen.TaskIDV1(request.Url, request.UrlMeta)
var (
reuse *storage.ReusePeerTask
reuseRange *util.Range // the range of parent peer task data to read
reuseRange *http.Range // the range of parent peer task data to read
log *logger.SugaredLoggerOnWith
)

Expand Down

0 comments on commit 79024c8

Please sign in to comment.