diff --git a/client/daemon/objectstorage/objectstorage.go b/client/daemon/objectstorage/objectstorage.go index 2c13c83ce45..88c088c1e3f 100644 --- a/client/daemon/objectstorage/objectstorage.go +++ b/client/daemon/objectstorage/objectstorage.go @@ -21,7 +21,6 @@ package objectstorage import ( "bytes" "context" - "errors" "fmt" "io" "math" @@ -45,10 +44,10 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/peer" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/digest" "d7y.io/dragonfly/v2/pkg/idgen" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/objectstorage" pkgstrings "d7y.io/dragonfly/v2/pkg/strings" ) @@ -243,12 +242,11 @@ func (o *objectStorage) getObject(ctx *gin.Context) { } var ( - bucketName = params.ID - objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator)) - filter = query.Filter - artifactRange *util.Range - ranges []util.Range - err error + bucketName = params.ID + objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator)) + filter = query.Filter + rg nethttp.Range + err error ) // Initialize filter field. @@ -279,12 +277,11 @@ func (o *objectStorage) getObject(ctx *gin.Context) { // Parse http range header. rangeHeader := ctx.GetHeader(headers.Range) if len(rangeHeader) > 0 { - ranges, err = o.parseRangeHeader(rangeHeader) + rg, err = nethttp.ParseOneRange(rangeHeader, math.MaxInt64) if err != nil { ctx.JSON(http.StatusRequestedRangeNotSatisfiable, gin.H{"errors": err.Error()}) return } - artifactRange = &ranges[0] // Range header in dragonfly is without "bytes=". urlMeta.Range = strings.TrimLeft(rangeHeader, "bytes=") @@ -307,7 +304,7 @@ func (o *objectStorage) getObject(ctx *gin.Context) { reader, attr, err := o.peerTaskManager.StartStreamTask(ctx, &peer.StreamTaskRequest{ URL: signURL, URLMeta: urlMeta, - Range: artifactRange, + Range: &rg, PeerID: o.peerIDGenerator.PeerID(), }) if err != nil { @@ -641,21 +638,3 @@ func (o *objectStorage) client() (objectstorage.ObjectStorage, error) { return client, nil } - -// parseRangeHeader uses to parse range http header for dragonfly. -func (o *objectStorage) parseRangeHeader(rangeHeader string) ([]util.Range, error) { - ranges, err := util.ParseRange(rangeHeader, math.MaxInt64) - if err != nil { - return nil, err - } - - if len(ranges) > 1 { - return nil, errors.New("multiple range is not supported") - } - - if len(ranges) == 0 { - return nil, errors.New("zero range is not supported") - } - - return ranges, nil -} diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 5e112d8bc6a..9486f4a1ab6 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -41,11 +41,11 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/metrics" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/client/util" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/digest" "d7y.io/dragonfly/v2/pkg/idgen" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/rpc/common" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" "d7y.io/dragonfly/v2/pkg/source" @@ -151,7 +151,7 @@ type peerTaskConductor struct { // subtask only parent *peerTaskConductor - rg *util.Range + rg *nethttp.Range sourceErrorStatus *status.Status } @@ -176,7 +176,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor( request *schedulerv1.PeerTaskRequest, limit rate.Limit, parent *peerTaskConductor, - rg *util.Range, + rg *nethttp.Range, seed bool) *peerTaskConductor { // use a new context with span info ctx = trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)) @@ -622,7 +622,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() { Num: 0, Md5: "", Offset: 0, - Range: util.Range{ + Range: nethttp.Range{ Start: 0, Length: contentLength, }, diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index ccdee6da9b7..311bac609d7 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -29,9 +29,9 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/metrics" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/net/http" ) type FileTaskRequest struct { @@ -39,7 +39,7 @@ type FileTaskRequest struct { Output string Limit float64 DisableBackSource bool - Range *util.Range + Range *http.Range KeepOriginalOffset bool } diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 21507852897..9cfef25782f 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -36,10 +36,10 @@ import ( "d7y.io/dragonfly/v2/client/daemon/metrics" "d7y.io/dragonfly/v2/client/daemon/storage" - clientutil "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/util" "d7y.io/dragonfly/v2/pkg/idgen" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" ) @@ -163,7 +163,7 @@ func (ptm *peerTaskManager) getPeerTaskConductor(ctx context.Context, request *schedulerv1.PeerTaskRequest, limit rate.Limit, parent *peerTaskConductor, - rg *clientutil.Range, + rg *nethttp.Range, desiredLocation string, seed bool) (*peerTaskConductor, error) { var ( @@ -200,7 +200,7 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor( request *schedulerv1.PeerTaskRequest, limit rate.Limit, parent *peerTaskConductor, - rg *clientutil.Range, + rg *nethttp.Range, desiredLocation string, seed bool) (*peerTaskConductor, bool, error) { if ptc, ok := ptm.findPeerTaskConductor(taskID); ok { @@ -241,7 +241,7 @@ func (ptm *peerTaskManager) createSplitedPeerTaskConductor( request *schedulerv1.PeerTaskRequest, limit rate.Limit, parent *peerTaskConductor, - rg *clientutil.Range, + rg *nethttp.Range, desiredLocation string, seed bool) (*peerTaskConductor, bool, error) { ptc := ptm.newPeerTaskConductor(ctx, request, limit, parent, rg, seed) @@ -259,7 +259,7 @@ func (ptm *peerTaskManager) createSplitedPeerTaskConductor( return ptc, true, nil } -func (ptm *peerTaskManager) enabledPrefetch(rg *clientutil.Range) bool { +func (ptm *peerTaskManager) enabledPrefetch(rg *nethttp.Range) bool { return ptm.Prefetch && rg != nil } diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index d82a6cbf515..6c7b47a3a75 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -58,6 +58,7 @@ import ( "d7y.io/dragonfly/v2/pkg/dfnet" "d7y.io/dragonfly/v2/pkg/digest" "d7y.io/dragonfly/v2/pkg/idgen" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/rpc" daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" @@ -337,7 +338,7 @@ type testSpec struct { taskType int name string taskData []byte - httpRange *util.Range // only used in back source cases + httpRange *nethttp.Range // only used in back source cases pieceParallelCount int32 pieceSize int sizeScope commonv1.SizeScope @@ -354,7 +355,7 @@ type testSpec struct { backSource bool mockPieceDownloader func(ctrl *gomock.Controller, taskData []byte, pieceSize int) PieceDownloader - mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient + mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient cleanUp []func() } @@ -450,7 +451,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { url: "http://localhost/test/data", sizeScope: commonv1.SizeScope_NORMAL, mockPieceDownloader: nil, - mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient { + mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient { sourceClient := sourcemocks.NewMockResourceClient(ctrl) sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn( func(request *source.Request) (int64, error) { @@ -473,18 +474,18 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { backSource: true, url: "http://localhost/test/data", sizeScope: commonv1.SizeScope_NORMAL, - httpRange: &util.Range{ + httpRange: &nethttp.Range{ Start: 0, Length: 4096, }, mockPieceDownloader: nil, - mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient { + mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient { sourceClient := sourcemocks.NewMockResourceClient(ctrl) sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn( func(request *source.Request) (int64, error) { assert := testifyassert.New(t) if rg != nil { - rgs, err := util.ParseRange(request.Header.Get(headers.Range), math.MaxInt64) + rgs, err := nethttp.ParseRange(request.Header.Get(headers.Range), math.MaxInt64) assert.Nil(err) assert.Equal(1, len(rgs)) assert.Equal(rg.String(), rgs[0].String()) @@ -495,7 +496,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { func(request *source.Request) (*source.Response, error) { assert := testifyassert.New(t) if rg != nil { - rgs, err := util.ParseRange(request.Header.Get(headers.Range), math.MaxInt64) + rgs, err := nethttp.ParseRange(request.Header.Get(headers.Range), math.MaxInt64) assert.Nil(err) assert.Equal(1, len(rgs)) assert.Equal(rg.String(), rgs[0].String()) @@ -516,7 +517,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { url: "http://localhost/test/data", sizeScope: commonv1.SizeScope_NORMAL, mockPieceDownloader: nil, - mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient { + mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient { sourceClient := sourcemocks.NewMockResourceClient(ctrl) sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn( func(request *source.Request) (int64, error) { @@ -540,7 +541,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { url: "http://localhost/test/data", sizeScope: commonv1.SizeScope_NORMAL, mockPieceDownloader: nil, - mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient { + mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient { sourceClient := sourcemocks.NewMockResourceClient(ctrl) sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn( func(request *source.Request) (int64, error) { @@ -588,7 +589,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) { url: "http://localhost/test/data", sizeScope: commonv1.SizeScope_NORMAL, mockPieceDownloader: nil, - mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient { + mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient { sourceClient := sourcemocks.NewMockResourceClient(ctrl) sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn( func(request *source.Request) (int64, error) { diff --git a/client/daemon/peer/peertask_reuse.go b/client/daemon/peer/peertask_reuse.go index cf973deeaf8..5cb662aad99 100644 --- a/client/daemon/peer/peertask_reuse.go +++ b/client/daemon/peer/peertask_reuse.go @@ -31,9 +31,9 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/net/http" ) var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation @@ -51,7 +51,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context, taskID := idgen.TaskIDV1(request.Url, request.UrlMeta) var ( reuse *storage.ReusePeerTask - reuseRange *util.Range // the range of parent peer task data to read + reuseRange *http.Range // the range of parent peer task data to read log *logger.SugaredLoggerOnWith length int64 err error @@ -200,7 +200,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context, } func (ptm *peerTaskManager) storePartialFile(ctx context.Context, request *FileTaskRequest, - log *logger.SugaredLoggerOnWith, reuse *storage.ReusePeerTask, rg *util.Range) error { + log *logger.SugaredLoggerOnWith, reuse *storage.ReusePeerTask, rg *http.Range) error { f, err := os.OpenFile(request.Output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644) if err != nil { log.Errorf("open dest file error when reuse peer task: %s", err) @@ -230,7 +230,7 @@ func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, taskID := idgen.TaskIDV1(request.URL, request.URLMeta) var ( reuse *storage.ReusePeerTask - reuseRange *util.Range // the range of parent peer task data to read + reuseRange *http.Range // the range of parent peer task data to read log *logger.SugaredLoggerOnWith length int64 ) @@ -347,7 +347,7 @@ func (ptm *peerTaskManager) tryReuseSeedPeerTask(ctx context.Context, taskID := idgen.TaskIDV1(request.Url, request.UrlMeta) var ( reuse *storage.ReusePeerTask - reuseRange *util.Range // the range of parent peer task data to read + reuseRange *http.Range // the range of parent peer task data to read log *logger.SugaredLoggerOnWith ) diff --git a/client/daemon/peer/peertask_reuse_test.go b/client/daemon/peer/peertask_reuse_test.go index a24f33f4d44..82821275778 100644 --- a/client/daemon/peer/peertask_reuse_test.go +++ b/client/daemon/peer/peertask_reuse_test.go @@ -37,7 +37,7 @@ import ( "d7y.io/dragonfly/v2/client/daemon/storage" "d7y.io/dragonfly/v2/client/daemon/storage/mocks" "d7y.io/dragonfly/v2/client/daemon/test" - "d7y.io/dragonfly/v2/client/util" + "d7y.io/dragonfly/v2/pkg/net/http" ) func TestReuseFilePeerTask(t *testing.T) { @@ -120,7 +120,7 @@ func TestReuseFilePeerTask(t *testing.T) { enablePrefetch: false, storageManager: func(sm *mocks.MockManager) { //sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( - // func(taskID string, rg *util.Range) *storage.ReusePeerTask { + // func(taskID string, rg *http.Range) *storage.ReusePeerTask { // return nil // }) //sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( @@ -152,7 +152,7 @@ func TestReuseFilePeerTask(t *testing.T) { }, }, Output: testOutput, - Range: &util.Range{Start: 200, Length: 100}, + Range: &http.Range{Start: 200, Length: 100}, }, enablePrefetch: true, storageManager: func(sm *mocks.MockManager) { @@ -196,12 +196,12 @@ func TestReuseFilePeerTask(t *testing.T) { }, }, Output: testOutput, - Range: &util.Range{Start: 0, Length: 10}, + Range: &http.Range{Start: 0, Length: 10}, }, enablePrefetch: true, storageManager: func(sm *mocks.MockManager) { sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( - func(taskID string, rg *util.Range) *storage.ReusePeerTask { + func(taskID string, rg *http.Range) *storage.ReusePeerTask { return nil }) sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( @@ -229,7 +229,7 @@ func TestReuseFilePeerTask(t *testing.T) { }, }, Output: testOutput, - Range: &util.Range{Start: 300, Length: 100}, + Range: &http.Range{Start: 300, Length: 100}, }, enablePrefetch: true, storageManager: func(sm *mocks.MockManager) { @@ -239,7 +239,7 @@ func TestReuseFilePeerTask(t *testing.T) { return nil }) sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( - func(id string, rg *util.Range) *storage.ReusePeerTask { + func(id string, rg *http.Range) *storage.ReusePeerTask { taskID = id return &storage.ReusePeerTask{ PeerTaskMetadata: storage.PeerTaskMetadata{ @@ -278,7 +278,7 @@ func TestReuseFilePeerTask(t *testing.T) { }, }, Output: testOutput, - Range: &util.Range{Start: 300, Length: 100000}, + Range: &http.Range{Start: 300, Length: 100000}, }, enablePrefetch: true, storageManager: func(sm *mocks.MockManager) { @@ -288,7 +288,7 @@ func TestReuseFilePeerTask(t *testing.T) { return nil }) sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( - func(id string, rg *util.Range) *storage.ReusePeerTask { + func(id string, rg *http.Range) *storage.ReusePeerTask { taskID = id return &storage.ReusePeerTask{ PeerTaskMetadata: storage.PeerTaskMetadata{ @@ -419,7 +419,7 @@ func TestReuseStreamPeerTask(t *testing.T) { enablePrefetch: false, storageManager: func(sm *mocks.MockManager) { //sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( - // func(taskID string, rg *util.Range) *storage.ReusePeerTask { + // func(taskID string, rg *http.Range) *storage.ReusePeerTask { // return nil // }) //sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( @@ -448,7 +448,7 @@ func TestReuseStreamPeerTask(t *testing.T) { Filter: "", Header: nil, }, - Range: &util.Range{Start: 0, Length: 10}, + Range: &http.Range{Start: 0, Length: 10}, PeerID: "", }, enablePrefetch: true, @@ -500,13 +500,13 @@ func TestReuseStreamPeerTask(t *testing.T) { Filter: "", Header: nil, }, - Range: &util.Range{Start: 0, Length: 10}, + Range: &http.Range{Start: 0, Length: 10}, PeerID: "", }, enablePrefetch: true, storageManager: func(sm *mocks.MockManager) { sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( - func(taskID string, rg *util.Range) *storage.ReusePeerTask { + func(taskID string, rg *http.Range) *storage.ReusePeerTask { return nil }) sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( @@ -531,7 +531,7 @@ func TestReuseStreamPeerTask(t *testing.T) { Filter: "", Header: nil, }, - Range: &util.Range{Start: 0, Length: 10}, + Range: &http.Range{Start: 0, Length: 10}, PeerID: "", }, enablePrefetch: true, @@ -542,7 +542,7 @@ func TestReuseStreamPeerTask(t *testing.T) { return nil }) sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( - func(id string, rg *util.Range) *storage.ReusePeerTask { + func(id string, rg *http.Range) *storage.ReusePeerTask { taskID = id return &storage.ReusePeerTask{ PeerTaskMetadata: storage.PeerTaskMetadata{ @@ -587,7 +587,7 @@ func TestReuseStreamPeerTask(t *testing.T) { Filter: "", Header: nil, }, - Range: &util.Range{Start: 0, Length: 100}, + Range: &http.Range{Start: 0, Length: 100}, PeerID: "", }, enablePrefetch: true, @@ -598,7 +598,7 @@ func TestReuseStreamPeerTask(t *testing.T) { return nil }) sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( - func(id string, rg *util.Range) *storage.ReusePeerTask { + func(id string, rg *http.Range) *storage.ReusePeerTask { taskID = id return &storage.ReusePeerTask{ PeerTaskMetadata: storage.PeerTaskMetadata{ @@ -647,7 +647,7 @@ func TestReuseStreamPeerTask(t *testing.T) { Filter: "", Header: nil, }, - Range: &util.Range{Start: 100, Length: 100000}, + Range: &http.Range{Start: 100, Length: 100000}, PeerID: "", }, enablePrefetch: true, @@ -658,7 +658,7 @@ func TestReuseStreamPeerTask(t *testing.T) { return nil }) sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( - func(id string, rg *util.Range) *storage.ReusePeerTask { + func(id string, rg *http.Range) *storage.ReusePeerTask { taskID = id return &storage.ReusePeerTask{ PeerTaskMetadata: storage.PeerTaskMetadata{ diff --git a/client/daemon/peer/peertask_seed.go b/client/daemon/peer/peertask_seed.go index 4ed793c3d01..d428fe40045 100644 --- a/client/daemon/peer/peertask_seed.go +++ b/client/daemon/peer/peertask_seed.go @@ -25,14 +25,14 @@ import ( schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" "d7y.io/dragonfly/v2/client/config" - "d7y.io/dragonfly/v2/client/util" "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/net/http" ) type SeedTaskRequest struct { schedulerv1.PeerTaskRequest Limit float64 - Range *util.Range + Range *http.Range } type SeedTaskResponse struct { diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 670a107a087..fcd1279db14 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -31,9 +31,9 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/metrics" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/net/http" ) type StreamTaskRequest struct { @@ -42,7 +42,7 @@ type StreamTaskRequest struct { // url meta info URLMeta *commonv1.UrlMeta // http range - Range *util.Range + Range *http.Range // peer's id and must be global uniqueness PeerID string } @@ -66,7 +66,7 @@ type streamTask struct { func (ptm *peerTaskManager) newStreamTask( ctx context.Context, request *schedulerv1.PeerTaskRequest, - rg *util.Range) (*streamTask, error) { + rg *http.Range) (*streamTask, error) { metrics.StreamTaskCount.Add(1) var limit = rate.Inf if ptm.PerPeerRateLimit > 0 { diff --git a/client/daemon/peer/piece_downloader_test.go b/client/daemon/peer/piece_downloader_test.go index f009cd8466c..406af121334 100644 --- a/client/daemon/peer/piece_downloader_test.go +++ b/client/daemon/peer/piece_downloader_test.go @@ -37,8 +37,8 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" "d7y.io/dragonfly/v2/client/daemon/test" - "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/source" "d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol" ) @@ -78,7 +78,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) { { handleFunc: func(w http.ResponseWriter, r *http.Request) { assert.Equal("/download/tas/task-1", r.URL.Path) - rg := util.MustParseRange(r.Header.Get("Range"), math.MaxInt64) + rg := nethttp.MustParseRange(r.Header.Get("Range"), math.MaxInt64) w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length)) if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil { t.Error(err) @@ -93,7 +93,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) { { handleFunc: func(w http.ResponseWriter, r *http.Request) { assert.Equal("/download/tas/task-2", r.URL.Path) - rg := util.MustParseRange(r.Header.Get("Range"), math.MaxInt64) + rg := nethttp.MustParseRange(r.Header.Get("Range"), math.MaxInt64) w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length)) if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil { t.Error(err) @@ -108,7 +108,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) { { handleFunc: func(w http.ResponseWriter, r *http.Request) { assert.Equal("/download/tas/task-3", r.URL.Path) - rg := util.MustParseRange(r.Header.Get("Range"), math.MaxInt64) + rg := nethttp.MustParseRange(r.Header.Get("Range"), math.MaxInt64) w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length)) if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil { t.Error(err) diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go index aa35e0cfcc4..4fd92cb8e08 100644 --- a/client/daemon/peer/piece_manager.go +++ b/client/daemon/peer/piece_manager.go @@ -44,17 +44,16 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" - clientutil "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/util" "d7y.io/dragonfly/v2/pkg/digest" - httputil "d7y.io/dragonfly/v2/pkg/net/http" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/retry" "d7y.io/dragonfly/v2/pkg/source" ) type PieceManager interface { - DownloadSource(ctx context.Context, pt Task, request *schedulerv1.PeerTaskRequest, parsedRange *clientutil.Range) error + DownloadSource(ctx context.Context, pt Task, request *schedulerv1.PeerTaskRequest, parsedRange *nethttp.Range) error DownloadPiece(ctx context.Context, request *DownloadPieceRequest) (*DownloadPieceResult, error) ImportFile(ctx context.Context, ptm storage.PeerTaskMetadata, tsd storage.TaskStorageDriver, req *dfdaemonv1.ImportTaskRequest) error Import(ctx context.Context, ptm storage.PeerTaskMetadata, tsd storage.TaskStorageDriver, contentLength int64, reader io.Reader) error @@ -215,7 +214,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec Num: request.piece.PieceNum, Md5: request.piece.PieceMd5, Offset: request.piece.PieceOffset, - Range: clientutil.Range{ + Range: nethttp.Range{ Start: int64(request.piece.RangeStart), Length: int64(request.piece.RangeSize), }, @@ -275,7 +274,7 @@ func (pm *pieceManager) processPieceFromSource(pt Task, // storage manager will get digest from Reader, keep empty here is ok Md5: "", Offset: pieceOffset, - Range: clientutil.Range{ + Range: nethttp.Range{ Start: int64(pieceOffset), Length: int64(pieceSize), }, @@ -298,7 +297,7 @@ func (pm *pieceManager) processPieceFromSource(pt Task, return } -func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, peerTaskRequest *schedulerv1.PeerTaskRequest, parsedRange *clientutil.Range) error { +func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, peerTaskRequest *schedulerv1.PeerTaskRequest, parsedRange *nethttp.Range) error { if peerTaskRequest.UrlMeta == nil { peerTaskRequest.UrlMeta = &commonv1.UrlMeta{ Header: map[string]string{}, @@ -336,16 +335,16 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, peerTaskReq supportConcurrent = true if parsedRange != nil { // we have the total content length, parse the real range - newRange, err := httputil.ParseRange(peerTaskRequest.UrlMeta.Range, uint64(metadata.TotalContentLength)) + newRanges, err := nethttp.ParseURLMetaRange(peerTaskRequest.UrlMeta.Range, metadata.TotalContentLength) if err != nil { log.Errorf("update task error: %s", err) return err } - parsedRange.Start = int64(newRange.StartIndex) - parsedRange.Length = int64(newRange.Length()) + parsedRange.Start = newRanges.Start + parsedRange.Length = newRanges.Length } else { // for non-ranged request, add a dummy range - parsedRange = &clientutil.Range{ + parsedRange = &nethttp.Range{ Start: 0, Length: metadata.TotalContentLength, } @@ -462,7 +461,7 @@ singleDownload: return pm.downloadKnownLengthSource(ctx, pt, contentLength, pieceSize, reader, response, peerTaskRequest, parsedRange, supportConcurrent) } -func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task, contentLength int64, pieceSize uint32, reader io.Reader, response *source.Response, peerTaskRequest *schedulerv1.PeerTaskRequest, parsedRange *clientutil.Range, supportConcurrent bool) error { +func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task, contentLength int64, pieceSize uint32, reader io.Reader, response *source.Response, peerTaskRequest *schedulerv1.PeerTaskRequest, parsedRange *nethttp.Range, supportConcurrent bool) error { log := pt.Log() maxPieceNum := pt.GetTotalPieces() for pieceNum := int32(0); pieceNum < maxPieceNum; pieceNum++ { @@ -626,7 +625,7 @@ func (pm *pieceManager) processPieceFromFile(ctx context.Context, ptm storage.Pe // storage manager will get digest from Reader, keep empty here is ok Md5: "", Offset: pieceOffset, - Range: clientutil.Range{ + Range: nethttp.Range{ Start: int64(pieceOffset), Length: int64(pieceSize), }, @@ -771,7 +770,7 @@ func (pm *pieceManager) Import(ctx context.Context, ptm storage.PeerTaskMetadata return nil } -func (pm *pieceManager) concurrentDownloadSource(ctx context.Context, pt Task, peerTaskRequest *schedulerv1.PeerTaskRequest, parsedRange *clientutil.Range, startPieceNum int32) error { +func (pm *pieceManager) concurrentDownloadSource(ctx context.Context, pt Task, peerTaskRequest *schedulerv1.PeerTaskRequest, parsedRange *nethttp.Range, startPieceNum int32) error { // parsedRange is always exist pieceSize := pm.computePieceSize(parsedRange.Length) pieceCount := util.ComputePieceCount(parsedRange.Length, pieceSize) @@ -863,7 +862,7 @@ func (pm *pieceManager) downloadPieceFromSource(ctx context.Context, pt Task, log *logger.SugaredLoggerOnWith, peerTaskRequest *schedulerv1.PeerTaskRequest, pieceSize uint32, num int32, - parsedRange *clientutil.Range, + parsedRange *nethttp.Range, pieceCount int32, downloadedPieceCount *atomic.Int32) error { backSourceRequest, err := source.NewRequestWithContext(ctx, peerTaskRequest.Url, peerTaskRequest.UrlMeta.Header) diff --git a/client/daemon/peer/piece_manager_mock.go b/client/daemon/peer/piece_manager_mock.go index 40b865b15e7..fb04695640c 100644 --- a/client/daemon/peer/piece_manager_mock.go +++ b/client/daemon/peer/piece_manager_mock.go @@ -12,7 +12,7 @@ import ( dfdaemon "d7y.io/api/pkg/apis/dfdaemon/v1" scheduler "d7y.io/api/pkg/apis/scheduler/v1" storage "d7y.io/dragonfly/v2/client/daemon/storage" - util "d7y.io/dragonfly/v2/client/util" + http "d7y.io/dragonfly/v2/pkg/net/http" gomock "github.com/golang/mock/gomock" ) @@ -55,7 +55,7 @@ func (mr *MockPieceManagerMockRecorder) DownloadPiece(ctx, request interface{}) } // DownloadSource mocks base method. -func (m *MockPieceManager) DownloadSource(ctx context.Context, pt Task, request *scheduler.PeerTaskRequest, parsedRange *util.Range) error { +func (m *MockPieceManager) DownloadSource(ctx context.Context, pt Task, request *scheduler.PeerTaskRequest, parsedRange *http.Range) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DownloadSource", ctx, pt, request, parsedRange) ret0, _ := ret[0].(error) diff --git a/client/daemon/peer/piece_manager_test.go b/client/daemon/peer/piece_manager_test.go index 8f5a4c3e3e8..efdd53a9e68 100644 --- a/client/daemon/peer/piece_manager_test.go +++ b/client/daemon/peer/piece_manager_test.go @@ -45,6 +45,7 @@ import ( clientutil "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/util" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" _ "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" "d7y.io/dragonfly/v2/pkg/source" "d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol" @@ -397,7 +398,7 @@ func TestPieceManager_DownloadSource(t *testing.T) { buf = bytes.NewBuffer(testBytes) l = len(testBytes) } else { - parsedRange, err := clientutil.ParseRange(rg, int64(len(testBytes))) + parsedRange, err := nethttp.ParseRange(rg, int64(len(testBytes))) assert.Nil(err) w.Header().Set(headers.ContentRange, fmt.Sprintf("bytes %d-%d/%d", diff --git a/client/daemon/peer/traffic_shaper_test.go b/client/daemon/peer/traffic_shaper_test.go index 315f31136e9..acd3b01eef4 100644 --- a/client/daemon/peer/traffic_shaper_test.go +++ b/client/daemon/peer/traffic_shaper_test.go @@ -53,6 +53,7 @@ import ( "d7y.io/dragonfly/v2/pkg/dfnet" "d7y.io/dragonfly/v2/pkg/digest" "d7y.io/dragonfly/v2/pkg/idgen" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/rpc" daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" @@ -300,7 +301,7 @@ type trafficShaperTestSpec struct { totalRateLimit rate.Limit backSource bool mockPieceDownloader func(ctrl *gomock.Controller, taskData []byte, pieceSize int) PieceDownloader - mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient + mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient cleanUp []func() } @@ -321,8 +322,8 @@ func TestTrafficShaper_TaskSuite(t *testing.T) { }) return downloader } - sourceClient := func(hasLength bool) func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient { - return func(t *testing.T, ctrl *gomock.Controller, rg *util.Range, taskData []byte, url string) source.ResourceClient { + sourceClient := func(hasLength bool) func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient { + return func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient { sourceClient := sourcemocks.NewMockResourceClient(ctrl) sourceClient.EXPECT().GetContentLength(gomock.Any()).AnyTimes().DoAndReturn( func(request *source.Request) (int64, error) { diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 3ab71014d17..b852ab09bf5 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -722,14 +722,14 @@ func (s *server) download(ctx context.Context, req *dfdaemonv1.DownRequest, stre KeepOriginalOffset: req.KeepOriginalOffset, } if len(req.UrlMeta.Range) > 0 { - r, err := http.ParseRange(req.UrlMeta.Range, math.MaxInt64) + r, err := http.ParseURLMetaRange(req.UrlMeta.Range, math.MaxInt64) if err != nil { err = fmt.Errorf("parse range %s error: %s", req.UrlMeta.Range, err) return err } - peerTask.Range = &util.Range{ - Start: int64(r.StartIndex), - Length: int64(r.Length()), + peerTask.Range = &http.Range{ + Start: r.Start, + Length: r.Length, } } diff --git a/client/daemon/rpcserver/seeder.go b/client/daemon/rpcserver/seeder.go index a0a73f5b08d..de736ca3cc6 100644 --- a/client/daemon/rpcserver/seeder.go +++ b/client/daemon/rpcserver/seeder.go @@ -32,7 +32,6 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/metrics" "d7y.io/dragonfly/v2/client/daemon/peer" - clientutil "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/net/http" @@ -80,16 +79,16 @@ func (s *seeder) ObtainSeeds(seedRequest *cdnsystemv1.SeedRequest, seedsServer c log := logger.With("peer", req.PeerId, "task", seedRequest.TaskId, "component", "seedService") if len(req.UrlMeta.Range) > 0 { - r, err := http.ParseRange(req.UrlMeta.Range, math.MaxInt64) + r, err := http.ParseURLMetaRange(req.UrlMeta.Range, math.MaxInt64) if err != nil { metrics.SeedPeerDownloadFailureCount.Add(1) err = fmt.Errorf("parse range %s error: %s", req.UrlMeta.Range, err) log.Errorf(err.Error()) return err } - req.Range = &clientutil.Range{ - Start: int64(r.StartIndex), - Length: int64(r.Length()), + req.Range = &http.Range{ + Start: r.Start, + Length: r.Length, } } diff --git a/client/daemon/storage/local_storage.go b/client/daemon/storage/local_storage.go index 9656ae793c2..be064431056 100644 --- a/client/daemon/storage/local_storage.go +++ b/client/daemon/storage/local_storage.go @@ -33,10 +33,10 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" - clientutil "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/util" "d7y.io/dragonfly/v2/pkg/digest" + "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/source" ) @@ -634,7 +634,7 @@ func (t *localTaskStore) saveMetadata() error { return err } -func (t *localTaskStore) partialCompleted(rg *clientutil.Range) bool { +func (t *localTaskStore) partialCompleted(rg *http.Range) bool { t.RLock() defer t.RUnlock() @@ -642,7 +642,7 @@ func (t *localTaskStore) partialCompleted(rg *clientutil.Range) bool { return false } - realRange := &clientutil.Range{ + realRange := &http.Range{ Start: rg.Start, Length: rg.Length, } @@ -666,7 +666,7 @@ func (t *localTaskStore) partialCompleted(rg *clientutil.Range) bool { return true } -func computePiecePosition(total int64, rg *clientutil.Range, compute func(length int64) uint32) (start, end int32) { +func computePiecePosition(total int64, rg *http.Range, compute func(length int64) uint32) (start, end int32) { pieceSize := compute(total) start = int32(math.Floor(float64(rg.Start) / float64(pieceSize))) end = int32(math.Floor(float64(rg.Start+rg.Length-1) / float64(pieceSize))) diff --git a/client/daemon/storage/local_storage_subtask.go b/client/daemon/storage/local_storage_subtask.go index c10fed0a122..4d238d3795e 100644 --- a/client/daemon/storage/local_storage_subtask.go +++ b/client/daemon/storage/local_storage_subtask.go @@ -26,9 +26,9 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" - "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/digest" + "d7y.io/dragonfly/v2/pkg/net/http" ) // TODO need refactor with localTaskStore, currently, localSubTaskStore code copies from localTaskStore @@ -41,7 +41,7 @@ type localSubTaskStore struct { // when digest not match, invalid will be set invalid atomic.Bool - Range *util.Range + Range *http.Range } func (t *localSubTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) (int64, error) { diff --git a/client/daemon/storage/local_storage_test.go b/client/daemon/storage/local_storage_test.go index a7a7302eaae..a3f48653742 100644 --- a/client/daemon/storage/local_storage_test.go +++ b/client/daemon/storage/local_storage_test.go @@ -39,6 +39,7 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/util" "d7y.io/dragonfly/v2/pkg/digest" + "d7y.io/dragonfly/v2/pkg/net/http" _ "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" ) @@ -117,7 +118,7 @@ func TestLocalTaskStore_PutAndGetPiece(t *testing.T) { PeerID: peerID, TaskID: taskID, }, - Range: &clientutil.Range{ + Range: &http.Range{ Start: 100, Length: int64(len(testBytes)), }, @@ -188,7 +189,7 @@ func TestLocalTaskStore_PutAndGetPiece(t *testing.T) { Num: int32(p.index), Md5: piecesMd5[p.index], Offset: uint64(p.start), - Range: clientutil.Range{ + Range: http.Range{ Start: int64(p.start), Length: int64(p.end - p.start), }, @@ -218,7 +219,7 @@ func TestLocalTaskStore_PutAndGetPiece(t *testing.T) { Num: int32(p.index), Md5: piecesMd5[p.index], Offset: uint64(p.start), - Range: clientutil.Range{ + Range: http.Range{ Start: int64(p.start), Length: int64(p.end - p.start), }, @@ -359,7 +360,7 @@ func TestLocalTaskStore_StoreTaskData_Simple(t *testing.T) { assert.Equal(testData, bs, "data must match") } -func calcFileMd5(filePath string, rg *clientutil.Range) (string, error) { +func calcFileMd5(filePath string, rg *http.Range) (string, error) { var md5String string file, err := os.Open(filePath) if err != nil { @@ -395,7 +396,7 @@ func Test_computePiecePosition(t *testing.T) { var testCases = []struct { name string total int64 - rg *clientutil.Range + rg *http.Range start int32 end int32 piece uint32 @@ -403,7 +404,7 @@ func Test_computePiecePosition(t *testing.T) { { name: "0", total: 500, - rg: &clientutil.Range{ + rg: &http.Range{ Start: 0, Length: 10, }, @@ -414,7 +415,7 @@ func Test_computePiecePosition(t *testing.T) { { name: "1", total: 500, - rg: &clientutil.Range{ + rg: &http.Range{ Start: 30, Length: 60, }, @@ -425,7 +426,7 @@ func Test_computePiecePosition(t *testing.T) { { name: "2", total: 500, - rg: &clientutil.Range{ + rg: &http.Range{ Start: 30, Length: 130, }, @@ -436,7 +437,7 @@ func Test_computePiecePosition(t *testing.T) { { name: "3", total: 500, - rg: &clientutil.Range{ + rg: &http.Range{ Start: 350, Length: 100, }, @@ -447,7 +448,7 @@ func Test_computePiecePosition(t *testing.T) { { name: "4", total: 500, - rg: &clientutil.Range{ + rg: &http.Range{ Start: 400, Length: 100, }, @@ -458,7 +459,7 @@ func Test_computePiecePosition(t *testing.T) { { name: "5", total: 500, - rg: &clientutil.Range{ + rg: &http.Range{ Start: 0, Length: 500, }, @@ -485,14 +486,14 @@ func TestLocalTaskStore_partialCompleted(t *testing.T) { name string ContentLength int64 ReadyPieceCount int32 - Range clientutil.Range + Range http.Range Found bool }{ { name: "range bytes=x-y partial completed", ContentLength: 1024, ReadyPieceCount: 1, - Range: clientutil.Range{ + Range: http.Range{ Start: 1, Length: 1023, }, @@ -502,7 +503,7 @@ func TestLocalTaskStore_partialCompleted(t *testing.T) { name: "range bytes=x-y no partial completed", ContentLength: util.DefaultPieceSize * 10, ReadyPieceCount: 1, - Range: clientutil.Range{ + Range: http.Range{ Start: 1, Length: util.DefaultPieceSize * 2, }, @@ -512,7 +513,7 @@ func TestLocalTaskStore_partialCompleted(t *testing.T) { name: "range bytes=x- no partial completed", ContentLength: util.DefaultPieceSizeLimit * 1, ReadyPieceCount: 1, - Range: clientutil.Range{ + Range: http.Range{ Start: 1, Length: math.MaxInt - 1, }, diff --git a/client/daemon/storage/metadata.go b/client/daemon/storage/metadata.go index e5599981147..bc0ec1046ca 100644 --- a/client/daemon/storage/metadata.go +++ b/client/daemon/storage/metadata.go @@ -21,7 +21,7 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" - "d7y.io/dragonfly/v2/client/util" + "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/source" ) @@ -48,7 +48,7 @@ type PieceMetadata struct { Num int32 `json:"num,omitempty"` Md5 string `json:"md5,omitempty"` Offset uint64 `json:"offset,omitempty"` - Range util.Range `json:"range,omitempty"` + Range http.Range `json:"range,omitempty"` Style commonv1.PieceStyle `json:"style,omitempty"` // time(nanosecond) consumed Cost uint64 `json:"cost,omitempty"` @@ -94,13 +94,13 @@ type ReadPieceRequest struct { type ReadAllPiecesRequest struct { PeerTaskMetadata - Range *util.Range + Range *http.Range } type RegisterSubTaskRequest struct { Parent PeerTaskMetadata SubTask PeerTaskMetadata - Range *util.Range + Range *http.Range } type UpdateTaskRequest struct { diff --git a/client/daemon/storage/mocks/stroage_manager_mock.go b/client/daemon/storage/mocks/stroage_manager_mock.go index b80b2a7e2a7..53a52def8f1 100644 --- a/client/daemon/storage/mocks/stroage_manager_mock.go +++ b/client/daemon/storage/mocks/stroage_manager_mock.go @@ -12,7 +12,7 @@ import ( common "d7y.io/api/pkg/apis/common/v1" storage "d7y.io/dragonfly/v2/client/daemon/storage" - util "d7y.io/dragonfly/v2/client/util" + http "d7y.io/dragonfly/v2/pkg/net/http" gomock "github.com/golang/mock/gomock" ) @@ -340,7 +340,7 @@ func (mr *MockManagerMockRecorder) FindCompletedTask(taskID interface{}) *gomock } // FindPartialCompletedTask mocks base method. -func (m *MockManager) FindPartialCompletedTask(taskID string, rg *util.Range) *storage.ReusePeerTask { +func (m *MockManager) FindPartialCompletedTask(taskID string, rg *http.Range) *storage.ReusePeerTask { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FindPartialCompletedTask", taskID, rg) ret0, _ := ret[0].(*storage.ReusePeerTask) diff --git a/client/daemon/storage/storage_manager.go b/client/daemon/storage/storage_manager.go index bc3a0d66a22..74db496df2c 100644 --- a/client/daemon/storage/storage_manager.go +++ b/client/daemon/storage/storage_manager.go @@ -45,6 +45,7 @@ import ( "d7y.io/dragonfly/v2/client/daemon/gc" "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" ) type TaskStorageDriver interface { @@ -104,7 +105,7 @@ type Manager interface { // FindCompletedSubTask try to find a completed subtask for fast path FindCompletedSubTask(taskID string) *ReusePeerTask // FindPartialCompletedTask try to find a partial completed task for fast path - FindPartialCompletedTask(taskID string, rg *util.Range) *ReusePeerTask + FindPartialCompletedTask(taskID string, rg *nethttp.Range) *ReusePeerTask // CleanUp cleans all storage data CleanUp() } @@ -515,7 +516,7 @@ func (s *storageManager) FindCompletedTask(taskID string) *ReusePeerTask { return nil } -func (s *storageManager) FindPartialCompletedTask(taskID string, rg *util.Range) *ReusePeerTask { +func (s *storageManager) FindPartialCompletedTask(taskID string, rg *nethttp.Range) *ReusePeerTask { s.indexRWMutex.RLock() defer s.indexRWMutex.RUnlock() ts, ok := s.indexTask2PeerTask[taskID] diff --git a/client/daemon/transport/transport.go b/client/daemon/transport/transport.go index 5186ec069f3..58684e18ac1 100644 --- a/client/daemon/transport/transport.go +++ b/client/daemon/transport/transport.go @@ -43,7 +43,6 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/metrics" "d7y.io/dragonfly/v2/client/daemon/peer" - "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" nethttp "d7y.io/dragonfly/v2/pkg/net/http" ) @@ -245,11 +244,11 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res // Init meta value meta := &commonv1.UrlMeta{Header: map[string]string{}} - var rg *util.Range + var rg *nethttp.Range // Set meta range's value if rangeHeader := req.Header.Get("Range"); len(rangeHeader) > 0 { - rgs, err := util.ParseRange(rangeHeader, math.MaxInt64) + rgs, err := nethttp.ParseRange(rangeHeader, math.MaxInt64) if err != nil { span.RecordError(err) return badRequest(req, err.Error()) diff --git a/client/daemon/upload/upload_manager.go b/client/daemon/upload/upload_manager.go index 6a9a546ad72..b9c9afe1c36 100644 --- a/client/daemon/upload/upload_manager.go +++ b/client/daemon/upload/upload_manager.go @@ -40,8 +40,8 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/client/util" logger "d7y.io/dragonfly/v2/internal/dflog" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" ) const ( @@ -211,7 +211,7 @@ func (um *uploadManager) getDownload(ctx *gin.Context) { log := logger.WithTaskAndPeerID(taskID, peerID).With("component", "uploadManager") log.Debugf("upload piece for task %s/%s to %s, request header: %#v", taskID, peerID, ctx.Request.RemoteAddr, ctx.Request.Header) - rg, err := util.ParseRange(ctx.GetHeader(headers.Range), math.MaxInt64) + rg, err := nethttp.ParseRange(ctx.GetHeader(headers.Range), math.MaxInt64) if err != nil { log.Errorf("parse range with error: %s", err) ctx.JSON(http.StatusBadRequest, gin.H{"errors": err.Error()}) diff --git a/client/util/range.go b/client/util/range.go deleted file mode 100644 index 2bf734da3c4..00000000000 --- a/client/util/range.go +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package util - -import ( - "errors" - "fmt" - "net/textproto" - "strconv" - "strings" -) - -// Range specifies the byte range to be sent to the client. -type Range struct { - Start, Length int64 -} - -func (r Range) String() string { - return fmt.Sprintf("bytes=%d-%d", r.Start, r.Start+r.Length-1) -} - -// ErrNoOverlap is returned by ParseRange if first-byte-pos of -// all of the byte-range-spec values is greater than the content size. -var ErrNoOverlap = errors.New("invalid range: failed to overlap") - -// ParseRange parses a Range header string as per RFC 7233. -// ErrNoOverlap is returned if none of the ranges overlap. -// Example: -// -// "Range": "bytes=100-200" -// "Range": "bytes=-50" -// "Range": "bytes=150-" -// "Range": "bytes=0-0,-1" -// -// copy from go/1.15.2 net/http/fs.go ParseRange -func ParseRange(s string, size int64) ([]Range, error) { - if s == "" { - return nil, nil // header not present - } - const b = "bytes=" - if !strings.HasPrefix(s, b) { - return nil, errors.New("invalid range") - } - var ranges []Range - noOverlap := false - for _, ra := range strings.Split(s[len(b):], ",") { - ra = textproto.TrimString(ra) - if ra == "" { - continue - } - i := strings.Index(ra, "-") - if i < 0 { - return nil, errors.New("invalid range") - } - start, end := textproto.TrimString(ra[:i]), textproto.TrimString(ra[i+1:]) - var r Range - if start == "" { - // If no Serve is specified, end specifies the - // range Serve relative to the end of the file. - i, err := strconv.ParseInt(end, 10, 64) - if err != nil { - return nil, errors.New("invalid range") - } - if i > size { - i = size - } - r.Start = size - i - r.Length = size - r.Start - } else { - i, err := strconv.ParseInt(start, 10, 64) - if err != nil || i < 0 { - return nil, errors.New("invalid range") - } - if i >= size { - // If the range begins after the size of the content, - // then it does not overlap. - noOverlap = true - continue - } - r.Start = i - if end == "" { - // If no end is specified, range extends to end of the file. - r.Length = size - r.Start - } else { - i, err := strconv.ParseInt(end, 10, 64) - if err != nil || r.Start > i { - return nil, errors.New("invalid range") - } - if i >= size { - i = size - 1 - } - r.Length = i - r.Start + 1 - } - } - ranges = append(ranges, r) - } - if noOverlap && len(ranges) == 0 { - // The specified ranges did not overlap with the content. - return nil, ErrNoOverlap - } - return ranges, nil -} - -// Example: -// -// "Content-Range": "bytes 100-200/1000" -// "Content-Range": "bytes 100-200/*" -func GetContentRange(start, end, total int64) string { - // unknown total: -1 - if total == -1 { - return fmt.Sprintf("bytes %d-%d/*", start, end) - } - - return fmt.Sprintf("bytes %d-%d/%d", start, end, total) -} - -func MustParseRange(s string, size int64) Range { - rs, err := ParseRange(s, size) - if err != nil { - panic(fmt.Sprintf("parse range %q error: %s", s, err)) - } - if len(rs) != 1 { - panic(fmt.Sprintf("parse range length must be 1")) - } - return rs[0] -} - -func ParseOneRange(s string, size int64) (Range, error) { - rs, err := ParseRange(s, size) - if err != nil { - return Range{}, err - } - if len(rs) != 1 { - return Range{}, fmt.Errorf("parse range length must be 1") - } - return rs[0], nil -} diff --git a/client/util/range_test.go b/client/util/range_test.go deleted file mode 100644 index 594661ca588..00000000000 --- a/client/util/range_test.go +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package util - -import ( - "testing" -) - -var ParseRangeTests = []struct { - s string - length int64 - r []Range -}{ - {"", 0, nil}, - {"", 1000, nil}, - {"foo", 0, nil}, - {"bytes=", 0, nil}, - {"bytes=7", 10, nil}, - {"bytes= 7 ", 10, nil}, - {"bytes=1-", 0, nil}, - {"bytes=5-4", 10, nil}, - {"bytes=0-2,5-4", 10, nil}, - {"bytes=2-5,4-3", 10, nil}, - {"bytes=--5,4--3", 10, nil}, - {"bytes=A-", 10, nil}, - {"bytes=A- ", 10, nil}, - {"bytes=A-Z", 10, nil}, - {"bytes= -Z", 10, nil}, - {"bytes=5-Z", 10, nil}, - {"bytes=Ran-dom, garbage", 10, nil}, - {"bytes=0x01-0x02", 10, nil}, - {"bytes= ", 10, nil}, - {"bytes= , , , ", 10, nil}, - - {"bytes=0-9", 10, []Range{{0, 10}}}, - {"bytes=0-", 10, []Range{{0, 10}}}, - {"bytes=5-", 10, []Range{{5, 5}}}, - {"bytes=0-20", 10, []Range{{0, 10}}}, - {"bytes=15-,0-5", 10, []Range{{0, 6}}}, - {"bytes=1-2,5-", 10, []Range{{1, 2}, {5, 5}}}, - {"bytes=-2 , 7-", 11, []Range{{9, 2}, {7, 4}}}, - {"bytes=0-0 ,2-2, 7-", 11, []Range{{0, 1}, {2, 1}, {7, 4}}}, - {"bytes=-5", 10, []Range{{5, 5}}}, - {"bytes=-15", 10, []Range{{0, 10}}}, - {"bytes=0-499", 10000, []Range{{0, 500}}}, - {"bytes=500-999", 10000, []Range{{500, 500}}}, - {"bytes=-500", 10000, []Range{{9500, 500}}}, - {"bytes=9500-", 10000, []Range{{9500, 500}}}, - {"bytes=0-0,-1", 10000, []Range{{0, 1}, {9999, 1}}}, - {"bytes=500-600,601-999", 10000, []Range{{500, 101}, {601, 399}}}, - {"bytes=500-700,601-999", 10000, []Range{{500, 201}, {601, 399}}}, - - // Match Apache laxity: - {"bytes= 1 -2 , 4- 5, 7 - 8 , ,,", 11, []Range{{1, 2}, {4, 2}, {7, 2}}}, -} - -func TestParseRange(t *testing.T) { - for _, test := range ParseRangeTests { - r := test.r - ranges, err := ParseRange(test.s, test.length) - if err != nil && r != nil { - t.Errorf("ParseRange(%q) returned error %q", test.s, err) - } - if len(ranges) != len(r) { - t.Errorf("len(ParseRange(%q)) = %d, want %d", test.s, len(ranges), len(r)) - continue - } - for i := range r { - if ranges[i].Start != r[i].Start { - t.Errorf("ParseRange(%q)[%d].Serve = %d, want %d", test.s, i, ranges[i].Start, r[i].Start) - } - if ranges[i].Length != r[i].Length { - t.Errorf("ParseRange(%q)[%d].Length = %d, want %d", test.s, i, ranges[i].Length, r[i].Length) - } - } - } -} - -func TestGetRange(t *testing.T) { - if GetContentRange(100, 200, -1) != "bytes 100-200/*" { - t.Fail() - } - if GetContentRange(100, 200, 500) != "bytes 100-200/500" { - t.Fail() - } -} diff --git a/manager/permission/rbac/rbac_test.go b/manager/permission/rbac/rbac_test.go index 796d7b61d41..d05727cb8b7 100644 --- a/manager/permission/rbac/rbac_test.go +++ b/manager/permission/rbac/rbac_test.go @@ -81,26 +81,26 @@ func TestGetApiGroupName(t *testing.T) { func TestHTTPMethodToAction(t *testing.T) { tests := []struct { method string - exceptedAction string + expectedAction string }{ { method: "GET", - exceptedAction: ReadAction, + expectedAction: ReadAction, }, { method: "POST", - exceptedAction: AllAction, + expectedAction: AllAction, }, { method: "UNKNOWN", - exceptedAction: ReadAction, + expectedAction: ReadAction, }, } for _, tt := range tests { action := HTTPMethodToAction(tt.method) - if action != tt.exceptedAction { - t.Errorf("HttpMethodToAction(%v) = %v, want %v", tt.method, action, tt.exceptedAction) + if action != tt.expectedAction { + t.Errorf("HttpMethodToAction(%v) = %v, want %v", tt.method, action, tt.expectedAction) } } } diff --git a/pkg/net/http/range.go b/pkg/net/http/range.go index 91aa6db4998..be26a698a64 100644 --- a/pkg/net/http/range.go +++ b/pkg/net/http/range.go @@ -17,145 +17,159 @@ package http import ( + "errors" "fmt" + "net/textproto" "strconv" "strings" ) const ( - separator = "-" -) + // RangePrefix is prefix of range header. + RangePrefix = "bytes=" -type Range struct { - StartIndex uint64 `json:"start_index"` - EndIndex uint64 `json:"end_index"` -} + // RangeSeparator is separator of range header. + RangeSeparator = "-" +) -func (r Range) String() string { - return fmt.Sprintf("%d%s%d", r.StartIndex, separator, r.EndIndex) -} +// ErrNoOverlap is returned by ParseRange if first-byte-pos of +// all of the byte-range-spec values is greater than the content size. +var ErrNoOverlap = errors.New("invalid range: failed to overlap") -func (r Range) Length() uint64 { - return r.EndIndex - r.StartIndex + 1 +// Range specifies the byte range to be sent to the client. +type Range struct { + Start, Length int64 } -// GetRange parses Range according to range string. -// rangeStr: "start-end" -func GetRange(rangeStr string) (r *Range, err error) { - ranges := strings.Split(rangeStr, separator) - if len(ranges) != 2 { - return nil, fmt.Errorf("range value(%s) is illegal which should be like 0-45535", rangeStr) - } - - startIndex, err := strconv.ParseUint(ranges[0], 10, 64) - if err != nil { - return nil, fmt.Errorf("range(%s) start is not a non-negative number", rangeStr) - } - endIndex, err := strconv.ParseUint(ranges[1], 10, 64) - if err != nil { - return nil, fmt.Errorf("range(%s) end is not a non-negative number", rangeStr) - } - - if endIndex < startIndex { - return nil, fmt.Errorf("range(%s) start is larger than end", rangeStr) - } - - return &Range{ - StartIndex: startIndex, - EndIndex: endIndex, - }, nil +// String specifies the string of http header. +func (r *Range) String() string { + return fmt.Sprintf("%s%d%s%d", RangePrefix, r.Start, RangeSeparator, r.Start+r.Length-1) } -// ParseRange parses the start and the end from rangeStr and returns them. -// length is file total length -func ParseRange(rangeStr string, length uint64) (*Range, error) { - if strings.Count(rangeStr, "-") != 1 { - return nil, fmt.Errorf("invalid range: %s, should be like 0-1023", rangeStr) - } - - // -{endIndex} - if strings.HasPrefix(rangeStr, "-") { - rangeStruct, err := handlePrefixRange(rangeStr, length) - if err != nil { - return nil, err +// ParseRange parses a Range header string as per RFC 7233. +// ErrNoOverlap is returned if none of the ranges overlap. +// Example: +// +// "Range": "bytes=100-200" +// "Range": "bytes=-50" +// "Range": "bytes=150-" +// "Range": "bytes=0-0,-1" +// +// copy from go/1.15.2 net/http/fs.go ParseRange +func ParseRange(s string, size int64) ([]Range, error) { + if s == "" { + return nil, nil // header not present + } + + const b = RangePrefix + if !strings.HasPrefix(s, b) { + return nil, errors.New("invalid range") + } + + var ranges []Range + noOverlap := false + for _, ra := range strings.Split(s[len(b):], ",") { + ra = textproto.TrimString(ra) + if ra == "" { + continue } - return rangeStruct, nil - } - // {startIndex}- - if strings.HasSuffix(rangeStr, "-") { - rangeStruct, err := handleSuffixRange(rangeStr, length) - if err != nil { - return nil, err + i := strings.Index(ra, "-") + if i < 0 { + return nil, errors.New("invalid range") + } + start, end := textproto.TrimString(ra[:i]), textproto.TrimString(ra[i+1:]) + + var r Range + if start == "" { + // If no Serve is specified, end specifies the + // range Serve relative to the end of the file. + i, err := strconv.ParseInt(end, 10, 64) + if err != nil { + return nil, errors.New("invalid range") + } + + if i > size { + i = size + } + + r.Start = size - i + r.Length = size - r.Start + } else { + i, err := strconv.ParseInt(start, 10, 64) + if err != nil || i < 0 { + return nil, errors.New("invalid range") + } + + if i >= size { + // If the range begins after the size of the content, + // then it does not overlap. + noOverlap = true + continue + } + + r.Start = i + if end == "" { + // If no end is specified, range extends to end of the file. + r.Length = size - r.Start + } else { + i, err := strconv.ParseInt(end, 10, 64) + if err != nil || r.Start > i { + return nil, errors.New("invalid range") + } + + if i >= size { + i = size - 1 + } + r.Length = i - r.Start + 1 + } } - return rangeStruct, nil - } - rangeStruct, err := handlePairRange(rangeStr, length) - if err != nil { - return nil, err + ranges = append(ranges, r) } - return rangeStruct, nil -} -func handlePrefixRange(rangeStr string, length uint64) (*Range, error) { - downLength, err := strconv.ParseUint(strings.TrimPrefix(rangeStr, "-"), 10, 64) - if err != nil { - return nil, fmt.Errorf("failed to parse range: %s to int: %v", rangeStr, err) + if noOverlap && len(ranges) == 0 { + // The specified ranges did not overlap with the content. + return nil, ErrNoOverlap } - if downLength > length { - return nil, fmt.Errorf("range: %s, the downLength is larger than length", rangeStr) - } - - return &Range{ - StartIndex: length - downLength, - EndIndex: length - 1, - }, nil + return ranges, nil } -func handleSuffixRange(rangeStr string, length uint64) (*Range, error) { - startIndex, err := strconv.ParseUint(strings.TrimSuffix(rangeStr, "-"), 10, 64) +// MustParseRange is like ParseRange but panics if the range cannot be parsed. +func MustParseRange(s string, size int64) Range { + rs, err := ParseRange(s, size) if err != nil { - return nil, fmt.Errorf("failed to parse range: %s to uint: %v", rangeStr, err) + panic(fmt.Sprintf("parse range %q error: %s", s, err)) } - if startIndex > length { - return nil, fmt.Errorf("range: %s, the startIndex is larger than length", rangeStr) + if len(rs) != 1 { + panic(fmt.Sprintf("parse range length must be 1")) } - return &Range{ - StartIndex: startIndex, - EndIndex: length - 1, - }, nil + return rs[0] } -func handlePairRange(rangeStr string, length uint64) (*Range, error) { - rangePair := strings.Split(rangeStr, "-") - - startIndex, err := strconv.ParseUint(rangePair[0], 10, 64) +// ParseOneRange parses only one range of Range header string as per RFC 7233. +func ParseOneRange(s string, size int64) (Range, error) { + rs, err := ParseRange(s, size) if err != nil { - return nil, fmt.Errorf("failed to parse range: %s to uint: %v", rangeStr, err) - } - if startIndex > length { - return nil, fmt.Errorf("range: %s, the startIndex is larger than length", rangeStr) + return Range{}, err } - endIndex, err := strconv.ParseUint(rangePair[1], 10, 64) - if err != nil { - return nil, fmt.Errorf("failed to parse range: %s to uint: %v", rangeStr, err) - } - if endIndex >= length { - //attention - endIndex = length - 1 + if len(rs) != 1 { + return Range{}, fmt.Errorf("parse range length must be 1") } - if endIndex < startIndex { - return nil, fmt.Errorf("range: %s, the start is larger the end", rangeStr) - } + return rs[0], nil +} - return &Range{ - StartIndex: startIndex, - EndIndex: endIndex, - }, nil +// ParseRange parses a Range string of grpc UrlMeta. +// Example: +// +// "Range": "100-200" +// "Range": "-50" +// "Range": "150-" +func ParseURLMetaRange(s string, size int64) (Range, error) { + return ParseOneRange(fmt.Sprintf("%s%s", RangePrefix, s), size) } diff --git a/pkg/net/http/range_test.go b/pkg/net/http/range_test.go index b201ed082b5..ca49844895b 100644 --- a/pkg/net/http/range_test.go +++ b/pkg/net/http/range_test.go @@ -22,94 +22,285 @@ import ( "github.com/stretchr/testify/assert" ) +func TestRange_String(t *testing.T) { + tests := []struct { + s string + rg Range + expect func(t *testing.T, s string) + }{ + { + s: "bytes=0-9", + rg: Range{ + Start: 0, + Length: 10, + }, + expect: func(t *testing.T, s string) { + assert := assert.New(t) + assert.Equal(s, "bytes=0-9") + }, + }, + { + s: "bytes=1-10", + rg: Range{ + Start: 1, + Length: 10, + }, + expect: func(t *testing.T, s string) { + assert := assert.New(t) + assert.Equal(s, "bytes=1-10") + }, + }, + { + s: "bytes=1-0", + rg: Range{ + Start: 1, + Length: 0, + }, + expect: func(t *testing.T, s string) { + assert := assert.New(t) + assert.Equal(s, "bytes=1-0") + }, + }, + { + s: "bytes=1-1", + rg: Range{ + Start: 1, + Length: 1, + }, + expect: func(t *testing.T, s string) { + assert := assert.New(t) + assert.Equal(s, "bytes=1-1") + }, + }, + } + + for _, tc := range tests { + t.Run(tc.s, func(t *testing.T) { + tc.expect(t, tc.rg.String()) + }) + } +} + func TestParseRange(t *testing.T) { - var cases = []struct { - rangeStr string - totalLength uint64 - targetLength uint64 - expected *Range - wantErr bool + tests := []struct { + s string + size int64 + rg []Range + }{ + {"", 0, nil}, + {"", 1000, nil}, + {"foo", 0, nil}, + {"bytes=", 0, nil}, + {"bytes=7", 10, nil}, + {"bytes= 7 ", 10, nil}, + {"bytes=1-", 0, nil}, + {"bytes=5-4", 10, nil}, + {"bytes=0-2,5-4", 10, nil}, + {"bytes=2-5,4-3", 10, nil}, + {"bytes=--5,4--3", 10, nil}, + {"bytes=A-", 10, nil}, + {"bytes=A- ", 10, nil}, + {"bytes=A-Z", 10, nil}, + {"bytes= -Z", 10, nil}, + {"bytes=5-Z", 10, nil}, + {"bytes=Ran-dom, garbage", 10, nil}, + {"bytes=0x01-0x02", 10, nil}, + {"bytes= ", 10, nil}, + {"bytes= , , , ", 10, nil}, + + {"bytes=0-9", 10, []Range{{0, 10}}}, + {"bytes=0-", 10, []Range{{0, 10}}}, + {"bytes=5-", 10, []Range{{5, 5}}}, + {"bytes=0-20", 10, []Range{{0, 10}}}, + {"bytes=15-,0-5", 10, []Range{{0, 6}}}, + {"bytes=1-2,5-", 10, []Range{{1, 2}, {5, 5}}}, + {"bytes=-2 , 7-", 11, []Range{{9, 2}, {7, 4}}}, + {"bytes=0-0 ,2-2, 7-", 11, []Range{{0, 1}, {2, 1}, {7, 4}}}, + {"bytes=-5", 10, []Range{{5, 5}}}, + {"bytes=-15", 10, []Range{{0, 10}}}, + {"bytes=0-499", 10000, []Range{{0, 500}}}, + {"bytes=500-999", 10000, []Range{{500, 500}}}, + {"bytes=-500", 10000, []Range{{9500, 500}}}, + {"bytes=9500-", 10000, []Range{{9500, 500}}}, + {"bytes=0-0,-1", 10000, []Range{{0, 1}, {9999, 1}}}, + {"bytes=500-600,601-999", 10000, []Range{{500, 101}, {601, 399}}}, + {"bytes=500-700,601-999", 10000, []Range{{500, 201}, {601, 399}}}, + + // Match Apache laxity: + {"bytes= 1 -2 , 4- 5, 7 - 8 , ,,", 11, []Range{{1, 2}, {4, 2}, {7, 2}}}, + } + + for _, tc := range tests { + rg := tc.rg + ranges, err := ParseRange(tc.s, tc.size) + if err != nil && rg != nil { + t.Errorf("ParseRange(%q) returned error %q", tc.s, err) + } + + if len(ranges) != len(rg) { + t.Errorf("len(ParseRange(%q)) = %d, want %d", tc.s, len(ranges), len(rg)) + continue + } + + for i := range rg { + if ranges[i].Start != rg[i].Start { + t.Errorf("ParseRange(%q)[%d].Serve = %d, want %d", tc.s, i, ranges[i].Start, rg[i].Start) + } + + if ranges[i].Length != rg[i].Length { + t.Errorf("ParseRange(%q)[%d].Length = %d, want %d", tc.s, i, ranges[i].Length, rg[i].Length) + } + } + } +} + +func TestParseOneRange(t *testing.T) { + tests := []struct { + s string + size int64 + rg Range + }{ + {"bytes=0-9", 10, Range{0, 10}}, + {"bytes=0-", 10, Range{0, 10}}, + {"bytes=5-", 10, Range{5, 5}}, + {"bytes=0-20", 10, Range{0, 10}}, + {"bytes=1-2", 10, Range{1, 2}}, + {"bytes=0-0", 11, Range{0, 1}}, + {"bytes=-5", 10, Range{5, 5}}, + {"bytes=-15", 10, Range{0, 10}}, + {"bytes=0-499", 10000, Range{0, 500}}, + {"bytes=500-999", 10000, Range{500, 500}}, + {"bytes=-500", 10000, Range{9500, 500}}, + {"bytes=9500-", 10000, Range{9500, 500}}, + {"bytes=0-0", 10000, Range{0, 1}}, + {"bytes=500-600", 10000, Range{500, 101}}, + {"bytes=500-700", 10000, Range{500, 201}}, + + // Match Apache laxity: + {"bytes= 1 -2 ", 11, Range{1, 2}}, + } + + for _, tc := range tests { + erg := tc.rg + rg, err := ParseOneRange(tc.s, tc.size) + if err != nil { + t.Errorf("ParseOneRange(%q) returned error %q", tc.s, err) + } + + if rg.Start != erg.Start { + t.Errorf("ParseOneRange(%q).Serve = %d, want %d", tc.s, rg.Start, erg.Start) + } + + if rg.Length != erg.Length { + t.Errorf("ParseOneRange(%q).Length = %d, want %d", tc.s, rg.Length, erg.Length) + } + } +} + +func TestParseURLMetaRange(t *testing.T) { + tests := []struct { + s string + size int64 + expect func(t *testing.T, rg Range, err error) }{ { - rangeStr: "0-65575", - totalLength: 65576, - targetLength: 65576, - expected: &Range{ - StartIndex: 0, - EndIndex: 65575, - }, - wantErr: false, - }, { - rangeStr: "2-2", - totalLength: 65576, - targetLength: 1, - expected: &Range{ - StartIndex: 2, - EndIndex: 2, - }, - wantErr: false, - }, { - rangeStr: "2-", - totalLength: 65576, - targetLength: 65574, - expected: &Range{ - StartIndex: 2, - EndIndex: 65575, - }, - wantErr: false, - }, { - rangeStr: "-100", - totalLength: 65576, - targetLength: 100, - expected: &Range{ - StartIndex: 65476, - EndIndex: 65575, - }, - wantErr: false, - }, { - rangeStr: "0-66575", - totalLength: 65576, - targetLength: 65576, - expected: &Range{ - StartIndex: 0, - EndIndex: 65575, - }, - wantErr: false, + s: "0-65575", + size: 65576, + expect: func(t *testing.T, rg Range, err error) { + assert := assert.New(t) + assert.NoError(err) + assert.EqualValues(rg, Range{ + Start: 0, + Length: 65576, + }) + }, }, { - rangeStr: "0-65-575", - totalLength: 65576, - expected: nil, - wantErr: true, + s: "2-2", + size: 65576, + expect: func(t *testing.T, rg Range, err error) { + assert := assert.New(t) + assert.NoError(err) + assert.EqualValues(rg, Range{ + Start: 2, + Length: 1, + }) + }, }, { - rangeStr: "0-hello", - totalLength: 65576, - expected: nil, - wantErr: true, + s: "2-", + size: 65576, + expect: func(t *testing.T, rg Range, err error) { + assert := assert.New(t) + assert.NoError(err) + assert.EqualValues(rg, Range{ + Start: 2, + Length: 65574, + }) + }, }, { - rangeStr: "65575-0", - totalLength: 65576, - expected: nil, - wantErr: true, + s: "-100", + size: 65576, + expect: func(t *testing.T, rg Range, err error) { + assert := assert.New(t) + assert.NoError(err) + assert.EqualValues(rg, Range{ + Start: 65476, + Length: 100, + }) + }, + }, + { + s: "0-66575", + size: 65576, + expect: func(t *testing.T, rg Range, err error) { + assert := assert.New(t) + assert.NoError(err) + assert.EqualValues(rg, Range{ + Start: 0, + Length: 65576, + }) + }, }, { - rangeStr: "-1-8", - totalLength: 65576, - expected: nil, - wantErr: true, + s: "0-65-575", + size: 65576, + expect: func(t *testing.T, rg Range, err error) { + assert := assert.New(t) + assert.Error(err) + }, + }, + { + s: "0-hello", + size: 65576, + expect: func(t *testing.T, rg Range, err error) { + assert := assert.New(t) + assert.Error(err) + }, + }, + { + s: "65575-0", + size: 65576, + expect: func(t *testing.T, rg Range, err error) { + assert := assert.New(t) + assert.Error(err) + }, + }, + { + s: "-1-8", + size: 65576, + expect: func(t *testing.T, rg Range, err error) { + assert := assert.New(t) + assert.Error(err) + }, }, } - for _, v := range cases { - t.Run(v.rangeStr, func(t *testing.T) { - result, err := ParseRange(v.rangeStr, v.totalLength) - assert.Equal(t, v.expected, result) - assert.Equal(t, v.wantErr, err != nil) - if !v.wantErr { - assert.Equal(t, v.targetLength, result.Length()) - } + for _, tc := range tests { + t.Run(tc.s, func(t *testing.T) { + rg, err := ParseURLMetaRange(tc.s, tc.size) + tc.expect(t, rg, err) }) } } diff --git a/pkg/source/clients/hdfsprotocol/hdfs_source_client.go b/pkg/source/clients/hdfsprotocol/hdfs_source_client.go index 916f401dd90..5333ae26784 100644 --- a/pkg/source/clients/hdfsprotocol/hdfs_source_client.go +++ b/pkg/source/clients/hdfsprotocol/hdfs_source_client.go @@ -134,16 +134,16 @@ func (h *hdfsSourceClient) Download(request *source.Request) (*source.Response, } if request.Header.Get(source.Range) != "" { - requestRange, err := http.ParseRange(request.Header.Get(source.Range), uint64(limitReadN)) + rg, err := http.ParseURLMetaRange(request.Header.Get(source.Range), limitReadN) if err != nil { return nil, err } - _, err = hdfsFile.Seek(int64(requestRange.StartIndex), 0) + _, err = hdfsFile.Seek(rg.Start, 0) if err != nil { hdfsFile.Close() return nil, err } - limitReadN = int64(requestRange.Length()) + limitReadN = rg.Length } response := source.NewResponse( diff --git a/pkg/source/clients/httpprotocol/http_source_client_test.go b/pkg/source/clients/httpprotocol/http_source_client_test.go index 7ebbcee931d..c2831269672 100644 --- a/pkg/source/clients/httpprotocol/http_source_client_test.go +++ b/pkg/source/clients/httpprotocol/http_source_client_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "math" "net/http" "testing" "time" @@ -77,15 +78,15 @@ func (suite *HTTPSourceClientTestSuite) SetupTest() { }) httpmock.RegisterResponder(http.MethodGet, normalRawURL, func(request *http.Request) (*http.Response, error) { - if rang := request.Header.Get(headers.Range); rang != "" { - r, _ := nethttp.GetRange(rang[6:]) + if rg := request.Header.Get(headers.Range); rg != "" { + r, _ := nethttp.ParseOneRange(rg, math.MaxInt64) header := http.Header{} header.Set(headers.LastModified, lastModified) header.Set(headers.ETag, etag) res := &http.Response{ StatusCode: http.StatusPartialContent, - ContentLength: int64(r.EndIndex) - int64(r.StartIndex) + int64(1), - Body: httpmock.NewRespBodyFromString(testContent[r.StartIndex:r.EndIndex]), + ContentLength: r.Length, + Body: httpmock.NewRespBodyFromString(testContent[r.Start : r.Start+r.Length-1]), Header: header, } return res, nil diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index f30d5810196..4710763b028 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -37,8 +37,8 @@ import ( schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" v2mocks "d7y.io/api/pkg/apis/scheduler/v2/mocks" - "d7y.io/dragonfly/v2/client/util" "d7y.io/dragonfly/v2/pkg/idgen" + nethttp "d7y.io/dragonfly/v2/pkg/net/http" configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" ) @@ -498,7 +498,7 @@ func TestPeer_DownloadTinyFile(t *testing.T) { assert.Equal(r.URL.Path, fmt.Sprintf("/download/%s/%s", peer.Task.ID[:3], peer.Task.ID)) assert.Equal(r.URL.RawQuery, fmt.Sprintf("peerId=%s", peer.ID)) - rgs, err := util.ParseRange(r.Header.Get(headers.Range), 128) + rgs, err := nethttp.ParseRange(r.Header.Get(headers.Range), 128) assert.Nil(err) assert.Equal(1, len(rgs)) rg := rgs[0] diff --git a/test/e2e/dfget_test.go b/test/e2e/dfget_test.go index 9b8ad253148..0f5b0483dc5 100644 --- a/test/e2e/dfget_test.go +++ b/test/e2e/dfget_test.go @@ -26,7 +26,7 @@ import ( . "github.com/onsi/ginkgo/v2" //nolint . "github.com/onsi/gomega" //nolint - "d7y.io/dragonfly/v2/client/util" + "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/test/e2e/e2eutil" ) @@ -68,9 +68,9 @@ func getFileSizes() map[string]int { return details } -func getRandomRange(size int) *util.Range { +func getRandomRange(size int) *http.Range { if size == 0 { - return &util.Range{ + return &http.Range{ Start: 0, Length: 0, } @@ -86,7 +86,7 @@ func getRandomRange(size int) *util.Range { } // range for [start, end] - rg := &util.Range{ + rg := &http.Range{ Start: int64(start), Length: int64(end + 1 - start), } @@ -282,7 +282,7 @@ func singleDfgetTest(name, ns, label, podNamePrefix, container string) { }) } -func downloadSingleFile(ns string, pod *e2eutil.PodExec, path, url string, size int, rg *util.Range, rawRg string) { +func downloadSingleFile(ns string, pod *e2eutil.PodExec, path, url string, size int, rg *http.Range, rawRg string) { var ( sha256sum []string dfget []string