Skip to content

Commit

Permalink
Merge branch 'feature/metrics'
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Aug 15, 2019
2 parents 404ccdc + f575ed9 commit fc73536
Show file tree
Hide file tree
Showing 35 changed files with 1,199 additions and 543 deletions.
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ jobs:
docker:
- image: lbry/lbrytv-ci:latest
- image: lbry/lbrynet-tv:latest
environment:
SDK_LBRYUM_SERVERS: spv1.lbry.com:50001
- image: postgres:11-alpine
environment:
POSTGRES_USER: lbrytv
Expand Down
61 changes: 1 addition & 60 deletions api/handlers.go
Original file line number Diff line number Diff line change
@@ -1,81 +1,22 @@
package api

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/lbryio/lbrytv/app/player"
"github.com/lbryio/lbrytv/app/proxy"
"github.com/lbryio/lbrytv/app/users"
"github.com/lbryio/lbrytv/config"
"github.com/lbryio/lbrytv/internal/monitor"

"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
)

var logger = monitor.NewModuleLogger("api")

// Index just serves a blank home page
// Index serves a blank home page
func Index(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}

// Proxy takes client request body and feeds it to the proxy module
func Proxy(w http.ResponseWriter, req *http.Request) {
var accountID string

if req.Method == "OPTIONS" {
w.WriteHeader(http.StatusOK)
return
}
if req.Body == nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("empty request body"))
return
}

body, err := ioutil.ReadAll(req.Body)
if err != nil {
log.Panicf("error: %v", err.Error())
}

ur, err := proxy.UnmarshalRequest(body)
if err != nil {
response, _ := json.Marshal(proxy.NewErrorResponse(err.Error(), proxy.ErrProxy))
w.WriteHeader(http.StatusBadRequest)
w.Write(response)
return
}

if config.AccountsEnabled() {
accountID, err = users.GetAccountIDFromRequest(req)
if err != nil {
response, _ := json.Marshal(proxy.NewErrorResponse(err.Error(), proxy.ErrProxyAuthFailed))
w.WriteHeader(http.StatusForbidden)
w.Write(response)
return
}
} else {
accountID = ""
}

lbrynetResponse, err := proxy.Proxy(ur, accountID)
if err != nil {
logger.LogF(monitor.F{"query": ur, "error": err}).Error("proxy errored")
response, _ := json.Marshal(proxy.NewErrorResponse(err.Error(), proxy.ErrProxy))
w.WriteHeader(http.StatusServiceUnavailable)
w.Write(response)
return
}

w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
w.Write(lbrynetResponse)
}

func stream(uri string, w http.ResponseWriter, req *http.Request) {
err := player.PlayURI(uri, w, req)
// Only output error if player has not pushed anything to the client yet
Expand Down
66 changes: 4 additions & 62 deletions api/handlers_test.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,20 @@
package api

import (
"bytes"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"

ljsonrpc "github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/stretchr/testify/assert"
"github.com/ybbus/jsonrpc"
)

func TestProxyOptions(t *testing.T) {
request, _ := http.NewRequest("OPTIONS", "/api/proxy", nil)
rr := httptest.NewRecorder()
http.HandlerFunc(Proxy).ServeHTTP(rr, request)
response := rr.Result()
assert.Equal(t, http.StatusOK, response.StatusCode)
}

func TestProxyNilQuery(t *testing.T) {
request, _ := http.NewRequest("POST", "/api/proxy", nil)
rr := httptest.NewRecorder()
http.HandlerFunc(Proxy).ServeHTTP(rr, request)
assert.Equal(t, http.StatusBadRequest, rr.Code)
assert.Equal(t, "empty request body", rr.Body.String())
}

func TestProxyInvalidQuery(t *testing.T) {
var parsedResponse jsonrpc.RPCResponse

request, _ := http.NewRequest("POST", "/api/proxy", bytes.NewBuffer([]byte("yo")))
rr := httptest.NewRecorder()
http.HandlerFunc(Proxy).ServeHTTP(rr, request)
assert.Equal(t, http.StatusBadRequest, rr.Code)
err := json.Unmarshal(rr.Body.Bytes(), &parsedResponse)
if err != nil {
panic(err)
}
assert.True(t, strings.HasPrefix(parsedResponse.Error.Message, "client json parse error: invalid character 'y'"))
}

func TestProxy(t *testing.T) {
var query *jsonrpc.RPCRequest
var queryBody []byte
var parsedResponse jsonrpc.RPCResponse
resolveResponse := make(ljsonrpc.ResolveResponse)

query = jsonrpc.NewRequest("resolve", map[string]string{"urls": "one"})
queryBody, _ = json.Marshal(query)
request, _ := http.NewRequest("POST", "/api/proxy", bytes.NewBuffer(queryBody))
rr := httptest.NewRecorder()

http.HandlerFunc(Proxy).ServeHTTP(rr, request)

assert.Equal(t, http.StatusOK, rr.Code)
assert.Equal(t, "application/json; charset=utf-8", rr.HeaderMap["Content-Type"][0])
err := json.Unmarshal(rr.Body.Bytes(), &parsedResponse)
if err != nil {
panic(err)
}
ljsonrpc.Decode(parsedResponse.Result, &resolveResponse)
assert.Equal(t, "one", resolveResponse["one"].Name)
}

func TestContentByURL_NoPayment(t *testing.T) {
req, _ := http.NewRequest("GET", "http://localhost:40080/content/url", nil)
req.URL.RawQuery = "pra-onde-vamos-em-2018-seguran-a-online#3a508cce1fda3b7c1a2502cb4323141d40a2cf0b"
req.Header.Add("Range", "bytes=0-1023")
r, _ := http.NewRequest("GET", "http://localhost:40080/content/url", nil)
r.URL.RawQuery = "pra-onde-vamos-em-2018-seguran-a-online#3a508cce1fda3b7c1a2502cb4323141d40a2cf0b"
r.Header.Add("Range", "bytes=0-1023")
rr := httptest.NewRecorder()
http.HandlerFunc(ContentByURL).ServeHTTP(rr, req)
http.HandlerFunc(ContentByURL).ServeHTTP(rr, r)

assert.Equal(t, http.StatusPaymentRequired, rr.Code)
_, err := rr.Body.ReadByte()
Expand Down
10 changes: 7 additions & 3 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package api
import (
"net/http"

"github.com/lbryio/lbrytv/app/proxy"

sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/gorilla/mux"
)
Expand All @@ -13,10 +15,12 @@ func captureErrors(handler func(http.ResponseWriter, *http.Request)) http.Handle
}

// InstallRoutes sets up global API handlers
func InstallRoutes(r *mux.Router) {
func InstallRoutes(ps *proxy.Service, r *mux.Router) {
r.HandleFunc("/", Index)
r.HandleFunc("/api/proxy", captureErrors(Proxy))
r.HandleFunc("/api/proxy/", captureErrors(Proxy))

proxyHandler := &proxy.RequestHandler{Service: ps}
r.HandleFunc("/api/proxy", captureErrors(proxyHandler.Handle))
r.HandleFunc("/api/proxy/", captureErrors(proxyHandler.Handle))
r.HandleFunc("/content/claims/{uri}/{claim}/{filename}", captureErrors(ContentByClaimsURI))
r.HandleFunc("/content/url", captureErrors(ContentByURL))
}
2 changes: 1 addition & 1 deletion app/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type reflectedStream struct {
// - Seek simply implements io.Seeker
// - Read calculates boundaries and finds blobs that contain the requested stream range,
// then calls streamBlobs, which sequentially downloads and decrypts requested blobs
func PlayURI(uri string, w http.ResponseWriter, req *http.Request) (err error) {
func PlayURI(uri string, w http.ResponseWriter, req *http.Request) error {
rs, err := newReflectedStream(uri)
if err != nil {
return err
Expand Down
16 changes: 8 additions & 8 deletions app/player/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const streamURL = "what#6769855a9aa43b67086f9ff3c1a5bacb5698a27a"
// An MP4 file, size: 128791189 bytes, blobs: 63
const sizedStreamURL = "known-size#0590f924bbee6627a2e79f7f2ff7dfb50bf2877c"

func Test_newReflectedStream(t *testing.T) {
func TestNewReflectedStream(t *testing.T) {
rs, err := newReflectedStream(streamURL)
if err != nil {
// If this fails, no point running other tests
Expand All @@ -26,12 +26,12 @@ func Test_newReflectedStream(t *testing.T) {
rs.SdHash)
}

func Test_newReflectedStream_emptyURL(t *testing.T) {
func TestNewReflectedStreamWithEmptyURL(t *testing.T) {
_, err := newReflectedStream("")
assert.NotNil(t, err)
}

func TestReflectedStream_fetchData(t *testing.T) {
func TestNewReflectedStreamFetchData(t *testing.T) {
rs, err := newReflectedStream(streamURL)
err = rs.fetchData()
if err != nil {
Expand All @@ -42,7 +42,7 @@ func TestReflectedStream_fetchData(t *testing.T) {
assert.Equal(t, 38, rs.SDBlob.BlobInfos[38].BlobNum)
}

func TestPlayURI_0B_52B(t *testing.T) {
func TestPlayURIFromByte0ToByte52(t *testing.T) {
var err error
r, _ := http.NewRequest("", "", nil)
r.Header.Add("Range", "bytes=0-52")
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestPlayURI_0B_52B(t *testing.T) {
assert.Equal(t, first52, responseFirst52)
}

func TestPlayURI_156B_259B(t *testing.T) {
func TestPlayURIFromByte156ToByte259(t *testing.T) {
var err error
r, _ := http.NewRequest("", "", nil)
r.Header.Add("Range", "bytes=156-259")
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestPlayURI_156B_259B(t *testing.T) {
assert.Equal(t, responseData[104:], emptyData)
}

func TestPlayURI_4MB_4MB105B(t *testing.T) {
func TestPlayURI104BytesFrom4thMB(t *testing.T) {
var err error
r, _ := http.NewRequest("", "", nil)
r.Header.Add("Range", "bytes=4000000-4000104")
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestPlayURI_4MB_4MB105B(t *testing.T) {
assert.Equal(t, responseData[106:], emptyData)
}

func TestPlayURI_LastBytes(t *testing.T) {
func TestPlayURILastBytes(t *testing.T) {
var err error
r, _ := http.NewRequest("", "", nil)
r.Header.Add("Range", "bytes=128791089-")
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestPlayURI_LastBytes(t *testing.T) {
assert.Equal(t, expectedData, responseData)
}

func TestPlayURI_Big(t *testing.T) {
func TestPlayURIFromZeroTo100KB(t *testing.T) {
var err error
r, _ := http.NewRequest("", "", nil)
r.Header.Add("Range", "bytes=0-100000")
Expand Down
18 changes: 18 additions & 0 deletions app/player/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package player

type Metrics struct {
ServingStreamsCount int
}

type PlayerService struct {
Metrics *Metrics
}

type Player struct {
reflectedStream
URI string
}

func (ps *PlayerService) NewPlayer(uri string) *Player {
return &Player{URI: uri}
}
23 changes: 18 additions & 5 deletions api/accounts_test.go → app/proxy/accounts_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package api
package proxy

import (
"bytes"
Expand All @@ -25,8 +25,13 @@ const dummyServerURL = "http://127.0.0.1:59999"
const proxySuffix = "/api/proxy"
const testSetupWait = 200 * time.Millisecond

var svc *Service

func TestMain(m *testing.M) {
// call flag.Parse() here if TestMain uses flags
go launchGrumpyServer()
svc = NewService(config.GetLbrynet())

config.Override("AccountsEnabled", true)
defer config.RestoreOverridden()

Expand All @@ -38,6 +43,7 @@ func TestMain(m *testing.M) {
}
c, connCleanup := storage.CreateTestConn(params)
c.SetDefaultConnection()

defer connCleanup()
defer lbrynet.RemoveAccount(dummyUserID)

Expand Down Expand Up @@ -117,7 +123,8 @@ func TestWithValidAuthToken(t *testing.T) {
r.Header.Add("X-Lbry-Auth-Token", "d94ab9865f8416d107935d2ca644509c")

rr := httptest.NewRecorder()
http.HandlerFunc(Proxy).ServeHTTP(rr, r)
handler := &RequestHandler{svc}
handler.Handle(rr, r)
require.Equal(t, http.StatusOK, rr.Code)
err := json.Unmarshal(rr.Body.Bytes(), &response)
require.Nil(t, err)
Expand Down Expand Up @@ -171,9 +178,11 @@ func TestWithValidAuthTokenConcurrent(t *testing.T) {
qBody, _ := json.Marshal(q)
r, _ := http.NewRequest("POST", proxySuffix, bytes.NewBuffer(qBody))
r.Header.Add("X-Lbry-Auth-Token", "d94ab9865f8416d107935d2ca644509c")

rr := httptest.NewRecorder()
handler := &RequestHandler{svc}
handler.Handle(rr, r)

http.HandlerFunc(Proxy).ServeHTTP(rr, r)
require.Equal(t, http.StatusOK, rr.Code)
json.Unmarshal(rr.Body.Bytes(), &response)
require.Nil(t, response.Error)
Expand Down Expand Up @@ -208,7 +217,9 @@ func TestWithWrongAuthToken(t *testing.T) {
r.Header.Add("X-Lbry-Auth-Token", "xXxXxXx")

rr := httptest.NewRecorder()
http.HandlerFunc(Proxy).ServeHTTP(rr, r)
handler := &RequestHandler{svc}
handler.Handle(rr, r)

assert.Equal(t, http.StatusForbidden, rr.Code)
err := json.Unmarshal(rr.Body.Bytes(), &response)
require.Nil(t, err)
Expand All @@ -235,7 +246,9 @@ func TestWithoutToken(t *testing.T) {
r, _ := http.NewRequest("POST", proxySuffix, bytes.NewBuffer(qBody))

rr := httptest.NewRecorder()
http.HandlerFunc(Proxy).ServeHTTP(rr, r)
handler := &RequestHandler{svc}
handler.Handle(rr, r)

require.Equal(t, http.StatusOK, rr.Code)
err := json.Unmarshal(rr.Body.Bytes(), &response)

Expand Down
Loading

0 comments on commit fc73536

Please sign in to comment.