Skip to content

Commit

Permalink
Improve player metrics and error logging
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Oct 21, 2019
1 parent 47bcf99 commit 6744cb5
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 70 deletions.
9 changes: 0 additions & 9 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package api
import (
"fmt"
"net/http"
"time"

"github.com/lbryio/lbrytv/app/player"
"github.com/lbryio/lbrytv/config"
"github.com/lbryio/lbrytv/internal/metrics"
"github.com/lbryio/lbrytv/internal/monitor"

"github.com/gorilla/mux"
Expand All @@ -21,23 +19,16 @@ func Index(w http.ResponseWriter, req *http.Request) {
}

// stream plays the content located at uri into w by delegating to player.PlayURI,
// records how long the call took, and maps a returned error to an HTTP status:
// 402 when the error text is "paid stream", otherwise 500 with the error text as body.
// The running-streams gauge is incremented for the duration of the call.
func stream(uri string, w http.ResponseWriter, req *http.Request) {
	metrics.PlayerStreamsRunning.Inc()
	defer metrics.PlayerStreamsRunning.Dec()

	start := time.Now()
	err := player.PlayURI(uri, w, req)
	duration := time.Since(start).Seconds()

	if err == nil {
		metrics.PlayerSuccessDurations.Observe(duration)
		return
	}

	metrics.PlayerFailedDurations.Observe(duration)

	// NOTE(review): matching on the error text is brittle; a sentinel error
	// exported by the player package would be safer — confirm before changing.
	if err.Error() == "paid stream" {
		w.WriteHeader(http.StatusPaymentRequired)
		return
	}

	w.WriteHeader(http.StatusInternalServerError)
	monitor.CaptureException(err, map[string]string{"uri": uri})
	// Best-effort write of the error text; the status line has already been sent.
	w.Write([]byte(err.Error()))
}

Expand Down
129 changes: 84 additions & 45 deletions app/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
"sort"
"time"

"github.com/lbryio/lbrytv/app/users"
"github.com/lbryio/lbrytv/config"
"github.com/lbryio/lbrytv/internal/lbrynet"
"github.com/lbryio/lbrytv/internal/metrics"
"github.com/lbryio/lbrytv/internal/monitor"

ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
)

const reflectorURL = "http://blobs.lbry.io/"
Expand All @@ -32,6 +33,10 @@ type reflectedStream struct {
seekOffset int64
}

// Logger is the package-wide logger, created with monitor.NewModuleLogger("player").
// Warning: will generate a lot of output if DEBUG loglevel is enabled.
var Logger = monitor.NewModuleLogger("player")

// PlayURI downloads and streams LBRY video content located at uri, writing it to w in response to req.
// The byte range to serve is presumably taken from req's Range header (req.Header.Get("Range")) — TODO confirm.
// Streaming works like this:
Expand All @@ -43,16 +48,24 @@ type reflectedStream struct {
// - Read calculates boundaries and finds blobs that contain the requested stream range,
// then calls streamBlobs, which sequentially downloads and decrypts requested blobs
func PlayURI(uri string, w http.ResponseWriter, req *http.Request) error {
rs, err := newReflectedStream(uri)
metrics.PlayerStreamsRunning.Inc()
defer metrics.PlayerStreamsRunning.Dec()

s, err := newReflectedStream(uri)
if err != nil {
return err
}
err = rs.fetchData()
err = s.fetchData()
if err != nil {
return err
}
rs.prepareWriter(w)
ServeContent(w, req, "test", time.Time{}, rs)
s.prepareWriter(w)
Logger.LogF(monitor.F{
"stream": s.URI,
"remote_ip": users.GetIPAddressForRequest(req),
}).Info("stream requested")
ServeContent(w, req, "test", time.Time{}, s)

return err
}

Expand All @@ -77,14 +90,32 @@ func (s *reflectedStream) Read(p []byte) (n int, err error) {
startOffsetInBlob = s.seekOffset - int64(blobNum*stream.MaxBlobSize) + int64(blobNum)
}

start := time.Now()
n, err = s.streamBlob(blobNum, startOffsetInBlob, p)

monitor.Logger.WithFields(log.Fields{
"read_buffer_length": bufferLen,
"blob_num": blobNum,
"current_offset": s.seekOffset,
"offset_in_blob": startOffsetInBlob,
}).Debugf("read %v bytes (%v..%v) from blob stream", n, s.seekOffset, seekOffsetEnd)
if err != nil {
metrics.PlayerFailuresCount.Inc()
Logger.LogF(monitor.F{
"stream": s.URI,
"num": fmt.Sprintf("%v/%v", blobNum, len(s.SDBlob.BlobInfos)),
"current_offset": s.seekOffset,
"offset_in_blob": startOffsetInBlob,
}).Errorf("failed to read from blob stream after %vs: %v", time.Since(start).Seconds(), err)
monitor.CaptureException(err, map[string]string{
"stream": s.URI,
"num": fmt.Sprintf("%v/%v", blobNum, len(s.SDBlob.BlobInfos)),
"current_offset": fmt.Sprintf("%v", s.seekOffset),
"offset_in_blob": fmt.Sprintf("%v", startOffsetInBlob),
})
} else {
metrics.PlayerSuccessesCount.Inc()
Logger.LogF(monitor.F{
"buffer_len": bufferLen,
"num": fmt.Sprintf("%v/%v", blobNum, len(s.SDBlob.BlobInfos)),
"current_offset": s.seekOffset,
"offset_in_blob": startOffsetInBlob,
}).Debugf("read %v bytes (%v..%v) from blob stream", n, s.seekOffset, seekOffsetEnd)
}

s.seekOffset += int64(n)
return n, err
Expand Down Expand Up @@ -136,7 +167,7 @@ func (s *reflectedStream) resolve(client *ljsonrpc.Client) error {
s.ContentType = stream.Source.MediaType
s.Size = int64(stream.Source.Size)

monitor.Logger.WithFields(log.Fields{
Logger.LogF(monitor.F{
"sd_hash": fmt.Sprintf("%s", s.SdHash),
"uri": s.URI,
"content_type": s.ContentType,
Expand All @@ -149,7 +180,7 @@ func (s *reflectedStream) fetchData() error {
if s.SdHash == "" {
return errors.New("no sd hash set, call `resolve` first")
}
monitor.Logger.WithFields(log.Fields{
Logger.LogF(monitor.F{
"uri": s.URI, "url": s.URL(),
}).Debug("requesting stream data")

Expand Down Expand Up @@ -188,10 +219,10 @@ func (s *reflectedStream) fetchData() error {
})
s.SDBlob = sdb

monitor.Logger.WithFields(log.Fields{
"blobs_number": len(sdb.BlobInfos),
"stream_size": s.Size,
"uri": s.URI,
Logger.LogF(monitor.F{
"blobs": len(sdb.BlobInfos),
"size": s.Size,
"uri": s.URI,
}).Debug("got stream data")
return nil
}
Expand All @@ -200,67 +231,75 @@ func (s *reflectedStream) prepareWriter(w http.ResponseWriter) {
w.Header().Set("Content-Type", s.ContentType)
}

// getBlob issues a GET request for url and returns the raw HTTP response.
// The client timeout is config.GetBlobDownloadTimeout() interpreted as seconds.
// On success the caller is responsible for closing the response body.
// An error is returned if the request cannot be constructed or the round trip fails.
func (s *reflectedStream) getBlob(url string) (*http.Response, error) {
	request, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		// A malformed URL yields a nil request, which must never reach client.Do.
		return nil, err
	}

	client := http.Client{Timeout: time.Second * time.Duration(config.GetBlobDownloadTimeout())}
	return client.Do(request)
}

func (s *reflectedStream) streamBlob(blobNum int, startOffsetInBlob int64, dest []byte) (n int, err error) {
bi := s.SDBlob.BlobInfos[blobNum]
logBlobNum := fmt.Sprintf("%v/%v", bi.BlobNum, len(s.SDBlob.BlobInfos))

if n > 0 {
startOffsetInBlob = 0
}
url := blobInfoURL(bi)

monitor.Logger.WithFields(log.Fields{
"url": url,
"stream": s.URI,
"blob_num": bi.BlobNum,
Logger.LogF(monitor.F{
"stream": s.URI,
"url": url,
"num": logBlobNum,
}).Debug("requesting a blob")
start := time.Now()

resp, err := http.Get(url)
resp, err := s.getBlob(url)
if err != nil {
return 0, err
}
defer resp.Body.Close()

monitor.Logger.WithFields(log.Fields{
"stream": s.URI,
"blob_num": bi.BlobNum,
"time_elapsed": time.Since(start),
}).Debug("done downloading a blob")

if blobNum == 0 {
monitor.Logger.WithFields(log.Fields{
"stream": s.URI,
"first_blob_time": time.Since(start).Seconds(),
}).Info("stream playback requested")
}

if resp.StatusCode == http.StatusOK {
start := time.Now()

encryptedBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, err
}

message := "done downloading a blob"
elapsedDLoad := time.Since(start).Seconds()
metrics.PlayerBlobDownloadDurations.Observe(elapsedDLoad)
if blobNum == 0 {
message += ", starting stream playback"
}
Logger.LogF(monitor.F{
"stream": s.URI,
"num": logBlobNum,
"elapsed": elapsedDLoad,
}).Info(message)

start = time.Now()
decryptedBody, err := stream.DecryptBlob(stream.Blob(encryptedBody), s.SDBlob.Key, bi.IV)
if err != nil {
return 0, err
}

endOffsetInBlob := int64(len(dest)) + startOffsetInBlob
if endOffsetInBlob > int64(len(decryptedBody)) {
endOffsetInBlob = int64(len(decryptedBody))
}
elapsedDecode := time.Since(start).Seconds()
metrics.PlayerBlobDecodeDurations.Observe(elapsedDecode)

thisN := copy(dest, decryptedBody[startOffsetInBlob:endOffsetInBlob])
n += thisN

monitor.Logger.WithFields(log.Fields{
"stream": s.URI,
"blob_num": bi.BlobNum,
"bytes_written": n,
"time_elapsed": time.Since(start),
"start_offset": startOffsetInBlob,
"end_offset": endOffsetInBlob,
Logger.LogF(monitor.F{
"stream": s.URI,
"num": logBlobNum,
"written": n,
"elapsed": elapsedDecode,
"start_offset": startOffsetInBlob,
"end_offset": endOffsetInBlob,
}).Debug("done streaming a blob")
} else {
return n, fmt.Errorf("server responded with an unexpected status (%v)", resp.Status)
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *ConfigWrapper) Init() {
c.Viper.SetDefault("Address", ":8080")
c.Viper.SetDefault("Host", "http://localhost:8080")
c.Viper.SetDefault("BaseContentURL", "http://localhost:8080/content/")
c.Viper.SetDefault("BlobDownloadTimeout", int64(10))

c.Viper.SetDefault("AccountsEnabled", false)
c.Viper.BindEnv("AccountsEnabled")
Expand Down Expand Up @@ -181,3 +182,8 @@ func GetBlobFilesDir() string {
// GetReflectorAddress returns the string stored under the "ReflectorAddress" configuration key.
func GetReflectorAddress() string {
	address := Config.Viper.GetString("ReflectorAddress")
	return address
}

// GetBlobDownloadTimeout reports the timeout, in seconds, for the blob HTTP client,
// as stored under the "BlobDownloadTimeout" configuration key.
func GetBlobDownloadTimeout() int64 {
	seconds := Config.Viper.GetInt64("BlobDownloadTimeout")
	return seconds
}
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ services:
target: /storage
environment:
LW_LBRYNET: http://lbrynet:5279/
LW_DEBUG: 1
depends_on:
- lbrynet
- postgres
Expand Down
26 changes: 18 additions & 8 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,27 @@ var (
Name: "running",
Help: "Number of streams currently playing",
})
PlayerSuccessDurations = promauto.NewHistogram(prometheus.HistogramOpts{
PlayerBlobDownloadDurations = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: nsPlayer,
Subsystem: "response",
Name: "success_seconds",
Help: "Time to successful response",
Subsystem: "blob",
Name: "download_seconds",
Help: "Blob download durations",
})
PlayerFailedDurations = promauto.NewHistogram(prometheus.HistogramOpts{
PlayerBlobDecodeDurations = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: nsPlayer,
Subsystem: "response",
Name: "failed_seconds",
Help: "Time to failed response",
Subsystem: "blob",
Name: "decode_seconds",
Help: "Blob decode durations",
})
PlayerSuccessesCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: nsPlayer,
Name: "successes_total",
Help: "Total number of successfully served blobs",
})
PlayerFailuresCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: nsPlayer,
Name: "failures_total",
Help: "Total number of errors serving blobs",
})

IAPIAuthSuccessDurations = promauto.NewHistogram(prometheus.HistogramOpts{
Expand Down

0 comments on commit 6744cb5

Please sign in to comment.