Skip to content

Commit

Permalink
Merge pull request #99 from lbryio/bugfix/wallet
Browse files — browse the repository at this point in the history
Bugfix/wallet
  • Loading branch information
anbsky committed Oct 25, 2019
2 parents ed91ed0 + 6744cb5 commit e084685
Show file tree
Hide file tree
Showing 21 changed files with 590 additions and 385 deletions.
138 changes: 138 additions & 0 deletions api/benchmarks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package api

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/lbryio/lbrytv/app/proxy"
"github.com/lbryio/lbrytv/app/users"
"github.com/lbryio/lbrytv/config"
"github.com/lbryio/lbrytv/internal/lbrynet"
"github.com/lbryio/lbrytv/internal/storage"
"github.com/lbryio/lbrytv/models"

"github.com/stretchr/testify/require"
"github.com/ybbus/jsonrpc"
)

const proxySuffix = "/api/v1/proxy"

func launchAuthenticatingAPIServer() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t := r.PostFormValue("auth_token")

w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)

reply := fmt.Sprintf(`
{
"success": true,
"error": null,
"data": {
"id": %v,
"language": "en",
"given_name": null,
"family_name": null,
"created_at": "2019-01-17T12:13:06Z",
"updated_at": "2019-05-02T13:57:59Z",
"invited_by_id": null,
"invited_at": null,
"invites_remaining": 0,
"invite_reward_claimed": false,
"is_email_enabled": true,
"manual_approval_user_id": 837139,
"reward_status_change_trigger": "manual",
"primary_email": "user@domain.com",
"has_verified_email": true,
"is_identity_verified": false,
"is_reward_approved": true,
"groups": []
}
}`, t)
w.Write([]byte(reply))
}))
}

// TestMain sets up a shared test database connection for the whole package,
// runs the test suite, and tears the connection down before exiting.
func TestMain(m *testing.M) {
	dbConfig := config.GetDatabase()
	params := storage.ConnParams{
		Connection: dbConfig.Connection,
		DBName:     dbConfig.DBName,
		Options:    dbConfig.Options,
	}
	dbConn, connCleanup := storage.CreateTestConn(params)
	dbConn.SetDefaultConnection()

	code := m.Run()

	// os.Exit terminates the process without running deferred calls, so the
	// cleanup must be invoked explicitly here — the original `defer
	// connCleanup()` never executed and leaked the test database.
	connCleanup()

	os.Exit(code)
}

// setupDBTables wipes the users table so each benchmark run starts from a
// clean slate.
func setupDBTables() {
	tables := []string{"users"}
	storage.Conn.Truncate(tables)
}

// BenchmarkWalletCommands measures the throughput of authenticated
// wallet_balance calls through the proxy handler. It pre-creates a pool of
// wallets against a stubbed internal-apis server, then hammers the proxy from
// parallel goroutines, each picking a random wallet per request.
func BenchmarkWalletCommands(b *testing.B) {
	setupDBTables()

	ts := launchAuthenticatingAPIServer()
	defer ts.Close()
	config.Override("InternalAPIHost", ts.URL)
	defer config.RestoreOverridden()

	walletsNum := 30
	wallets := make([]*models.User, walletsNum)
	svc := users.NewWalletService()

	// Silence per-request logging so log I/O doesn't dominate the timings.
	svc.Logger.Disable()
	lbrynet.Logger.Disable()
	log.SetOutput(ioutil.Discard)

	rand.Seed(time.Now().UnixNano())

	// Pre-create wallets for randomly generated user IDs (the stub server
	// echoes the token back as the user ID).
	for i := 0; i < walletsNum; i++ {
		uid := int(rand.Int31())
		u, err := svc.Retrieve(users.Query{Token: fmt.Sprintf("%v", uid)})
		require.NoError(b, err, errors.Unwrap(err))
		require.NotNil(b, u)
		wallets[i] = u
	}

	handler := proxy.NewRequestHandler(proxy.NewService(config.GetLbrynet()))

	b.SetParallelism(30)
	b.ResetTimer()

	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			u := wallets[rand.Intn(len(wallets))]

			var response jsonrpc.RPCResponse
			q := jsonrpc.NewRequest("wallet_balance", map[string]string{"wallet_id": u.WalletID})

			// The original discarded these errors; a marshaling or request
			// construction failure should fail the benchmark loudly instead
			// of producing a confusing downstream assertion.
			qBody, err := json.Marshal(q)
			require.NoError(b, err)
			r, err := http.NewRequest("POST", proxySuffix, bytes.NewBuffer(qBody))
			require.NoError(b, err)
			r.Header.Add("X-Lbry-Auth-Token", fmt.Sprintf("%v", u.ID))

			rr := httptest.NewRecorder()
			handler.Handle(rr, r)

			require.Equal(b, http.StatusOK, rr.Code)
			// A malformed proxy response must not slip through as a silent
			// zero-value RPCResponse.
			require.NoError(b, json.Unmarshal(rr.Body.Bytes(), &response))
			require.Nil(b, response.Error)
		}
	})

	b.StopTimer()
}
8 changes: 1 addition & 7 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,21 @@ import (

"github.com/lbryio/lbrytv/app/player"
"github.com/lbryio/lbrytv/config"
"github.com/lbryio/lbrytv/internal/metrics"
"github.com/lbryio/lbrytv/internal/monitor"

"github.com/gorilla/mux"
)

var logger = monitor.NewModuleLogger("api")

var Collector = metrics.NewCollector()

// Index serves a blank home page
func Index(w http.ResponseWriter, req *http.Request) {
http.Redirect(w, req, config.GetProjectURL(), http.StatusSeeOther)
}

func stream(uri string, w http.ResponseWriter, req *http.Request) {
Collector.MetricsIncrement("player_instances_count", metrics.One)
Collector.MetricsIncrement("player_streams_total", metrics.One)
err := player.PlayURI(uri, w, req)
Collector.MetricsDecrement("player_instances_count", metrics.One)
// Only output error if player has not pushed anything to the client yet

if err != nil {
if err.Error() == "paid stream" {
w.WriteHeader(http.StatusPaymentRequired)
Expand Down
129 changes: 84 additions & 45 deletions app/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
"sort"
"time"

"github.com/lbryio/lbrytv/app/users"
"github.com/lbryio/lbrytv/config"
"github.com/lbryio/lbrytv/internal/lbrynet"
"github.com/lbryio/lbrytv/internal/metrics"
"github.com/lbryio/lbrytv/internal/monitor"

ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
)

const reflectorURL = "http://blobs.lbry.io/"
Expand All @@ -32,6 +33,10 @@ type reflectedStream struct {
seekOffset int64
}

// Logger is a package-wide logger.
// Warning: will generate a lot of output if DEBUG loglevel is enabled.
var Logger = monitor.NewModuleLogger("player")

// PlayURI downloads and streams LBRY video content located at uri and delimited by rangeHeader
// (use rangeHeader := request.Header.Get("Range")).
// Streaming works like this:
Expand All @@ -43,16 +48,24 @@ type reflectedStream struct {
// - Read calculates boundaries and finds blobs that contain the requested stream range,
// then calls streamBlobs, which sequentially downloads and decrypts requested blobs
func PlayURI(uri string, w http.ResponseWriter, req *http.Request) error {
rs, err := newReflectedStream(uri)
metrics.PlayerStreamsRunning.Inc()
defer metrics.PlayerStreamsRunning.Dec()

s, err := newReflectedStream(uri)
if err != nil {
return err
}
err = rs.fetchData()
err = s.fetchData()
if err != nil {
return err
}
rs.prepareWriter(w)
ServeContent(w, req, "test", time.Time{}, rs)
s.prepareWriter(w)
Logger.LogF(monitor.F{
"stream": s.URI,
"remote_ip": users.GetIPAddressForRequest(req),
}).Info("stream requested")
ServeContent(w, req, "test", time.Time{}, s)

return err
}

Expand All @@ -77,14 +90,32 @@ func (s *reflectedStream) Read(p []byte) (n int, err error) {
startOffsetInBlob = s.seekOffset - int64(blobNum*stream.MaxBlobSize) + int64(blobNum)
}

start := time.Now()
n, err = s.streamBlob(blobNum, startOffsetInBlob, p)

monitor.Logger.WithFields(log.Fields{
"read_buffer_length": bufferLen,
"blob_num": blobNum,
"current_offset": s.seekOffset,
"offset_in_blob": startOffsetInBlob,
}).Debugf("read %v bytes (%v..%v) from blob stream", n, s.seekOffset, seekOffsetEnd)
if err != nil {
metrics.PlayerFailuresCount.Inc()
Logger.LogF(monitor.F{
"stream": s.URI,
"num": fmt.Sprintf("%v/%v", blobNum, len(s.SDBlob.BlobInfos)),
"current_offset": s.seekOffset,
"offset_in_blob": startOffsetInBlob,
}).Errorf("failed to read from blob stream after %vs: %v", time.Since(start).Seconds(), err)
monitor.CaptureException(err, map[string]string{
"stream": s.URI,
"num": fmt.Sprintf("%v/%v", blobNum, len(s.SDBlob.BlobInfos)),
"current_offset": fmt.Sprintf("%v", s.seekOffset),
"offset_in_blob": fmt.Sprintf("%v", startOffsetInBlob),
})
} else {
metrics.PlayerSuccessesCount.Inc()
Logger.LogF(monitor.F{
"buffer_len": bufferLen,
"num": fmt.Sprintf("%v/%v", blobNum, len(s.SDBlob.BlobInfos)),
"current_offset": s.seekOffset,
"offset_in_blob": startOffsetInBlob,
}).Debugf("read %v bytes (%v..%v) from blob stream", n, s.seekOffset, seekOffsetEnd)
}

s.seekOffset += int64(n)
return n, err
Expand Down Expand Up @@ -136,7 +167,7 @@ func (s *reflectedStream) resolve(client *ljsonrpc.Client) error {
s.ContentType = stream.Source.MediaType
s.Size = int64(stream.Source.Size)

monitor.Logger.WithFields(log.Fields{
Logger.LogF(monitor.F{
"sd_hash": fmt.Sprintf("%s", s.SdHash),
"uri": s.URI,
"content_type": s.ContentType,
Expand All @@ -149,7 +180,7 @@ func (s *reflectedStream) fetchData() error {
if s.SdHash == "" {
return errors.New("no sd hash set, call `resolve` first")
}
monitor.Logger.WithFields(log.Fields{
Logger.LogF(monitor.F{
"uri": s.URI, "url": s.URL(),
}).Debug("requesting stream data")

Expand Down Expand Up @@ -188,10 +219,10 @@ func (s *reflectedStream) fetchData() error {
})
s.SDBlob = sdb

monitor.Logger.WithFields(log.Fields{
"blobs_number": len(sdb.BlobInfos),
"stream_size": s.Size,
"uri": s.URI,
Logger.LogF(monitor.F{
"blobs": len(sdb.BlobInfos),
"size": s.Size,
"uri": s.URI,
}).Debug("got stream data")
return nil
}
Expand All @@ -200,67 +231,75 @@ func (s *reflectedStream) prepareWriter(w http.ResponseWriter) {
w.Header().Set("Content-Type", s.ContentType)
}

func (s *reflectedStream) getBlob(url string) (*http.Response, error) {
request, _ := http.NewRequest("GET", url, nil)
client := http.Client{Timeout: time.Second * time.Duration(config.GetBlobDownloadTimeout())}
r, err := client.Do(request)
return r, err
}

func (s *reflectedStream) streamBlob(blobNum int, startOffsetInBlob int64, dest []byte) (n int, err error) {
bi := s.SDBlob.BlobInfos[blobNum]
logBlobNum := fmt.Sprintf("%v/%v", bi.BlobNum, len(s.SDBlob.BlobInfos))

if n > 0 {
startOffsetInBlob = 0
}
url := blobInfoURL(bi)

monitor.Logger.WithFields(log.Fields{
"url": url,
"stream": s.URI,
"blob_num": bi.BlobNum,
Logger.LogF(monitor.F{
"stream": s.URI,
"url": url,
"num": logBlobNum,
}).Debug("requesting a blob")
start := time.Now()

resp, err := http.Get(url)
resp, err := s.getBlob(url)
if err != nil {
return 0, err
}
defer resp.Body.Close()

monitor.Logger.WithFields(log.Fields{
"stream": s.URI,
"blob_num": bi.BlobNum,
"time_elapsed": time.Since(start),
}).Debug("done downloading a blob")

if blobNum == 0 {
monitor.Logger.WithFields(log.Fields{
"stream": s.URI,
"first_blob_time": time.Since(start).Seconds(),
}).Info("stream playback requested")
}

if resp.StatusCode == http.StatusOK {
start := time.Now()

encryptedBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, err
}

message := "done downloading a blob"
elapsedDLoad := time.Since(start).Seconds()
metrics.PlayerBlobDownloadDurations.Observe(elapsedDLoad)
if blobNum == 0 {
message += ", starting stream playback"
}
Logger.LogF(monitor.F{
"stream": s.URI,
"num": logBlobNum,
"elapsed": elapsedDLoad,
}).Info(message)

start = time.Now()
decryptedBody, err := stream.DecryptBlob(stream.Blob(encryptedBody), s.SDBlob.Key, bi.IV)
if err != nil {
return 0, err
}

endOffsetInBlob := int64(len(dest)) + startOffsetInBlob
if endOffsetInBlob > int64(len(decryptedBody)) {
endOffsetInBlob = int64(len(decryptedBody))
}
elapsedDecode := time.Since(start).Seconds()
metrics.PlayerBlobDecodeDurations.Observe(elapsedDecode)

thisN := copy(dest, decryptedBody[startOffsetInBlob:endOffsetInBlob])
n += thisN

monitor.Logger.WithFields(log.Fields{
"stream": s.URI,
"blob_num": bi.BlobNum,
"bytes_written": n,
"time_elapsed": time.Since(start),
"start_offset": startOffsetInBlob,
"end_offset": endOffsetInBlob,
Logger.LogF(monitor.F{
"stream": s.URI,
"num": logBlobNum,
"written": n,
"elapsed": elapsedDecode,
"start_offset": startOffsetInBlob,
"end_offset": endOffsetInBlob,
}).Debug("done streaming a blob")
} else {
return n, fmt.Errorf("server responded with an unexpected status (%v)", resp.Status)
Expand Down
Loading

0 comments on commit e084685

Please sign in to comment.