Skip to content

Commit

Permalink
Improve logging, use configurable refractor address
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Nov 28, 2019
1 parent 1667c0c commit c16eb53
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 59 deletions.
6 changes: 4 additions & 2 deletions app/player/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"github.com/lbryio/lbrytv/internal/monitor"
)

// RequestHandler is a wrapper for passing proxy.ProxyService instance to proxy HTTP handler.
// RequestHandler is a HTTP request handler for player package.
type RequestHandler struct {
player *Player
}

// NewRequestHandler initializes request handler with a provided Proxy ProxyService instance
// NewRequestHandler initializes a HTTP request handler with the provided Player instance.
func NewRequestHandler(p *Player) *RequestHandler {
return &RequestHandler{p}
}
Expand All @@ -39,6 +39,7 @@ func (h RequestHandler) processStreamError(w http.ResponseWriter, uri string, er
}
}

// Handle is responsible for all HTTP media delivery via player module.
func (h *RequestHandler) Handle(w http.ResponseWriter, r *http.Request) {
uri := h.getURI(r)
err := h.player.Play(uri, w, r)
Expand All @@ -48,6 +49,7 @@ func (h *RequestHandler) Handle(w http.ResponseWriter, r *http.Request) {
}
}

// HandleOptions handlers OPTIONS requests for media.
func (h *RequestHandler) HandleOptions(w http.ResponseWriter, r *http.Request) {
header := w.Header()
uri := h.getURI(r)
Expand Down
1 change: 0 additions & 1 deletion app/player/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func makeRequest(method, uri string, rng *rangeHeader) *http.Response {
}
}

fmt.Println(r.Header.Get("Range"))
rr := httptest.NewRecorder()
router.ServeHTTP(rr, r)
return rr.Result()
Expand Down
28 changes: 18 additions & 10 deletions app/player/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@ func (l localLogger) streamPlaybackRequested(uri, remoteIP string) {
l.WithFields(monitor.F{"remote_ip": remoteIP, "uri": uri}).Info("starting stream playback")
}

func (l localLogger) streamPlaybackEnd(s *Stream) {
l.WithFields(monitor.F{"uri": s.URI}).Info("stream playback complete")
}

func (l localLogger) streamSeek(s *Stream, offset, newOffset int64, whence string) {
Logger.WithFields(monitor.F{"stream": s.URI}).Tracef("seeking from %v to %v (%v), new position = %v", s.seekOffset, offset, whence, newOffset)
}

func (l localLogger) streamRead(s *Stream, n int, calc BlobCalculator) {
l.WithFields(monitor.F{"uri": s.URI}).Debugf("read %v bytes (%v..%v) from blob stream", n, calc.Offset, s.seekOffset)
metrics.PlayerSuccessesCount.Inc()
l.WithFields(monitor.F{"uri": s.URI}).Debugf("read %v bytes (%v..%v) from stream", n, calc.Offset, s.seekOffset)
}

func (l localLogger) streamReadFailed(s *Stream, calc BlobCalculator, err error) {
Expand All @@ -45,22 +42,33 @@ func (l localLogger) streamReadFailed(s *Stream, calc BlobCalculator, err error)
}

monitor.CaptureException(err, excFields)
l.WithFields(logFields).Info("stream read failed:", err)
l.WithFields(logFields).Info("stream read failed: ", err)
}

func (l localLogger) streamResolved(s *Stream) {
l.WithFields(monitor.F{"uri": s.URI}).Debug("stream resolved")
}

func (l localLogger) streamResolveFailure(uri string, err error) {
l.WithFields(monitor.F{"uri": uri}).Error("stream failed to resolve: ", err)
func (l localLogger) streamResolveFailed(uri string, err error) {
metrics.PlayerFailuresCount.Inc()
l.WithFields(monitor.F{"uri": uri}).Error("failed to resolve stream: ", err)
}

func (l localLogger) streamRetrieved(s *Stream) {
l.WithFields(monitor.F{"uri": s.URI}).Debug("stream retrieved")
}

func (l localLogger) streamRetrievalFailed(uri string, err error) {
metrics.PlayerFailuresCount.Inc()
l.WithFields(monitor.F{"uri": uri}).Error("failed to retrieve stream: ", err)
}

func (l localLogger) blobDownloaded(b stream.Blob, t *metrics.Timer) {
speed := float64(len(b)) / (1024 * 1024) / t.Duration
l.WithFields(monitor.F{"duration": t.Duration, "speed": speed}).Debug("blob downloaded")
}

func (l localLogger) blobDownloadFailure(b stream.Blob, err error) {
l.Log().Error("blob failed to download:", err)
func (l localLogger) blobDownloadFailed(b stream.Blob, err error) {
metrics.PlayerFailuresCount.Inc()
l.Log().Error("blob failed to download: ", err)
}
43 changes: 22 additions & 21 deletions app/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"github.com/lbryio/reflector.go/store"
)

const reflectorAddress = "refractor.lbry.com:5567"

type CacheEntry struct {
Body []byte
Hits int32
Expand All @@ -32,9 +30,9 @@ var cache = map[string]*CacheEntry{}

// Player is an entry-point object to the new player package.
type Player struct {
lbrynet *ljsonrpc.Client
blobStore store.BlobStore
blobGetter BlobGetter
lbrynetClient *ljsonrpc.Client
blobStore store.BlobStore
blobGetter BlobGetter
}

type PlayerOpts struct {
Expand Down Expand Up @@ -74,13 +72,12 @@ type reflectedBlob struct {

// GetBlobStore returns default pre-configured blob store.
func GetBlobStore() *peer.Store {
return peer.NewStore(peer.StoreOpts{Address: reflectorAddress, Timeout: time.Second * 25})
return peer.NewStore(peer.StoreOpts{
Address: config.GetRefractorAddress(),
Timeout: time.Second * time.Duration(config.GetRefractorTimeout()),
})
}

// func NewCachingBlobStore(reflectorAddress string) *store.CachingBlobStore {
// return store.NewCachingBlobStore()
// }

// NewPlayer initializes an instance with optional BlobStore.
func NewPlayer(opts *PlayerOpts) *Player {
if opts == nil {
Expand All @@ -89,32 +86,38 @@ func NewPlayer(opts *PlayerOpts) *Player {
if opts.BlobStore == nil {
opts.BlobStore = GetBlobStore()
}
if opts.Lbrynet == nil {
sdkRouter := router.New(config.GetLbrynetServers())
opts.Lbrynet = ljsonrpc.NewClient(sdkRouter.GetBalancedSDKAddress())
}
p := &Player{
lbrynet: opts.Lbrynet,
blobStore: opts.BlobStore,
lbrynetClient: opts.Lbrynet,
blobStore: opts.BlobStore,
}
return p
}

func (p *Player) getLbrynetClient() *ljsonrpc.Client {
if p.lbrynetClient != nil {
return p.lbrynetClient
}
sdkRouter := router.New(config.GetLbrynetServers())
return ljsonrpc.NewClient(sdkRouter.GetBalancedSDKAddress())
}

// Play delivers requested URI onto the supplied http.ResponseWriter.
func (p *Player) Play(uri string, w http.ResponseWriter, r *http.Request) error {
Logger.streamPlaybackRequested(uri, users.GetIPAddressForRequest(r))

s, err := p.ResolveStream(uri)
if err != nil {
Logger.streamResolveFailure(uri, err)
Logger.streamResolveFailed(uri, err)
return err
}
Logger.streamResolved(s)

err = p.RetrieveStream(s)
if err != nil {
Logger.streamRetrievalFailed(uri, err)
return err
}
Logger.streamRetrieved(s)

w.Header().Set("Content-Type", s.ContentType)

Expand All @@ -129,7 +132,7 @@ func (p *Player) Play(uri string, w http.ResponseWriter, r *http.Request) error
func (p *Player) ResolveStream(uri string) (*Stream, error) {
s := &Stream{URI: uri}

r, err := p.lbrynet.Resolve(uri)
r, err := p.getLbrynetClient().Resolve(uri)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -249,10 +252,8 @@ func (s *Stream) Read(dest []byte) (n int, err error) {
n, err = s.readFromBlobs(calc, dest)

if err != nil {
metrics.PlayerFailuresCount.Inc()
Logger.streamReadFailed(s, calc, err)
} else {
metrics.PlayerSuccessesCount.Inc()
Logger.streamRead(s, n, calc)
}

Expand Down Expand Up @@ -313,7 +314,7 @@ func (b *BlobGetter) getReflectedBlobByHash(hash string) (*reflectedBlob, error)
timer := metrics.TimerStart(metrics.PlayerBlobDownloadDurations)
blob, err := b.blobStore.Get(hash)
if err != nil {
Logger.blobDownloadFailure(blob, err)
Logger.blobDownloadFailed(blob, err)
return nil, err
}
timer.Done()
Expand Down
10 changes: 2 additions & 8 deletions app/player/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (
"io"
"math/rand"
"testing"
"time"

"github.com/lbryio/reflector.go/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -126,9 +124,7 @@ func TestBlobCalculator(t *testing.T) {
}

func TestStreamRead(t *testing.T) {
p := NewPlayer(&PlayerOpts{
BlobStore: peer.NewStore(peer.StoreOpts{Timeout: time.Second * 60, Address: reflectorAddress}),
})
p := NewPlayer(nil)
s, err := p.ResolveStream(streamURL)
require.NoError(t, err)

Expand All @@ -153,9 +149,7 @@ func TestStreamRead(t *testing.T) {
}

func TestStreamReadOutOfBounds(t *testing.T) {
p := NewPlayer(&PlayerOpts{
BlobStore: peer.NewStore(peer.StoreOpts{Timeout: time.Second * 60, Address: reflectorAddress}),
})
p := NewPlayer(nil)
s, err := p.ResolveStream(streamURL)
require.NoError(t, err)

Expand Down
21 changes: 16 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func (c *ConfigWrapper) Init() {
c.Viper.SetDefault("Address", ":8080")
c.Viper.SetDefault("Host", "http://localhost:8080")
c.Viper.SetDefault("BaseContentURL", "http://localhost:8080/content/")
c.Viper.SetDefault("BlobDownloadTimeout", int64(10))
c.Viper.SetDefault("ReflectorTimeout", int64(10))
c.Viper.SetDefault("RefractorTimeout", int64(10))

c.Viper.SetDefault("AccountsEnabled", false)
c.Viper.BindEnv("AccountsEnabled")
Expand Down Expand Up @@ -208,12 +209,22 @@ func GetReflectorAddress() string {
return Config.Viper.GetString("ReflectorAddress")
}

// GetBlobDownloadTimeout returns timeout for blob HTTP client in seconds.
func GetBlobDownloadTimeout() int64 {
return Config.Viper.GetInt64("BlobDownloadTimeout")
// GetReflectorTimeout returns reflector TCP timeout in seconds.
func GetReflectorTimeout() int64 {
return Config.Viper.GetInt64("ReflectorTimeout")
}

// AccountsEnabled enables or disables accounts subsystem
// GetRefractorAddress returns refractor address in the format of host:port.
func GetRefractorAddress() string {
return Config.Viper.GetString("RefractorAddress")
}

// GetRefractorTimeout returns refractor TCP timeout in seconds.
func GetRefractorTimeout() int64 {
return Config.Viper.GetInt64("RefractorTimeout")
}

// ShouldLogResponses enables or disables full SDK responses logging
func ShouldLogResponses() bool {
return Config.Viper.GetBool("ShouldLogResponses")
}
5 changes: 5 additions & 0 deletions deployments/docker/app/config/lbrytv.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ PublishSourceDir: /storage/publish
BlobFilesDir: /storage/lbrynet/blobfiles

ReflectorAddress: reflector.lbry.com:5566
ReflectorTimeout: 5

RefractorAddress: refractor.lbry.com:5567
RefractorTimeout: 5

ShouldLogResponses: false
2 changes: 1 addition & 1 deletion internal/monitor/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const responseSnippetLength = 500

var errGeneric = errors.New("handler responded with an error")

var httpLogger = NewModuleLogger("http")
var httpLogger = NewModuleLogger("monitor")

// loggingWriter mimics http.ResponseWriter but stores a snippet of response, status code
// and response size for easier logging
Expand Down
12 changes: 6 additions & 6 deletions internal/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func SetupLogging() {
// LogSuccessfulQuery takes a remote method name, execution time and params and logs it
func LogSuccessfulQuery(method string, time float64, params interface{}, response interface{}) {
fields := logrus.Fields{
"method": method,
"time": time,
"params": params,
"method": method,
"duration": time,
"params": params,
}
if config.ShouldLogResponses() {
fields["response"] = response
Expand Down Expand Up @@ -119,9 +119,9 @@ func NewProxyLogger() *ProxyLogger {

func (l *ProxyLogger) LogSuccessfulQuery(method string, time float64, params interface{}, response interface{}) {
fields := logrus.Fields{
"method": method,
"time": time,
"params": params,
"method": method,
"duration": time,
"params": params,
}
if config.ShouldLogResponses() {
fields["response"] = response
Expand Down
6 changes: 3 additions & 3 deletions internal/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestLogSuccessfulQuery(t *testing.T) {
require.Equal(t, log.InfoLevel, hook.LastEntry().Level)
require.Equal(t, "resolve", hook.LastEntry().Data["method"])
require.Equal(t, map[string]string{"urls": "one"}, hook.LastEntry().Data["params"])
require.Equal(t, 0.025, hook.LastEntry().Data["time"])
require.Equal(t, 0.025, hook.LastEntry().Data["duration"])
require.Equal(t, "call processed", hook.LastEntry().Message)

LogSuccessfulQuery("account_balance", 0.025, nil, nil)
Expand All @@ -45,7 +45,7 @@ func TestLogSuccessfulQuery(t *testing.T) {
require.Equal(t, log.InfoLevel, hook.LastEntry().Level)
require.Equal(t, "account_balance", hook.LastEntry().Data["method"])
require.Equal(t, nil, hook.LastEntry().Data["params"])
require.Equal(t, 0.025, hook.LastEntry().Data["time"])
require.Equal(t, 0.025, hook.LastEntry().Data["duration"])
require.Nil(t, hook.LastEntry().Data["response"])
require.Equal(t, "call processed", hook.LastEntry().Message)

Expand Down Expand Up @@ -77,7 +77,7 @@ func TestLogSuccessfulQueryWithResponse(t *testing.T) {
require.Equal(t, log.InfoLevel, hook.LastEntry().Level)
require.Equal(t, "resolve", hook.LastEntry().Data["method"])
require.Equal(t, map[string]string{"urls": "one"}, hook.LastEntry().Data["params"])
require.Equal(t, 0.025, hook.LastEntry().Data["time"])
require.Equal(t, 0.025, hook.LastEntry().Data["duration"])
require.Equal(t, response, hook.LastEntry().Data["response"])
require.Equal(t, "call processed", hook.LastEntry().Message)

Expand Down
6 changes: 4 additions & 2 deletions lbrytv.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ MetricsPath: /metrics
PublishSourceDir: /storage/published
BlobFilesDir: /storage/lbrynet/blobfiles

# ReflectorAddress: reflector.lbry.com:5566
ReflectorAddress: reflector.lbry.com:5566
ReflectorTimeout: 60

BlobDownloadTimeout: 60
RefractorAddress: refractor.lbry.com:5567
RefractorTimeout: 60

0 comments on commit c16eb53

Please sign in to comment.