Skip to content

Commit

Permalink
Manual master merge
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Apr 20, 2021
2 parents fd98b32 + 68ad465 commit a2726d1
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 220 deletions.
64 changes: 64 additions & 0 deletions app/publish/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,67 @@ func TestUploadHandlerSystemError(t *testing.T) {
assert.Equal(t, "unexpected EOF", rpcResponse.Error.Message)
require.False(t, publisher.called)
}

func Test_fetchFileInvalidInput(t *testing.T) {
h := &Handler{UploadPath: os.TempDir()}

cases := []struct {
url, errMsg string
}{
{"", ErrEmptyRemoteURL.Error()},
{"http://ovh.net/files/nonexistant1Mb.dat", "remote server returned non-OK status 404"},
{"/etc/passwd", `Get "/etc/passwd": unsupported protocol scheme ""`},
{"https://odysee.tv/../../../etc/passwd", "remote server returned non-OK status 400"},
{"http://nonexistenthost/some_file.mp4", `dial tcp: lookup nonexistenthost:`},
{"http://nonexistenthost/", "couldn't determine remote file name"},
{"/", "couldn't determine remote file name"},
}

for _, c := range cases {
t.Run(c.url, func(t *testing.T) {
r := CreatePublishRequest(t, nil, FormParam{remoteURLParam, c.url})

_, err := h.fetchFile(r, 20404)
assert.Regexp(t, fmt.Sprintf(".*%v.*", c.errMsg), err.Error())
})
}
}

func Test_fetchFile(t *testing.T) {
h := &Handler{UploadPath: os.TempDir()}
r := CreatePublishRequest(t, nil, FormParam{remoteURLParam, "http://ovh.net/files/1Mb.dat"})

f, err := h.fetchFile(r, 20404)
require.NoError(t, err)
assert.Regexp(t, "20404/.+_1Mb.dat$", f.Name())
s, err := os.Stat(f.Name())
require.NoError(t, err)
require.EqualValues(t, 125000, s.Size())
}

func Test_fetchFileEmptyRemoteFile(t *testing.T) {
ts := test.MockHTTPServer(nil)
defer ts.Close()
ts.NextResponse <- ""

h := &Handler{UploadPath: os.TempDir()}
r := CreatePublishRequest(t, nil, FormParam{remoteURLParam, fmt.Sprintf("%v/file.mp4", ts.URL)})

f, err := h.fetchFile(r, 20404)
require.EqualError(t, err, "remote file is empty")
assert.Nil(t, f)
}

func Test_fetchFileRemoteFileTooLarge(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Length", fmt.Sprintf("%v", MaxRemoteFileSize+1))
}))
defer ts.Close()

h := &Handler{UploadPath: os.TempDir()}
r := CreatePublishRequest(t, nil, FormParam{remoteURLParam, fmt.Sprintf("%v/file.mp4", ts.URL)})

f, err := h.fetchFile(r, 20404)
require.EqualError(t, err, fmt.Sprintf("remote file is too large at %v bytes", MaxRemoteFileSize+1))
assert.Nil(t, f)
}
121 changes: 108 additions & 13 deletions app/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"net/http"
"os"
"path"
"strconv"
"time"

"github.com/lbryio/lbrytv/app/auth"
"github.com/lbryio/lbrytv/app/proxy"
Expand All @@ -21,30 +23,35 @@ import (
"github.com/lbryio/lbrytv/internal/responses"

"github.com/gorilla/mux"
werrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/ybbus/jsonrpc"
)

var logger = monitor.NewModuleLogger("publish")

var method = "publish"

const (
MaxRemoteFileSize = 5 << 30 // 4GB

// fileFieldName refers to the POST field containing file upload
fileFieldName = "file"
// jsonRPCFieldName is a name of the POST field containing JSONRPC request accompanying the uploaded file
jsonRPCFieldName = "json_payload"
opName = "publish"

fileNameParam = "file_path"

opName = "publish"
fileNameParam = "file_path"
remoteURLParam = "remote_url"
)

var ErrEmptyRemoteURL = werrors.New("empty remote url")

// Handler has path to save uploads to
type Handler struct {
UploadPath string
}

var method = "publish"

// observeFailure requires metrics.MeasureMiddleware middleware to be present on the request
func observeFailure(d float64, kind string) {
metrics.ProxyE2ECallDurations.WithLabelValues(method).Observe(d)
Expand Down Expand Up @@ -78,14 +85,24 @@ func (h Handler) Handle(w http.ResponseWriter, r *http.Request) {

log := logger.WithFields(logrus.Fields{"user_id": user.ID, "method_handler": method})

f, err := h.saveFile(r, user.ID)
f, err := h.fetchFile(r, user.ID)
if err != nil {
log.Error(err)
monitor.ErrorToSentry(err)
w.Write(rpcerrors.NewInternalError(err).JSON())
observeFailure(metrics.GetDuration(r), metrics.FailureKindInternal)
return
if err == ErrEmptyRemoteURL {
f, err = h.saveFile(r, user.ID)
if err != nil {
log.Error(err)
monitor.ErrorToSentry(err)
w.Write(rpcerrors.NewInternalError(err).JSON())
observeFailure(metrics.GetDuration(r), metrics.FailureKindInternal)
return
}
} else {
w.Write(rpcerrors.NewInternalError(err).JSON())
observeFailure(metrics.GetDuration(r), metrics.FailureKindClient)
return
}
}

defer func() {
op := metrics.StartOperation(opName, "remove_file")
defer op.End()
Expand Down Expand Up @@ -155,8 +172,12 @@ func getCaller(sdkAddress, filename string, userID int, qCache cache.QueryCache)
// CanHandle checks if http.Request contains POSTed data in an accepted format.
// Supposed to be used in gorilla mux router MatcherFunc.
func (h Handler) CanHandle(r *http.Request, _ *mux.RouteMatch) bool {
_, _, err := r.FormFile(fileFieldName)
return !errors.Is(err, http.ErrMissingFile) && r.FormValue(jsonRPCFieldName) != ""
err := r.ParseMultipartForm(32 << 20)
if err != nil {
return false
}

return r.FormValue(jsonRPCFieldName) != ""
}

func (h Handler) saveFile(r *http.Request, userID int) (*os.File, error) {
Expand Down Expand Up @@ -200,3 +221,77 @@ func (h Handler) createFile(userID int, origFilename string) (*os.File, error) {
}
return ioutil.TempFile(path, fmt.Sprintf("*_%s", origFilename))
}

// fetchFile downloads remote file from the URL provided by client.
// ErrEmptyRemoteURL is a standard error when no URL has been provided.
func (h Handler) fetchFile(r *http.Request, userID int) (*os.File, error) {
log := logger.WithFields(logrus.Fields{"user_id": userID, "method_handler": method})

err := r.ParseMultipartForm(32 << 20)
if err != nil {
return nil, err
}

url := r.Form.Get(remoteURLParam)
if url == "" {
return nil, ErrEmptyRemoteURL
}

r, err = http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, werrors.Wrap(err, "error creating request")
}
fname := path.Base(r.URL.Path)
if fname == "/" || fname == "." {
return nil, fmt.Errorf("couldn't determine remote file name")
}

timeout := sdkrouter.RPCTimeout - (120 * time.Second)
c := &http.Client{
Timeout: timeout,
}
resp, err := c.Do(r)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("remote server returned non-OK status %v", resp.StatusCode)
}

defer resp.Body.Close()

clh := resp.Header.Get("Content-Length")
cl, err := strconv.Atoi(clh)
if err != nil {
return nil, werrors.Wrap(err, "cannot determine remote file size")
}
if cl >= MaxRemoteFileSize {
return nil, fmt.Errorf("remote file is too large at %v bytes", cl)
}
if cl == 0 {
return nil, werrors.New("remote file is empty")
}

f, err := h.createFile(userID, fname)
if err != nil {
return nil, err
}
log.Infof("processing remote file %v", fname)

numWritten, err := io.Copy(f, resp.Body)
if err != nil {
return nil, err
}
if numWritten == 0 {
f.Close()
os.Remove(f.Name())
return f, werrors.New("remote file is empty")
}
log.Infof("saved uploaded file %v (%v bytes written)", f.Name(), numWritten)

if err := f.Close(); err != nil {
return f, err
}

return f, nil
}
13 changes: 9 additions & 4 deletions app/publish/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/stretchr/testify/require"
)

type FormParam [2]string

// CreatePublishRequest creates and returns a HTTP request providing data for the publishing endpoint.
func CreatePublishRequest(t *testing.T, data []byte) *http.Request {
func CreatePublishRequest(t *testing.T, data []byte, extra ...FormParam) *http.Request {
readSeeker := bytes.NewReader(data)
body := &bytes.Buffer{}

Expand All @@ -22,9 +24,12 @@ func CreatePublishRequest(t *testing.T, data []byte) *http.Request {
_, err = io.Copy(fileBody, readSeeker)
require.NoError(t, err)

jsonPayload, err := writer.CreateFormField(jsonRPCFieldName)
require.NoError(t, err)
jsonPayload.Write([]byte(expectedStreamCreateRequest))
extra = append([]FormParam{{jsonRPCFieldName, expectedStreamCreateRequest}}, extra...)
for _, kv := range extra {
fld, err := writer.CreateFormField(kv[0])
require.NoError(t, err)
fld.Write([]byte(kv[1]))
}

writer.Close()

Expand Down
4 changes: 2 additions & 2 deletions app/sdkrouter/sdkrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
)

const RPCTimeout = 300 * time.Second
const RPCTimeout = 420 * time.Second

var logger = monitor.NewModuleLogger("sdkrouter")

Expand Down Expand Up @@ -131,7 +131,7 @@ func (r *Router) updateLoadAndMetrics() {

numWallets := walletList.TotalPages
logger.Log().Debugf("load update: considering %s with load %d", server.Address, numWallets)
if best == nil || numWallets < min {
if (best == nil || numWallets < min) && !server.Private {
logger.Log().Debugf("load update: %s has least with %d", server.Address, numWallets)
best = server
min = numWallets
Expand Down
28 changes: 17 additions & 11 deletions app/sdkrouter/sdkrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/lbryio/lbrytv/apps/lbrytv/config"
"github.com/lbryio/lbrytv/internal/storage"
"github.com/lbryio/lbrytv/internal/test"
"github.com/lbryio/lbrytv/models"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -60,25 +61,30 @@ func TestLeastLoaded(t *testing.T) {
defer rpcServer2.Close()
rpcServer3 := test.MockHTTPServer(nil)
defer rpcServer3.Close()
rpcServerPvt := test.MockHTTPServer(nil)
defer rpcServerPvt.Close()

servers := map[string]string{
"srv1": rpcServer1.URL,
"srv2": rpcServer2.URL,
"srv3": rpcServer3.URL,
servers := []*models.LbrynetServer{
{Name: "srv1", Address: rpcServer1.URL},
{Name: "srv2", Address: rpcServer2.URL},
{Name: "srv3", Address: rpcServer3.URL},
{Name: "srv-pvt", Address: rpcServerPvt.URL, Private: true},
}
r := New(servers)
r := NewWithServers(servers...)

// try doing the load in increasing order
rpcServer1.NextResponse <- `{"result":{"total_pages":1}}`
rpcServer2.NextResponse <- `{"result":{"total_pages":2}}`
rpcServer3.NextResponse <- `{"result":{"total_pages":3}}`
rpcServerPvt.NextResponse <- `{"result":{"total_pages":5}}`
rpcServer1.NextResponse <- `{"result":{"total_pages":10}}`
rpcServer2.NextResponse <- `{"result":{"total_pages":20}}`
rpcServer3.NextResponse <- `{"result":{"total_pages":30}}`
r.updateLoadAndMetrics()
assert.Equal(t, "srv1", r.LeastLoaded().Name)

// now do the load in decreasing order
rpcServer1.NextResponse <- `{"result":{"total_pages":3}}`
rpcServer2.NextResponse <- `{"result":{"total_pages":2}}`
rpcServer3.NextResponse <- `{"result":{"total_pages":1}}`
rpcServer1.NextResponse <- `{"result":{"total_pages":30}}`
rpcServer2.NextResponse <- `{"result":{"total_pages":20}}`
rpcServer3.NextResponse <- `{"result":{"total_pages":10}}`
rpcServerPvt.NextResponse <- `{"result":{"total_pages":5}}`
r.updateLoadAndMetrics()
assert.Equal(t, "srv3", r.LeastLoaded().Name)

Expand Down
12 changes: 12 additions & 0 deletions apps/collector/migrations/0003_indexes_time_url.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +migrate Up

-- +migrate StatementBegin
CREATE INDEX url_idx ON buffer_event(url);
CREATE INDEX time_idx ON buffer_event(time);
-- +migrate StatementEnd

-- +migrate Down
-- +migrate StatementBegin
DROP INDEX url_idx;
DROP INDEX time_idx;
-- +migrate StatementEnd

0 comments on commit a2726d1

Please sign in to comment.