From 6744cb579e4f455a54dac938552e18c369973ad1 Mon Sep 17 00:00:00 2001 From: Andrey Beletsky Date: Mon, 21 Oct 2019 23:56:36 +0700 Subject: [PATCH] Improve player metrics and error logging --- api/handlers.go | 9 --- app/player/player.go | 129 +++++++++++++++++++++++------------- config/config.go | 6 ++ docker-compose.yml | 1 + internal/metrics/metrics.go | 26 +++++--- internal/monitor/monitor.go | 26 +++++--- lbrytv.yml | 2 + 7 files changed, 129 insertions(+), 70 deletions(-) diff --git a/api/handlers.go b/api/handlers.go index 88a04e93..e39a10da 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -3,11 +3,9 @@ package api import ( "fmt" "net/http" - "time" "github.com/lbryio/lbrytv/app/player" "github.com/lbryio/lbrytv/config" - "github.com/lbryio/lbrytv/internal/metrics" "github.com/lbryio/lbrytv/internal/monitor" "github.com/gorilla/mux" @@ -21,14 +19,9 @@ func Index(w http.ResponseWriter, req *http.Request) { } func stream(uri string, w http.ResponseWriter, req *http.Request) { - metrics.PlayerStreamsRunning.Inc() - defer metrics.PlayerStreamsRunning.Dec() - start := time.Now() err := player.PlayURI(uri, w, req) - duration := time.Now().Sub(start).Seconds() if err != nil { - metrics.PlayerFailedDurations.Observe(duration) if err.Error() == "paid stream" { w.WriteHeader(http.StatusPaymentRequired) } else { @@ -36,8 +29,6 @@ func stream(uri string, w http.ResponseWriter, req *http.Request) { monitor.CaptureException(err, map[string]string{"uri": uri}) w.Write([]byte(err.Error())) } - } else { - metrics.PlayerSuccessDurations.Observe(duration) } } diff --git a/app/player/player.go b/app/player/player.go index ba356f91..cbc452d1 100644 --- a/app/player/player.go +++ b/app/player/player.go @@ -10,13 +10,14 @@ import ( "sort" "time" + "github.com/lbryio/lbrytv/app/users" "github.com/lbryio/lbrytv/config" "github.com/lbryio/lbrytv/internal/lbrynet" + "github.com/lbryio/lbrytv/internal/metrics" "github.com/lbryio/lbrytv/internal/monitor" ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc" "github.com/lbryio/lbry.go/v2/stream" - log "github.com/sirupsen/logrus" ) const reflectorURL = "http://blobs.lbry.io/" @@ -32,6 +33,10 @@ type reflectedStream struct { seekOffset int64 } +// Logger is a package-wide logger. +// Warning: will generate a lot of output if DEBUG loglevel is enabled. +var Logger = monitor.NewModuleLogger("player") + // PlayURI downloads and streams LBRY video content located at uri and delimited by rangeHeader // (use rangeHeader := request.Header.Get("Range")). // Streaming works like this: @@ -43,16 +48,24 @@ type reflectedStream struct { // - Read calculates boundaries and finds blobs that contain the requested stream range, // then calls streamBlobs, which sequentially downloads and decrypts requested blobs func PlayURI(uri string, w http.ResponseWriter, req *http.Request) error { - rs, err := newReflectedStream(uri) + metrics.PlayerStreamsRunning.Inc() + defer metrics.PlayerStreamsRunning.Dec() + + s, err := newReflectedStream(uri) if err != nil { return err } - err = rs.fetchData() + err = s.fetchData() if err != nil { return err } - rs.prepareWriter(w) - ServeContent(w, req, "test", time.Time{}, rs) + s.prepareWriter(w) + Logger.LogF(monitor.F{ + "stream": s.URI, + "remote_ip": users.GetIPAddressForRequest(req), + }).Info("stream requested") + ServeContent(w, req, "test", time.Time{}, s) + return err } @@ -77,14 +90,32 @@ func (s *reflectedStream) Read(p []byte) (n int, err error) { startOffsetInBlob = s.seekOffset - int64(blobNum*stream.MaxBlobSize) + int64(blobNum) } + start := time.Now() n, err = s.streamBlob(blobNum, startOffsetInBlob, p) - monitor.Logger.WithFields(log.Fields{ - "read_buffer_length": bufferLen, - "blob_num": blobNum, - "current_offset": s.seekOffset, - "offset_in_blob": startOffsetInBlob, - }).Debugf("read %v bytes (%v..%v) from blob stream", n, s.seekOffset, seekOffsetEnd) + if err != nil { + metrics.PlayerFailuresCount.Inc() + Logger.LogF(monitor.F{ + "stream": s.URI, + "num": fmt.Sprintf("%v/%v", blobNum, len(s.SDBlob.BlobInfos)), + "current_offset": s.seekOffset, + "offset_in_blob": startOffsetInBlob, + }).Errorf("failed to read from blob stream after %vs: %v", time.Since(start).Seconds(), err) + monitor.CaptureException(err, map[string]string{ + "stream": s.URI, + "num": fmt.Sprintf("%v/%v", blobNum, len(s.SDBlob.BlobInfos)), + "current_offset": fmt.Sprintf("%v", s.seekOffset), + "offset_in_blob": fmt.Sprintf("%v", startOffsetInBlob), + }) + } else { + metrics.PlayerSuccessesCount.Inc() + Logger.LogF(monitor.F{ + "buffer_len": bufferLen, + "num": fmt.Sprintf("%v/%v", blobNum, len(s.SDBlob.BlobInfos)), + "current_offset": s.seekOffset, + "offset_in_blob": startOffsetInBlob, + }).Debugf("read %v bytes (%v..%v) from blob stream", n, s.seekOffset, seekOffsetEnd) + } s.seekOffset += int64(n) return n, err @@ -136,7 +167,7 @@ func (s *reflectedStream) resolve(client *ljsonrpc.Client) error { s.ContentType = stream.Source.MediaType s.Size = int64(stream.Source.Size) - monitor.Logger.WithFields(log.Fields{ + Logger.LogF(monitor.F{ "sd_hash": fmt.Sprintf("%s", s.SdHash), "uri": s.URI, "content_type": s.ContentType, @@ -149,7 +180,7 @@ func (s *reflectedStream) fetchData() error { if s.SdHash == "" { return errors.New("no sd hash set, call `resolve` first") } - monitor.Logger.WithFields(log.Fields{ + Logger.LogF(monitor.F{ "uri": s.URI, "url": s.URL(), }).Debug("requesting stream data") @@ -188,10 +219,10 @@ func (s *reflectedStream) fetchData() error { }) s.SDBlob = sdb - monitor.Logger.WithFields(log.Fields{ - "blobs_number": len(sdb.BlobInfos), - "stream_size": s.Size, - "uri": s.URI, + Logger.LogF(monitor.F{ + "blobs": len(sdb.BlobInfos), + "size": s.Size, + "uri": s.URI, }).Debug("got stream data") return nil } @@ -200,67 +231,75 @@ func (s *reflectedStream) prepareWriter(w http.ResponseWriter) { w.Header().Set("Content-Type", s.ContentType) } +func (s *reflectedStream) getBlob(url string) (*http.Response, error) { + request, _ := http.NewRequest("GET", url, nil) + client := http.Client{Timeout: time.Second * time.Duration(config.GetBlobDownloadTimeout())} + r, err := client.Do(request) + return r, err +} + func (s *reflectedStream) streamBlob(blobNum int, startOffsetInBlob int64, dest []byte) (n int, err error) { bi := s.SDBlob.BlobInfos[blobNum] + logBlobNum := fmt.Sprintf("%v/%v", bi.BlobNum, len(s.SDBlob.BlobInfos)) + if n > 0 { startOffsetInBlob = 0 } url := blobInfoURL(bi) - monitor.Logger.WithFields(log.Fields{ - "url": url, - "stream": s.URI, - "blob_num": bi.BlobNum, + Logger.LogF(monitor.F{ + "stream": s.URI, + "url": url, + "num": logBlobNum, }).Debug("requesting a blob") start := time.Now() - resp, err := http.Get(url) + resp, err := s.getBlob(url) if err != nil { return 0, err } defer resp.Body.Close() - monitor.Logger.WithFields(log.Fields{ - "stream": s.URI, - "blob_num": bi.BlobNum, - "time_elapsed": time.Since(start), - }).Debug("done downloading a blob") - - if blobNum == 0 { - monitor.Logger.WithFields(log.Fields{ - "stream": s.URI, - "first_blob_time": time.Since(start).Seconds(), - }).Info("stream playback requested") - } - if resp.StatusCode == http.StatusOK { - start := time.Now() - encryptedBody, err := ioutil.ReadAll(resp.Body) if err != nil { return 0, err } + message := "done downloading a blob" + elapsedDLoad := time.Since(start).Seconds() + metrics.PlayerBlobDownloadDurations.Observe(elapsedDLoad) + if blobNum == 0 { + message += ", starting stream playback" + } + Logger.LogF(monitor.F{ + "stream": s.URI, + "num": logBlobNum, + "elapsed": elapsedDLoad, + }).Info(message) + + start = time.Now() decryptedBody, err := stream.DecryptBlob(stream.Blob(encryptedBody), s.SDBlob.Key, bi.IV) if err != nil { return 0, err } - endOffsetInBlob := int64(len(dest)) + startOffsetInBlob if endOffsetInBlob > int64(len(decryptedBody)) { endOffsetInBlob = int64(len(decryptedBody)) } + elapsedDecode := time.Since(start).Seconds() + metrics.PlayerBlobDecodeDurations.Observe(elapsedDecode) thisN := copy(dest, decryptedBody[startOffsetInBlob:endOffsetInBlob]) n += thisN - monitor.Logger.WithFields(log.Fields{ - "stream": s.URI, - "blob_num": bi.BlobNum, - "bytes_written": n, - "time_elapsed": time.Since(start), - "start_offset": startOffsetInBlob, - "end_offset": endOffsetInBlob, + Logger.LogF(monitor.F{ + "stream": s.URI, + "num": logBlobNum, + "written": n, + "elapsed": elapsedDecode, + "start_offset": startOffsetInBlob, + "end_offset": endOffsetInBlob, }).Debug("done streaming a blob") } else { return n, fmt.Errorf("server responded with an unexpected status (%v)", resp.Status) diff --git a/config/config.go b/config/config.go index 33b5d33b..4c40a6be 100644 --- a/config/config.go +++ b/config/config.go @@ -59,6 +59,7 @@ func (c *ConfigWrapper) Init() { c.Viper.SetDefault("Address", ":8080") c.Viper.SetDefault("Host", "http://localhost:8080") c.Viper.SetDefault("BaseContentURL", "http://localhost:8080/content/") + c.Viper.SetDefault("BlobDownloadTimeout", int64(10)) c.Viper.SetDefault("AccountsEnabled", false) c.Viper.BindEnv("AccountsEnabled") @@ -181,3 +182,8 @@ func GetBlobFilesDir() string { func GetReflectorAddress() string { return Config.Viper.GetString("ReflectorAddress") } + +// GetBlobDownloadTimeout returns timeout for blob HTTP client in seconds. +func GetBlobDownloadTimeout() int64 { + return Config.Viper.GetInt64("BlobDownloadTimeout") +} diff --git a/docker-compose.yml b/docker-compose.yml index ae17fb2b..97b96c33 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -69,6 +69,7 @@ services: target: /storage environment: LW_LBRYNET: http://lbrynet:5279/ + LW_DEBUG: 1 depends_on: - lbrynet - postgres diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 8eb5049f..132b23ea 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -43,17 +43,27 @@ var ( Name: "running", Help: "Number of streams currently playing", }) - PlayerSuccessDurations = promauto.NewHistogram(prometheus.HistogramOpts{ + PlayerBlobDownloadDurations = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: nsPlayer, - Subsystem: "response", - Name: "success_seconds", - Help: "Time to successful response", + Subsystem: "blob", + Name: "download_seconds", + Help: "Blob download durations", }) - PlayerFailedDurations = promauto.NewHistogram(prometheus.HistogramOpts{ + PlayerBlobDecodeDurations = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: nsPlayer, - Subsystem: "response", - Name: "failed_seconds", - Help: "Time to failed response", + Subsystem: "blob", + Name: "decode_seconds", + Help: "Blob decode durations", + }) + PlayerSuccessesCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: nsPlayer, + Name: "successes_total", + Help: "Total number of successfully served blobs", + }) + PlayerFailuresCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: nsPlayer, + Name: "failures_total", + Help: "Total number of errors serving blobs", }) IAPIAuthSuccessDurations = promauto.NewHistogram(prometheus.HistogramOpts{ diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index a6078833..96c41340 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -22,11 +22,15 @@ const ValueMask = "****" type ModuleLogger struct { ModuleName string Logger *logrus.Logger + Level logrus.Level } // F can be supplied to ModuleLogger's Log function for providing additional log context. type F map[string]interface{} +var jsonFormatter = logrus.JSONFormatter{DisableTimestamp: true} +var textFormatter = logrus.TextFormatter{FullTimestamp: true, TimestampFormat: "15:04"} + // init magic is needed so logging is set up without calling it in every package explicitly func init() { SetupLogging() @@ -44,15 +48,15 @@ func SetupLogging() { logrus.SetLevel(logrus.InfoLevel) Logger.SetLevel(logrus.InfoLevel) - logrus.SetFormatter(&logrus.JSONFormatter{}) - Logger.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetFormatter(&jsonFormatter) + Logger.SetFormatter(&jsonFormatter) } else { mode = "develop" logrus.SetLevel(logrus.DebugLevel) Logger.SetLevel(logrus.DebugLevel) - logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true}) - Logger.SetFormatter(&logrus.TextFormatter{FullTimestamp: true}) + logrus.SetFormatter(&textFormatter) + Logger.SetFormatter(&textFormatter) } Logger.Infof("%v, running in %v mode", version.GetFullBuildName(), mode) @@ -62,10 +66,14 @@ func SetupLogging() { // NewModuleLogger creates a new ModuleLogger instance carrying module name // for later `Log()` calls. func NewModuleLogger(moduleName string) ModuleLogger { - return ModuleLogger{ + logger := getBaseLogger() + modLogger := ModuleLogger{ ModuleName: moduleName, - Logger: logrus.New(), + Logger: logger, + Level: logger.GetLevel(), } + modLogger.Log().Debugf("module logger initialized (level=%v)", modLogger.Level) + return modLogger } // LogF returns a new log entry containing additional info provided by fields, @@ -138,10 +146,10 @@ func getBaseLogger() *logrus.Logger { logger := logrus.New() if config.IsProduction() { logger.SetLevel(logrus.InfoLevel) - logger.SetFormatter(&logrus.JSONFormatter{}) + logger.SetFormatter(&jsonFormatter) } else { logger.SetLevel(logrus.DebugLevel) - logger.SetFormatter(&logrus.TextFormatter{FullTimestamp: true}) + logger.SetFormatter(&textFormatter) } return logger } @@ -149,6 +157,7 @@ func getBaseLogger() *logrus.Logger { type ProxyLogger struct { logger *logrus.Logger entry *logrus.Entry + Level logrus.Level } func NewProxyLogger() *ProxyLogger { @@ -157,6 +166,7 @@ func NewProxyLogger() *ProxyLogger { l := ProxyLogger{ logger: logger, entry: logger.WithFields(logrus.Fields{"module": "proxy"}), + Level: logger.GetLevel(), } return &l } diff --git a/lbrytv.yml b/lbrytv.yml index dda7554e..d85a8958 100644 --- a/lbrytv.yml +++ b/lbrytv.yml @@ -19,3 +19,5 @@ PublishSourceDir: /storage/published BlobFilesDir: /storage/lbrynet/blobfiles # ReflectorAddress: reflector.lbry.com:5566 + +BlobDownloadTimeout: 60