Skip to content

Commit

Permalink
fix: scheduler download tiny file error (#1052)
Browse files Browse the repository at this point in the history
* fix: scheduler download tiny file error

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* chore: fix golang lint

Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma committed Feb 8, 2022
1 parent 66a0068 commit a855c0e
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 37 deletions.
16 changes: 8 additions & 8 deletions scheduler/resource/peer.go
Expand Up @@ -385,25 +385,25 @@ func (p *Peer) DeleteStream() {
p.Stream = &atomic.Value{}
}

// Download tiny file from peer
// DownloadTinyFile downloads tiny file from peer
func (p *Peer) DownloadTinyFile() ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), downloadTinyFileContextTimeout)
defer cancel()

// Download url: http://${host}:${port}/download/${taskIndex}/${taskID}?peerId=scheduler
url := url.URL{
// Download url: http://${host}:${port}/download/${taskIndex}/${taskID}?peerId=${peerID}
targetURL := url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", p.Host.IP, p.Host.DownloadPort),
Path: fmt.Sprintf("download/%s/%s", p.Task.ID[:3], p.Task.ID),
RawQuery: "peerId=scheduler",
RawQuery: fmt.Sprintf("peerId=%s", p.ID),
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, targetURL.String(), nil)
if err != nil {
return []byte{}, err
}
req.Header.Set(headers.Range, fmt.Sprintf("bytes=%d-%d", 0, p.Task.ContentLength.Load()))
p.Log.Infof("download tiny file %s, header is : %#v", url.String(), req.Header)
req.Header.Set(headers.Range, fmt.Sprintf("bytes=%d-%d", 0, p.Task.ContentLength.Load()-1))
p.Log.Infof("download tiny file %s, header is : %#v", targetURL.String(), req.Header)

resp, err := http.DefaultClient.Do(req)
if err != nil {
Expand All @@ -415,7 +415,7 @@ func (p *Peer) DownloadTinyFile() ([]byte, error) {
// the request has succeeded and the body contains the requested ranges of data, as described in the Range header of the request.
// Refer to https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/206
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusBadRequest {
return []byte{}, fmt.Errorf("%v: %v", url.String(), resp.Status)
return []byte{}, fmt.Errorf("%v: %v", targetURL.String(), resp.Status)
}

return io.ReadAll(resp.Body)
Expand Down
80 changes: 51 additions & 29 deletions scheduler/resource/peer_test.go
Expand Up @@ -22,14 +22,14 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"path"
"strconv"
"testing"

"github.com/go-http-utils/headers"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks"
Expand Down Expand Up @@ -813,57 +813,79 @@ func TestPeer_DeleteStream(t *testing.T) {
}

func TestPeer_DownloadTinyFile(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if path.Base(r.URL.Path) == "foo" {
w.WriteHeader(http.StatusNotFound)
return
}

if r.Header.Get(headers.Range) == "bytes=0-2" {
w.WriteHeader(http.StatusNotAcceptable)
return
}

w.WriteHeader(http.StatusPartialContent)
}))
defer s.Close()

testData := []byte("./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" +
"./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz")
newServer := func(t *testing.T, getPeer func() *Peer) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
peer := getPeer()
assert := assert.New(t)
assert.NotNil(peer)
assert.Equal(r.URL.Path, fmt.Sprintf("/download/%s/%s", peer.Task.ID[:3], peer.Task.ID))
assert.Equal(r.URL.RawQuery, fmt.Sprintf("peerId=%s", peer.ID))

rgs, err := clientutil.ParseRange(r.Header.Get(headers.Range), 128)
assert.Nil(err)
assert.Equal(1, len(rgs))
rg := rgs[0]

w.WriteHeader(http.StatusPartialContent)
n, err := w.Write(testData[rg.Start : rg.Start+rg.Length])
assert.Nil(err)
assert.Equal(int64(n), rg.Length)
}))
}
tests := []struct {
name string
expect func(t *testing.T, peer *Peer)
name string
newServer func(t *testing.T, getPeer func() *Peer) *httptest.Server
expect func(t *testing.T, peer *Peer)
}{
{
name: "download tiny file",
name: "download tiny file - 32",
expect: func(t *testing.T, peer *Peer) {
assert := assert.New(t)
_, err := peer.DownloadTinyFile()
peer.Task.ContentLength.Store(32)
data, err := peer.DownloadTinyFile()
assert.NoError(err)
assert.Equal(testData[:32], data)
},
},
{
name: "download tiny file with range header",
name: "download tiny file - 128",
expect: func(t *testing.T, peer *Peer) {
assert := assert.New(t)
peer.Task.ContentLength.Store(2)
_, err := peer.DownloadTinyFile()
assert.EqualError(err, fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=scheduler: 406 Not Acceptable",
peer.Host.IP, peer.Host.DownloadPort, peer.Task.ID[:3], peer.Task.ID))
peer.Task.ContentLength.Store(32)
data, err := peer.DownloadTinyFile()
assert.NoError(err)
assert.Equal(testData[:32], data)
},
},
{
name: "download tiny file failed because of http status code",
newServer: func(t *testing.T, getPeer func() *Peer) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
},
expect: func(t *testing.T, peer *Peer) {
assert := assert.New(t)
peer.Task.ID = "foo"
peer.Task.ID = "foobar"
_, err := peer.DownloadTinyFile()
assert.EqualError(err, fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=scheduler: 404 Not Found",
peer.Host.IP, peer.Host.DownloadPort, peer.Task.ID[:3], peer.Task.ID))
assert.EqualError(err, fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=%s: 404 Not Found",
peer.Host.IP, peer.Host.DownloadPort, peer.Task.ID[:3], peer.Task.ID, peer.ID))
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var peer *Peer
if tc.newServer == nil {
tc.newServer = newServer
}
s := tc.newServer(t, func() *Peer {
return peer
})
defer s.Close()
url, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
Expand All @@ -883,7 +905,7 @@ func TestPeer_DownloadTinyFile(t *testing.T) {
mockRawHost.DownPort = int32(port)
mockHost := NewHost(mockRawHost)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
peer := NewPeer(mockPeerID, mockTask, mockHost)
peer = NewPeer(mockPeerID, mockTask, mockHost)
tc.expect(t, peer)
})
}
Expand Down

0 comments on commit a855c0e

Please sign in to comment.