Skip to content

Commit

Permalink
feat: cdn download tiny file (#1040)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Jan 25, 2022
1 parent 7e3d6d0 commit c552a0c
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 78 deletions.
21 changes: 0 additions & 21 deletions scheduler/resource/cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,27 +95,6 @@ func (c *cdn) TriggerTask(ctx context.Context, task *Task) (*Peer, *rpcscheduler
// Handle end of piece
if piece.Done {
peer.Log.Infof("receive end of piece: %#v %#v", piece, piece.PieceInfo)

// Handle tiny scope size task
if piece.ContentLength <= TinyFileSize {
peer.Log.Info("peer type is tiny file")
data, err := peer.DownloadTinyFile(ctx)
if err != nil {
return nil, nil, err
}

// Tiny file downloaded directly from CDN is exception
if len(data) != int(piece.ContentLength) {
return nil, nil, errors.Errorf(
"piece actual data length is different from content length, content length is %d, data length is %d",
piece.ContentLength, len(data),
)
}

// Tiny file downloaded successfully
task.DirectPiece = data
}

return peer, &rpcscheduler.PeerResult{
TotalPieceCount: piece.TotalPieceCount,
ContentLength: piece.ContentLength,
Expand Down
19 changes: 15 additions & 4 deletions scheduler/resource/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
)

const (
// Download tiny file timeout
downloadTinyFileContextTimeout = 2 * time.Minute
)

const (
// Peer has been created but did not start running
PeerStatePending = "Pending"
Expand Down Expand Up @@ -389,29 +394,35 @@ func (p *Peer) DeleteStream() {
}

// Download tiny file from peer
func (p *Peer) DownloadTinyFile(ctx context.Context) ([]byte, error) {
// Download url: http://${host}:${port}/download/${taskIndex}/${taskID}?peerId=scheduler;
func (p *Peer) DownloadTinyFile() ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), downloadTinyFileContextTimeout)
defer cancel()

// Download url: http://${host}:${port}/download/${taskIndex}/${taskID}?peerId=scheduler
url := url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", p.Host.IP, p.Host.DownloadPort),
Path: fmt.Sprintf("download/%s/%s", p.Task.ID[:3], p.Task.ID),
RawQuery: "peerId=scheduler",
}
p.Log.Infof("download tiny file url: %#v", url)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
if err != nil {
return []byte{}, err
}
req.Header.Set(headers.Range, fmt.Sprintf("bytes=%d-%d", 0, p.Task.ContentLength.Load()))
p.Log.Infof("download tiny file %s, header is : %#v", url.String(), req.Header)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
// The HTTP 206 Partial Content success status response code indicates that
// the request has succeeded and the body contains the requested ranges of data, as described in the Range header of the request.
// Refer to https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/206
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusBadRequest {
return []byte{}, fmt.Errorf("%v: %v", url.String(), resp.Status)
}

Expand Down
43 changes: 4 additions & 39 deletions scheduler/resource/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package resource

import (
"context"
"fmt"
"net"
"net/http"
Expand All @@ -26,7 +25,6 @@ import (
"path"
"strconv"
"testing"
"time"

"github.com/go-http-utils/headers"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -880,7 +878,7 @@ func TestPeer_DownloadTinyFile(t *testing.T) {
return
}

w.WriteHeader(http.StatusOK)
w.WriteHeader(http.StatusPartialContent)
}))
defer s.Close()

Expand All @@ -892,10 +890,7 @@ func TestPeer_DownloadTinyFile(t *testing.T) {
name: "download tiny file",
expect: func(t *testing.T, peer *Peer) {
assert := assert.New(t)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

_, err := peer.DownloadTinyFile(ctx)
_, err := peer.DownloadTinyFile()
assert.NoError(err)
},
},
Expand All @@ -904,47 +899,17 @@ func TestPeer_DownloadTinyFile(t *testing.T) {
expect: func(t *testing.T, peer *Peer) {
assert := assert.New(t)
peer.Task.ContentLength.Store(2)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

_, err := peer.DownloadTinyFile(ctx)
_, err := peer.DownloadTinyFile()
assert.EqualError(err, fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=scheduler: 406 Not Acceptable",
peer.Host.IP, peer.Host.DownloadPort, peer.Task.ID[:3], peer.Task.ID))
},
},
{
name: "download tiny file failed because of port error",
expect: func(t *testing.T, peer *Peer) {
assert := assert.New(t)
peer.Host.DownloadPort = 8000
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

_, err := peer.DownloadTinyFile(ctx)
assert.Error(err)
},
},
{
name: "download tiny file failed because of ip error",
expect: func(t *testing.T, peer *Peer) {
assert := assert.New(t)
peer.Host.IP = "127.0.0.2"
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

_, err := peer.DownloadTinyFile(ctx)
assert.Error(err)
},
},
{
name: "download tiny file failed because of http status code",
expect: func(t *testing.T, peer *Peer) {
assert := assert.New(t)
peer.Task.ID = "foo"
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

_, err := peer.DownloadTinyFile(ctx)
_, err := peer.DownloadTinyFile()
assert.EqualError(err, fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=scheduler: 404 Not Found",
peer.Host.IP, peer.Host.DownloadPort, peer.Task.ID[:3], peer.Task.ID))
},
Expand Down
12 changes: 2 additions & 10 deletions scheduler/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,13 @@ func (s *Server) RegisterPeerTask(ctx context.Context, req *scheduler.PeerTaskRe
switch sizeScope {
case base.SizeScope_TINY:
peer.Log.Info("task size scope is tiny and return piece content directly")
// When task.DirectPiece length is 0, data is downloaded by common peers failed
if int64(len(task.DirectPiece)) == task.ContentLength.Load() {
if len(task.DirectPiece) > 0 && int64(len(task.DirectPiece)) == task.ContentLength.Load() {
if err := peer.FSM.Event(resource.PeerEventRegisterTiny); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
}

// Dfdaemon does not report piece info when scope size is SizeScope_TINY
if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
}

return &scheduler.RegisterResult{
TaskId: task.ID,
SizeScope: base.SizeScope_TINY,
Expand All @@ -93,7 +85,7 @@ func (s *Server) RegisterPeerTask(ctx context.Context, req *scheduler.PeerTaskRe
}

// Fallback to base.SizeScope_SMALL
peer.Log.Warnf("task size scope is tiny, but task.DirectPiece length is %d, not %d. fall through to size scope small",
peer.Log.Warnf("task size scope is tiny, length of direct piece is %d and content length is %d. fall through to size scope small",
len(task.DirectPiece), task.ContentLength.Load())
fallthrough
case base.SizeScope_SMALL:
Expand Down
43 changes: 42 additions & 1 deletion scheduler/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,47 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
assert.Equal(result.SizeScope, base.SizeScope_NORMAL)
},
},
{
name: "length of direct piece is zero",
req: &rpcscheduler.PeerTaskRequest{},
mock: func(req *rpcscheduler.PeerTaskRequest, mockPeer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, scheduler scheduler.Scheduler, ms *mocks.MockServiceMockRecorder, msched *schedulermocks.MockSchedulerMockRecorder) {
mockTask.FSM.SetState(resource.TaskStateSucceeded)

gomock.InOrder(
ms.RegisterTask(context.Background(), req).Return(mockTask, nil).Times(1),
ms.LoadOrStoreHost(context.Background(), req).Return(mockHost, true).Times(1),
ms.LoadOrStorePeer(context.Background(), req, gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
ms.Scheduler().Return(scheduler).Times(1),
msched.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.Equal(result.TaskId, mockTaskID)
assert.Equal(result.SizeScope, base.SizeScope_NORMAL)
},
},
{
name: "task content length is not equal to length of direct piece",
req: &rpcscheduler.PeerTaskRequest{},
mock: func(req *rpcscheduler.PeerTaskRequest, mockPeer *resource.Peer, mockHost *resource.Host, mockTask *resource.Task, scheduler scheduler.Scheduler, ms *mocks.MockServiceMockRecorder, msched *schedulermocks.MockSchedulerMockRecorder) {
mockTask.FSM.SetState(resource.TaskStateSucceeded)
mockTask.DirectPiece = []byte{1}

gomock.InOrder(
ms.RegisterTask(context.Background(), req).Return(mockTask, nil).Times(1),
ms.LoadOrStoreHost(context.Background(), req).Return(mockHost, true).Times(1),
ms.LoadOrStorePeer(context.Background(), req, gomock.Any(), gomock.Any()).Return(mockPeer, true).Times(1),
ms.Scheduler().Return(scheduler).Times(1),
msched.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, false).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.Equal(result.TaskId, mockTaskID)
assert.Equal(result.SizeScope, base.SizeScope_NORMAL)
},
},
{
name: "task state is TaskStateRunning and peer state is PeerStateFailed",
req: &rpcscheduler.PeerTaskRequest{},
Expand Down Expand Up @@ -206,7 +247,7 @@ func TestRPCServer_RegisterPeerTask(t *testing.T) {
assert.Equal(result.DirectPiece, &rpcscheduler.RegisterResult_PieceContent{
PieceContent: []byte{1},
})
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(peer.FSM.Is(resource.PeerStateReceivedTiny))
},
},
{
Expand Down
13 changes: 10 additions & 3 deletions scheduler/service/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ func (c *callback) BeginOfPiece(ctx context.Context, peer *resource.Peer) {
// Back to the source download process, peer directly returns
peer.Log.Info("peer back to source")
return
case resource.PeerStateReceivedTiny:
// When the task is tiny,
// the peer has already returned to piece data when registering
peer.Log.Info("file type is tiny, peer has already returned to piece data when registering")
if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)
return
}
case resource.PeerStateReceivedSmall:
// When the task is small,
// the peer has already returned to the parent when registering
Expand Down Expand Up @@ -252,9 +260,8 @@ func (c *callback) PieceFail(ctx context.Context, peer *resource.Peer, piece *rp
func (c *callback) PeerSuccess(ctx context.Context, peer *resource.Peer) {
// If the peer type is tiny and back-to-source,
// it need to directly download the tiny file and store the data in task DirectPiece
if peer.FSM.Is(resource.PeerStateBackToSource) && peer.Task.SizeScope() == base.SizeScope_TINY {
peer.Log.Info("peer state is PeerStateBackToSource and type is tiny file")
data, err := peer.DownloadTinyFile(ctx)
if peer.Task.SizeScope() == base.SizeScope_TINY && len(peer.Task.DirectPiece) == 0 {
data, err := peer.DownloadTinyFile()
if err == nil && len(data) == int(peer.Task.ContentLength.Load()) {
// Tiny file downloaded successfully
peer.Task.DirectPiece = data
Expand Down
10 changes: 10 additions & 0 deletions scheduler/service/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,16 @@ func TestCallback_BeginOfPiece(t *testing.T) {
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
},
},
{
name: "peer state is PeerStateReceivedTiny",
mock: func(peer *resource.Peer, scheduler *mocks.MockSchedulerMockRecorder) {
peer.FSM.SetState(resource.PeerStateReceivedTiny)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "peer state is PeerStateReceivedSmall",
mock: func(peer *resource.Peer, scheduler *mocks.MockSchedulerMockRecorder) {
Expand Down

0 comments on commit c552a0c

Please sign in to comment.