Skip to content

Commit

Permalink
Refactor out proxy SDK client
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Jan 29, 2020
1 parent e27a9ec commit a588839
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 116 deletions.
108 changes: 108 additions & 0 deletions app/proxy/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package proxy

import (
"errors"
"fmt"
"net/http"
"time"

"github.com/lbryio/lbrytv/internal/lbrynet"
"github.com/lbryio/lbrytv/internal/metrics"
"github.com/lbryio/lbrytv/internal/monitor"

ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/ybbus/jsonrpc"
)

const walletLoadRetries = 3
const walletLoadRetryWait = time.Millisecond * 200

var ClientLogger = monitor.NewModuleLogger("proxy_client")

type LbrynetClient interface {
Call(q *Query) (*jsonrpc.RPCResponse, error)
}

type Client struct {
rpcClient jsonrpc.RPCClient
endpoint string
wallet string
retries int
}

func NewClient(endpoint string, wallet string, timeout time.Duration) LbrynetClient {
return Client{
endpoint: endpoint,
rpcClient: jsonrpc.NewClientWithOpts(endpoint, &jsonrpc.RPCClientOpts{
HTTPClient: &http.Client{Timeout: time.Second * timeout}}),
wallet: wallet,
}
}

func (c Client) Call(q *Query) (*jsonrpc.RPCResponse, error) {
var (
i int
r *jsonrpc.RPCResponse
err error
)
for i = 0; i < walletLoadRetries; i++ {
r, err = c.call(q)

if err != nil {
return nil, err
}

// This checks if LbrynetServer responded with missing wallet error and tries to reload it,
// then repeat the request again.
if c.isWalletNotLoaded(r) {
// We need to use Lbry JSON-RPC client here for easier request/response processing
client := ljsonrpc.NewClient(c.endpoint)
_, err := client.WalletAdd(c.wallet)
if err != nil {
monitor.CaptureException(
fmt.Errorf("encountered an error adding wallet manually: %v", err), map[string]string{
"wallet_id": c.wallet,
"endpoint": c.endpoint,
})
}
} else {
return r, nil
}
}
if c.isWalletNotLoaded(r) {
monitor.CaptureException(
fmt.Errorf("couldn't manually add wallet after %v retries", i), map[string]string{
"wallet_id": c.wallet,
"endpoint": c.endpoint,
})
}
return r, err
}

func (c *Client) call(q *Query) (*jsonrpc.RPCResponse, error) {
start := time.Now()
r, err := c.rpcClient.CallRaw(q.Request)
duration := time.Since(start).Seconds()
metrics.ProxyCallDurations.WithLabelValues(q.Method(), c.endpoint).Observe(duration)
if err != nil {
return nil, err
}

if r.Error != nil {
metrics.ProxyCallFailedDurations.WithLabelValues(q.Method(), c.endpoint).Observe(duration)
Logger.LogFailedQuery(q.Method(), duration, q.Params(), r.Error)
} else {
Logger.LogSuccessfulQuery(q.Method(), duration, q.Params(), r)
}
return r, err
}

func (c *Client) isWalletNotLoaded(r *jsonrpc.RPCResponse) bool {
if r.Error != nil {
wErr := lbrynet.NewWalletError(0, errors.New(r.Error.Message))
if errors.As(wErr, &lbrynet.WalletNotLoaded{}) {
return true
}
}
return false
}
39 changes: 39 additions & 0 deletions app/proxy/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package proxy

import (
"math/rand"
"testing"
"time"

"github.com/lbryio/lbrytv/app/router"

"github.com/lbryio/lbrytv/internal/lbrynet"

"github.com/stretchr/testify/require"
"github.com/ybbus/jsonrpc"
)

func TestClientCallDoesReloadWallet(t *testing.T) {
var (
r *jsonrpc.RPCResponse
)

rand.Seed(time.Now().UnixNano())
dummyUserID := rand.Intn(100)

_, wid, _ := lbrynet.InitializeWallet(dummyUserID)
_, err := lbrynet.WalletRemove(dummyUserID)
require.NoError(t, err)

router := router.NewDefault()

c := NewClient(router.GetSDKServerAddress(wid), wid, time.Second*1)

q, _ := NewQuery(newRawRequest(t, "wallet_balance", nil))
q.SetWalletID(wid)
r, err = c.Call(q)

// err = json.Unmarshal(result, response)
require.NoError(t, err)
require.Nil(t, r.Error)
}
58 changes: 9 additions & 49 deletions app/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,19 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"time"

ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbrytv/app/router"
"github.com/lbryio/lbrytv/internal/lbrynet"
"github.com/lbryio/lbrytv/internal/metrics"
"github.com/lbryio/lbrytv/internal/monitor"

ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/ybbus/jsonrpc"
)

const walletLoadRetries = 3
const defaultRPCTimeout = time.Second * 30

var Logger = monitor.NewProxyLogger()

type Preprocessor func(q *Query)

// ProxyService generates Caller objects and keeps execution time metrics
Expand All @@ -45,11 +43,10 @@ type ProxyService struct {
type Caller struct {
walletID string
query *jsonrpc.RPCRequest
client jsonrpc.RPCClient
client LbrynetClient
endpoint string
service *ProxyService
preprocessor Preprocessor
retries int
}

// Query is a wrapper around client JSON-RPC query for easier (un)marshaling and processing.
Expand All @@ -71,7 +68,6 @@ func NewService(opts Opts) *ProxyService {
s := ProxyService{
Router: opts.SDKRouter,
rpcTimeout: opts.RPCTimeout,
logger: monitor.NewProxyLogger(),
}
if s.rpcTimeout == 0 {
s.rpcTimeout = defaultRPCTimeout
Expand All @@ -83,14 +79,12 @@ func NewService(opts Opts) *ProxyService {
// Note that `SetWalletID` needs to be called if an authenticated user is making this call.
func (ps *ProxyService) NewCaller(walletID string) *Caller {
endpoint := ps.Router.GetSDKServerAddress(walletID)
client := jsonrpc.NewClientWithOpts(endpoint, &jsonrpc.RPCClientOpts{
HTTPClient: &http.Client{Timeout: time.Second * ps.rpcTimeout}})
client := NewClient(endpoint, walletID, ps.rpcTimeout)
c := Caller{
walletID: walletID,
client: client,
endpoint: endpoint,
service: ps,
retries: walletLoadRetries,
}
return &c
}
Expand Down Expand Up @@ -241,18 +235,10 @@ func (c *Caller) marshalError(e CallError) []byte {
return serialized
}

func (c *Caller) sendQuery(q *Query) (*jsonrpc.RPCResponse, error) {
response, err := c.client.CallRaw(q.Request)
if err != nil {
return nil, err
}
return response, nil
}

func (c *Caller) call(rawQuery []byte) (*jsonrpc.RPCResponse, CallError) {
q, err := NewQuery(rawQuery)
if err != nil {
c.service.logger.Errorf("malformed JSON from client: %s", err.Error())
Logger.Errorf("malformed JSON from client: %s", err.Error())
return nil, NewParseError(err)
}

Expand All @@ -276,37 +262,11 @@ func (c *Caller) call(rawQuery []byte) (*jsonrpc.RPCResponse, CallError) {
c.preprocessor(q)
}

queryStartTime := time.Now()
r, err := c.sendQuery(q)
duration := time.Since(queryStartTime).Seconds()
r, err := c.client.Call(q)
if err != nil {
return r, NewInternalError(err)
}

// We want to account for method duration whether it succeeded or not
metrics.ProxyCallDurations.WithLabelValues(q.Method()).Observe(duration)

// This checks if LbrynetServer responded with missing wallet error and tries to reload it,
// then repeat the request again.
// TODO: Refactor this and move somewhere else
if r.Error != nil {
wErr := lbrynet.NewWalletError(0, errors.New(r.Error.Message))
if c.retries > 0 && errors.As(wErr, &lbrynet.WalletNotLoaded{}) {
// We need to use Lbry JSON-RPC client here for easier request/response processing
client := ljsonrpc.NewClient(c.service.Router.GetSDKServerAddress(c.WalletID()))
_, err := client.WalletAdd(c.WalletID())
if err == nil {
c.retries--
return c.call(rawQuery)
}
} else {
metrics.ProxyCallFailedDurations.WithLabelValues(q.Method()).Observe(duration)
c.service.logger.LogFailedQuery(q.Method(), q.Params(), r.Error)
}
} else {
c.service.logger.LogSuccessfulQuery(q.Method(), duration, q.Params(), r)
}

r, err = processResponse(q.Request, r)
if err != nil {
return r, NewInternalError(err)
Expand All @@ -324,13 +284,13 @@ func (c *Caller) Call(rawQuery []byte) []byte {
r, err := c.call(rawQuery)
if err != nil {
monitor.CaptureException(err, map[string]string{"query": string(rawQuery), "response": fmt.Sprintf("%v", r)})
c.service.logger.Errorf("error calling lbrynet: %v, query: %s", err, rawQuery)
Logger.Errorf("error calling lbrynet: %v, query: %s", err, rawQuery)
return c.marshalError(err)
}
serialized, err := c.marshal(r)
if err != nil {
monitor.CaptureException(err)
c.service.logger.Errorf("error marshaling response: %v", err)
Logger.Errorf("error marshaling response: %v", err)
return c.marshalError(err)
}
return serialized
Expand Down
53 changes: 5 additions & 48 deletions app/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,34 +59,15 @@ type ClientMock struct {
LastRequest jsonrpc.RPCRequest
}

func (c ClientMock) Call(method string, params ...interface{}) (*jsonrpc.RPCResponse, error) {
return &jsonrpc.RPCResponse{
JSONRPC: "2.0",
Result: "0.0",
}, nil
}

func (c *ClientMock) CallRaw(request *jsonrpc.RPCRequest) (*jsonrpc.RPCResponse, error) {
c.LastRequest = *request
func (c *ClientMock) Call(q *Query) (*jsonrpc.RPCResponse, error) {
c.LastRequest = *q.Request
time.Sleep(c.Delay)
return &jsonrpc.RPCResponse{
JSONRPC: "2.0",
Result: "0.0",
}, nil
}

func (c ClientMock) CallFor(out interface{}, method string, params ...interface{}) error {
return nil
}

func (c ClientMock) CallBatch(requests jsonrpc.RPCRequests) (jsonrpc.RPCResponses, error) {
return nil, nil
}

func (c ClientMock) CallBatchRaw(requests jsonrpc.RPCRequests) (jsonrpc.RPCResponses, error) {
return nil, nil
}

func TestNewCaller(t *testing.T) {
servers := map[string]string{
"default": "http://lbrynet1",
Expand Down Expand Up @@ -151,7 +132,7 @@ func TestCallerCallWalletBalance(t *testing.T) {
assert.Contains(t, string(result), `"message": "account identificator required"`)

c = svc.NewCaller(wid)
hook := logrus_test.NewLocal(svc.logger.Logger())
hook := logrus_test.NewLocal(Logger.Logger())
result = c.Call(request)

parseRawResponse(t, result, &accountBalanceResponse)
Expand All @@ -160,30 +141,6 @@ func TestCallerCallWalletBalance(t *testing.T) {
assert.Equal(t, "wallet_balance", hook.LastEntry().Data["method"])
}

func TestCallerCallDoesReloadWallet(t *testing.T) {
var (
response jsonrpc.RPCResponse
)

rand.Seed(time.Now().UnixNano())
dummyUserID := rand.Intn(100)

_, wid, _ := lbrynet.InitializeWallet(dummyUserID)
_, err := lbrynet.WalletRemove(dummyUserID)
require.NoError(t, err)

svc := NewService(Opts{SDKRouter: router.NewDefault()})
c := svc.NewCaller(wid)

request := newRawRequest(t, "wallet_balance", nil)
result := c.Call(request)

assert.Equal(t, walletLoadRetries-1, c.retries)
err = json.Unmarshal(result, &response)
require.NoError(t, err)
require.Nil(t, response.Error)
}

func TestCallerCallRelaxedMethods(t *testing.T) {
for _, m := range relaxedMethods {
t.Run(m, func(t *testing.T) {
Expand Down Expand Up @@ -320,7 +277,7 @@ func TestCallerCallSDKError(t *testing.T) {
svc := NewService(Opts{SDKRouter: router.New(router.SingleLbrynetServer(ts.URL))})
c := svc.NewCaller("")

hook := logrus_test.NewLocal(svc.logger.Logger())
hook := logrus_test.NewLocal(Logger.Logger())
response := c.Call([]byte(newRawRequest(t, "resolve", map[string]string{"urls": "what"})))
json.Unmarshal(response, &rpcResponse)
assert.Equal(t, rpcResponse.Error.Code, -32500)
Expand All @@ -338,7 +295,7 @@ func TestCallerCallClientJSONError(t *testing.T) {
svc := NewService(Opts{SDKRouter: router.New(router.SingleLbrynetServer(ts.URL))})
c := svc.NewCaller("")

hook := logrus_test.NewLocal(svc.logger.Logger())
hook := logrus_test.NewLocal(Logger.Logger())
response := c.Call([]byte(`{"method":"version}`))
json.Unmarshal(response, &rpcResponse)
assert.Equal(t, "2.0", rpcResponse.JSONRPC)
Expand Down
9 changes: 8 additions & 1 deletion deployments/docker/prometheus/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@ scrape_configs:
- job_name: 'blob-cache'
static_configs:
- targets: ['reflector.lbry.com:2112']
- job_name: 'player'
scheme: https
static_configs:
- targets:
- 'player1.lbry.tv'
- 'player2.lbry.tv'
- 'player3.lbry.tv'
- job_name: 'spv'
static_configs:
- targets:
- targets:
- 'spv1.lbry.com:2112'
- 'spv2.lbry.com:2112'
- 'spv3.lbry.com:2112'
Expand Down
Loading

0 comments on commit a588839

Please sign in to comment.