Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
fix: cdn-source pattern supports range task
Browse files Browse the repository at this point in the history
Signed-off-by: lowzj <zj3142063@gmail.com>
  • Loading branch information
lowzj committed Apr 7, 2020
1 parent 9a4e7b0 commit ef57720
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 31 deletions.
49 changes: 46 additions & 3 deletions dfget/core/api/download_api.go
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
"github.com/dragonflyoss/Dragonfly/version"
)

Expand Down Expand Up @@ -56,8 +57,10 @@ func NewDownloadAPI() DownloadAPI {
}

func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest, timeout time.Duration) (*http.Response, error) {
if req == nil {
return nil, fmt.Errorf("nil dwonload request")
}
headers := make(map[string]string)
headers[config.StrRange] = config.StrBytes + "=" + req.PieceRange
headers[config.StrPieceNum] = strconv.Itoa(req.PieceNum)
headers[config.StrPieceSize] = fmt.Sprint(req.PieceSize)
headers[config.StrUserAgent] = "dfget/" + version.DFGetVersion
Expand All @@ -67,12 +70,52 @@ func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest, timeou
}
}

var url string
if strings.Contains(req.Path, "://") {
var (
url string
rangeStr string
)
if isFromSource(req) {
rangeStr = getRealRange(req.PieceRange, headers[config.StrRange])
url = req.Path
} else {
rangeStr = req.PieceRange
url = fmt.Sprintf("http://%s:%d%s", ip, port, req.Path)
}
headers[config.StrRange] = httputils.ConstructRangeStr(rangeStr)

return httputils.HTTPGetTimeout(url, headers, timeout)
}

func isFromSource(req *DownloadRequest) bool {
return strings.Contains(req.Path, "://")
}

// getRealRange
// pieceRange: "start-end"
// rangeHeaderValue: "bytes=sourceStart-sourceEnd"
// return: "realStart-realEnd"
func getRealRange(pieceRange string, rangeHeaderValue string) string {
if rangeHeaderValue == "" {
return pieceRange
}
rangeEle := strings.Split(rangeHeaderValue, "=")
if len(rangeEle) != 2 {
return pieceRange
}

lower, upper, err := rangeutils.ParsePieceIndex(rangeEle[1])
if err != nil {
return pieceRange
}
start, end, err := rangeutils.ParsePieceIndex(pieceRange)
if err != nil {
return pieceRange
}

realStart := start + lower
realEnd := end + lower
if realEnd > upper {
realEnd = upper
}
return fmt.Sprintf("%d-%d", realStart, realEnd)
}
51 changes: 51 additions & 0 deletions dfget/core/api/download_api_test.go
@@ -0,0 +1,51 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package api

import (
"github.com/go-check/check"
)

type DownloadAPITestSuite struct {
}

func init() {
check.Suite(&DownloadAPITestSuite{})
}

// ----------------------------------------------------------------------------
// unit tests for DownloadAPI

func (s *DownloadAPITestSuite) TestGetRealRange(c *check.C) {
cases := []struct {
pieceRange string
rangeValue string
expected string
}{
{"0-1", "", "0-1"},
{"0-1", "1-100", "1-2"},
{"0-100", "1-100", "1-100"},
{"100-100", "1-100", "101-100"},
{"100-200", "1-100", "101-100"},
}

for _, v := range cases {
res := getRealRange(v.pieceRange, "bytes="+v.rangeValue)
c.Assert(res, check.Equals, v.expected,
check.Commentf("%v", v))
}
}
14 changes: 8 additions & 6 deletions supernode/util/range_util.go → pkg/rangeutils/range_util.go
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package util
package rangeutils

import (
"fmt"
Expand All @@ -23,7 +23,8 @@ import (
)

const (
separator = "-"
separator = "-"
invalidPieceIndex = -1
)

// CalculatePieceSize calculates the size of piece
Expand Down Expand Up @@ -51,23 +52,24 @@ func CalculatePieceNum(rangeStr string) int {
}

// ParsePieceIndex parses the start and end index ​​according to range string.
// rangeStr: "start-end"
func ParsePieceIndex(rangeStr string) (start, end int64, err error) {
ranges := strings.Split(rangeStr, separator)
if len(ranges) != 2 {
return -1, -1, fmt.Errorf("range value(%s) is illegal which should be like 0-45535", rangeStr)
return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range value(%s) is illegal which should be like 0-45535", rangeStr)
}

startIndex, err := strconv.ParseInt(ranges[0], 10, 64)
if err != nil {
return -1, -1, fmt.Errorf("range(%s) start is not a number", rangeStr)
return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range(%s) start is not a number", rangeStr)
}
endIndex, err := strconv.ParseInt(ranges[1], 10, 64)
if err != nil {
return -1, -1, fmt.Errorf("range(%s) end is not a number", rangeStr)
return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range(%s) end is not a number", rangeStr)
}

if endIndex < startIndex {
return -1, -1, fmt.Errorf("range(%s) start is larger than end", rangeStr)
return invalidPieceIndex, invalidPieceIndex, fmt.Errorf("range(%s) start is larger than end", rangeStr)
}

return startIndex, endIndex, nil
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package util
package rangeutils

import (
"testing"
Expand Down
10 changes: 5 additions & 5 deletions supernode/daemon/mgr/cdn/downloader.go
Expand Up @@ -20,12 +20,12 @@ import (
"context"
"net/http"

errorType "github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/dragonflyoss/Dragonfly/supernode/util"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

errorType "github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
)

// download downloads the file from the original address and
Expand All @@ -38,7 +38,7 @@ func (cm *Manager) download(ctx context.Context, taskID, url string, headers map
checkCode := []int{http.StatusOK, http.StatusPartialContent}

if startPieceNum > 0 {
breakRange, err := util.CalculateBreakRange(startPieceNum, int(pieceContSize), httpFileLength)
breakRange, err := rangeutils.CalculateBreakRange(startPieceNum, int(pieceContSize), httpFileLength)
if err != nil {
return nil, errors.Wrapf(errorType.ErrInvalidValue, "failed to calculate the breakRange: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion supernode/daemon/mgr/cdn/manager.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/dragonflyoss/Dragonfly/pkg/limitreader"
"github.com/dragonflyoss/Dragonfly/pkg/metricsutils"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
"github.com/dragonflyoss/Dragonfly/pkg/ratelimiter"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
"github.com/dragonflyoss/Dragonfly/supernode/config"
Expand Down Expand Up @@ -216,7 +217,7 @@ func (cm *Manager) GetPieceMD5(ctx context.Context, taskID string, pieceNum int,

if source == PieceMd5SourceFile {
// get piece length
start, end, err := util.ParsePieceIndex(pieceRange)
start, end, err := rangeutils.ParsePieceIndex(pieceRange)
if err != nil {
return "", errors.Wrapf(err, "failed to parse piece range(%s)", pieceRange)
}
Expand Down
8 changes: 4 additions & 4 deletions supernode/daemon/mgr/pieceerror/md5_not_match.go
Expand Up @@ -19,11 +19,11 @@ package pieceerror
import (
"context"

"github.com/sirupsen/logrus"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
"github.com/dragonflyoss/Dragonfly/supernode/util"

"github.com/sirupsen/logrus"
)

var _ Handler = &FileMd5NotMatchHandler{}
Expand All @@ -45,7 +45,7 @@ func NewFileMd5NotMatchHandler(gcManager mgr.GCMgr, cdnManager mgr.CDNMgr) (Hand
}

func (fnmh *FileMd5NotMatchHandler) Handle(ctx context.Context, pieceErrorRequest *types.PieceErrorRequest) error {
pieceNum := util.CalculatePieceNum(pieceErrorRequest.Range)
pieceNum := rangeutils.CalculatePieceNum(pieceErrorRequest.Range)

// get piece MD5 by reading the meta file
metaPieceMD5, err := fnmh.cdnManager.GetPieceMD5(ctx, pieceErrorRequest.TaskID, pieceNum, pieceErrorRequest.Range, "meta")
Expand Down
3 changes: 2 additions & 1 deletion supernode/daemon/mgr/task/manager.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/metricsutils"
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
"github.com/dragonflyoss/Dragonfly/pkg/syncmap"
"github.com/dragonflyoss/Dragonfly/supernode/config"
Expand Down Expand Up @@ -272,7 +273,7 @@ func (tm *Manager) UpdatePieceStatus(ctx context.Context, taskID, pieceRange str
logrus.Debugf("get update piece status request: %+v with taskID(%s) pieceRange(%s)", pieceUpdateRequest, taskID, pieceRange)

// calculate the pieceNum according to the pieceRange
pieceNum := util.CalculatePieceNum(pieceRange)
pieceNum := rangeutils.CalculatePieceNum(pieceRange)
if pieceNum == -1 {
return errors.Wrapf(errortypes.ErrInvalidValue,
"failed to parse pieceRange: %s to pieceNum for taskID: %s, clientID: %s",
Expand Down
5 changes: 3 additions & 2 deletions supernode/daemon/mgr/task/manager_util.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/dragonflyoss/Dragonfly/pkg/digest"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
"github.com/dragonflyoss/Dragonfly/pkg/timeutils"
"github.com/dragonflyoss/Dragonfly/supernode/config"
Expand Down Expand Up @@ -290,7 +291,7 @@ func (tm *Manager) processTaskStart(ctx context.Context, srcCID string, task *ty
// req.DstPID, req.PieceRange, req.PieceResult, req.DfgetTaskStatus
func (tm *Manager) processTaskRunning(ctx context.Context, srcCID, srcPID string, task *types.TaskInfo, req *types.PiecePullRequest,
dfgetTask *types.DfGetTask) (bool, interface{}, error) {
pieceNum := util.CalculatePieceNum(req.PieceRange)
pieceNum := rangeutils.CalculatePieceNum(req.PieceRange)
if pieceNum == -1 {
return false, nil, errors.Wrapf(errortypes.ErrInvalidValue, "pieceRange: %s", req.PieceRange)
}
Expand Down Expand Up @@ -411,7 +412,7 @@ func (tm *Manager) pieceResultToPieceInfo(ctx context.Context, pr *mgr.PieceResu
PeerIP: peer.IP.String(),
PeerPort: peer.Port,
PieceMD5: pieceMD5,
PieceRange: util.CalculatePieceRange(pr.PieceNum, pieceSize),
PieceRange: rangeutils.CalculatePieceRange(pr.PieceNum, pieceSize),
PieceSize: pieceSize,
}, nil
}
Expand Down
16 changes: 8 additions & 8 deletions supernode/server/0.3_bridge.go
Expand Up @@ -21,17 +21,17 @@ import (
"encoding/json"
"net/http"

"github.com/go-openapi/strfmt"
"github.com/gorilla/schema"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
"github.com/dragonflyoss/Dragonfly/pkg/rangeutils"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
sutil "github.com/dragonflyoss/Dragonfly/supernode/util"

"github.com/go-openapi/strfmt"
"github.com/gorilla/schema"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// RegisterResponseData is the data when registering supernode successfully.
Expand Down Expand Up @@ -200,7 +200,7 @@ func (s *Server) pullPieceTask(ctx context.Context, rw http.ResponseWriter, req
}
datas = append(datas, &PullPieceTaskResponseContinueData{
Range: v.PieceRange,
PieceNum: sutil.CalculatePieceNum(v.PieceRange),
PieceNum: rangeutils.CalculatePieceNum(v.PieceRange),
PieceSize: v.PieceSize,
PieceMd5: v.PieceMD5,
Cid: cid,
Expand Down Expand Up @@ -229,7 +229,7 @@ func (s *Server) reportPiece(ctx context.Context, rw http.ResponseWriter, req *h

// If piece is downloaded from supernode, add metrics.
if s.Config.IsSuperCID(dstCID) {
m.pieceDownloadedBytes.WithLabelValues().Add(float64(sutil.CalculatePieceSize(pieceRange)))
m.pieceDownloadedBytes.WithLabelValues().Add(float64(rangeutils.CalculatePieceSize(pieceRange)))
}

request := &types.PieceUpdateRequest{
Expand Down

0 comments on commit ef57720

Please sign in to comment.