Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor out proxy SDK client #160

Merged
merged 2 commits into from
Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions api/routes.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package api

import (
"net/http/pprof"

"github.com/lbryio/lbrytv/app/player"
"github.com/lbryio/lbrytv/app/proxy"
"github.com/lbryio/lbrytv/app/publish"
Expand All @@ -28,12 +26,4 @@ func InstallRoutes(proxyService *proxy.ProxyService, r *mux.Router) {
v1Router.HandleFunc("/proxy", proxyHandler.Handle)

player.InstallRoutes(r)

debugRouter := r.PathPrefix("/superdebug/pprof").Subrouter()
debugRouter.HandleFunc("/", pprof.Index)
debugRouter.HandleFunc("/cmdline", pprof.Cmdline)
debugRouter.HandleFunc("/profile", pprof.Profile)
debugRouter.HandleFunc("/symbol", pprof.Symbol)
debugRouter.HandleFunc("/trace", pprof.Trace)
debugRouter.Handle("/heap", pprof.Handler("heap"))
}
108 changes: 108 additions & 0 deletions app/proxy/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package proxy

import (
"errors"
"fmt"
"net/http"
"time"

"github.com/lbryio/lbrytv/internal/lbrynet"
"github.com/lbryio/lbrytv/internal/metrics"
"github.com/lbryio/lbrytv/internal/monitor"

ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/ybbus/jsonrpc"
)

const walletLoadRetries = 3
const walletLoadRetryWait = time.Millisecond * 200

var ClientLogger = monitor.NewModuleLogger("proxy_client")

type LbrynetClient interface {
Call(q *Query) (*jsonrpc.RPCResponse, error)
}

type Client struct {
rpcClient jsonrpc.RPCClient
endpoint string
wallet string
retries int
}

func NewClient(endpoint string, wallet string, timeout time.Duration) LbrynetClient {
return Client{
endpoint: endpoint,
rpcClient: jsonrpc.NewClientWithOpts(endpoint, &jsonrpc.RPCClientOpts{
HTTPClient: &http.Client{Timeout: time.Second * timeout}}),
wallet: wallet,
}
}

func (c Client) Call(q *Query) (*jsonrpc.RPCResponse, error) {
var (
i int
r *jsonrpc.RPCResponse
err error
)
for i = 0; i < walletLoadRetries; i++ {
r, err = c.call(q)

if err != nil {
return nil, err
}

// This checks if LbrynetServer responded with missing wallet error and tries to reload it,
// then repeat the request again.
if c.isWalletNotLoaded(r) {
// We need to use Lbry JSON-RPC client here for easier request/response processing
client := ljsonrpc.NewClient(c.endpoint)
_, err := client.WalletAdd(c.wallet)
if err != nil {
monitor.CaptureException(
fmt.Errorf("encountered an error adding wallet manually: %v", err), map[string]string{
"wallet_id": c.wallet,
"endpoint": c.endpoint,
})
}
} else {
return r, nil
}
}
if c.isWalletNotLoaded(r) {
monitor.CaptureException(
fmt.Errorf("couldn't manually add wallet after %v retries", i), map[string]string{
"wallet_id": c.wallet,
"endpoint": c.endpoint,
})
}
return r, err
}

func (c *Client) call(q *Query) (*jsonrpc.RPCResponse, error) {
start := time.Now()
r, err := c.rpcClient.CallRaw(q.Request)
duration := time.Since(start).Seconds()
metrics.ProxyCallDurations.WithLabelValues(q.Method(), c.endpoint).Observe(duration)
if err != nil {
return nil, err
}

if r.Error != nil {
metrics.ProxyCallFailedDurations.WithLabelValues(q.Method(), c.endpoint).Observe(duration)
Logger.LogFailedQuery(q.Method(), duration, q.Params(), r.Error)
} else {
Logger.LogSuccessfulQuery(q.Method(), duration, q.Params(), r)
}
return r, err
}

func (c *Client) isWalletNotLoaded(r *jsonrpc.RPCResponse) bool {
if r.Error != nil {
wErr := lbrynet.NewWalletError(0, errors.New(r.Error.Message))
if errors.As(wErr, &lbrynet.WalletNotLoaded{}) {
return true
}
}
return false
}
39 changes: 39 additions & 0 deletions app/proxy/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package proxy

import (
"math/rand"
"testing"
"time"

"github.com/lbryio/lbrytv/app/router"

"github.com/lbryio/lbrytv/internal/lbrynet"

"github.com/stretchr/testify/require"
"github.com/ybbus/jsonrpc"
)

func TestClientCallDoesReloadWallet(t *testing.T) {
var (
r *jsonrpc.RPCResponse
)

rand.Seed(time.Now().UnixNano())
dummyUserID := rand.Intn(100)

_, wid, _ := lbrynet.InitializeWallet(dummyUserID)
_, err := lbrynet.WalletRemove(dummyUserID)
require.NoError(t, err)

router := router.NewDefault()

c := NewClient(router.GetSDKServerAddress(wid), wid, time.Second*1)

q, _ := NewQuery(newRawRequest(t, "wallet_balance", nil))
q.SetWalletID(wid)
r, err = c.Call(q)

// err = json.Unmarshal(result, response)
require.NoError(t, err)
require.Nil(t, r.Error)
}
58 changes: 9 additions & 49 deletions app/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,19 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"time"

ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbrytv/app/router"
"github.com/lbryio/lbrytv/internal/lbrynet"
"github.com/lbryio/lbrytv/internal/metrics"
"github.com/lbryio/lbrytv/internal/monitor"

ljsonrpc "github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/ybbus/jsonrpc"
)

const walletLoadRetries = 3
const defaultRPCTimeout = time.Second * 30

var Logger = monitor.NewProxyLogger()

type Preprocessor func(q *Query)

// ProxyService generates Caller objects and keeps execution time metrics
Expand All @@ -45,11 +43,10 @@ type ProxyService struct {
type Caller struct {
walletID string
query *jsonrpc.RPCRequest
client jsonrpc.RPCClient
client LbrynetClient
endpoint string
service *ProxyService
preprocessor Preprocessor
retries int
}

// Query is a wrapper around client JSON-RPC query for easier (un)marshaling and processing.
Expand All @@ -71,7 +68,6 @@ func NewService(opts Opts) *ProxyService {
s := ProxyService{
Router: opts.SDKRouter,
rpcTimeout: opts.RPCTimeout,
logger: monitor.NewProxyLogger(),
}
if s.rpcTimeout == 0 {
s.rpcTimeout = defaultRPCTimeout
Expand All @@ -83,14 +79,12 @@ func NewService(opts Opts) *ProxyService {
// Note that `SetWalletID` needs to be called if an authenticated user is making this call.
func (ps *ProxyService) NewCaller(walletID string) *Caller {
endpoint := ps.Router.GetSDKServerAddress(walletID)
client := jsonrpc.NewClientWithOpts(endpoint, &jsonrpc.RPCClientOpts{
HTTPClient: &http.Client{Timeout: time.Second * ps.rpcTimeout}})
client := NewClient(endpoint, walletID, ps.rpcTimeout)
c := Caller{
walletID: walletID,
client: client,
endpoint: endpoint,
service: ps,
retries: walletLoadRetries,
}
return &c
}
Expand Down Expand Up @@ -241,18 +235,10 @@ func (c *Caller) marshalError(e CallError) []byte {
return serialized
}

func (c *Caller) sendQuery(q *Query) (*jsonrpc.RPCResponse, error) {
response, err := c.client.CallRaw(q.Request)
if err != nil {
return nil, err
}
return response, nil
}

func (c *Caller) call(rawQuery []byte) (*jsonrpc.RPCResponse, CallError) {
q, err := NewQuery(rawQuery)
if err != nil {
c.service.logger.Errorf("malformed JSON from client: %s", err.Error())
Logger.Errorf("malformed JSON from client: %s", err.Error())
return nil, NewParseError(err)
}

Expand All @@ -276,37 +262,11 @@ func (c *Caller) call(rawQuery []byte) (*jsonrpc.RPCResponse, CallError) {
c.preprocessor(q)
}

queryStartTime := time.Now()
r, err := c.sendQuery(q)
duration := time.Since(queryStartTime).Seconds()
r, err := c.client.Call(q)
if err != nil {
return r, NewInternalError(err)
}

// We want to account for method duration whether it succeeded or not
metrics.ProxyCallDurations.WithLabelValues(q.Method()).Observe(duration)

// This checks if LbrynetServer responded with missing wallet error and tries to reload it,
// then repeat the request again.
// TODO: Refactor this and move somewhere else
if r.Error != nil {
wErr := lbrynet.NewWalletError(0, errors.New(r.Error.Message))
if c.retries > 0 && errors.As(wErr, &lbrynet.WalletNotLoaded{}) {
// We need to use Lbry JSON-RPC client here for easier request/response processing
client := ljsonrpc.NewClient(c.service.Router.GetSDKServerAddress(c.WalletID()))
_, err := client.WalletAdd(c.WalletID())
if err == nil {
c.retries--
return c.call(rawQuery)
}
} else {
metrics.ProxyCallFailedDurations.WithLabelValues(q.Method()).Observe(duration)
c.service.logger.LogFailedQuery(q.Method(), q.Params(), r.Error)
}
} else {
c.service.logger.LogSuccessfulQuery(q.Method(), duration, q.Params(), r)
}

r, err = processResponse(q.Request, r)
if err != nil {
return r, NewInternalError(err)
Expand All @@ -324,13 +284,13 @@ func (c *Caller) Call(rawQuery []byte) []byte {
r, err := c.call(rawQuery)
if err != nil {
monitor.CaptureException(err, map[string]string{"query": string(rawQuery), "response": fmt.Sprintf("%v", r)})
c.service.logger.Errorf("error calling lbrynet: %v, query: %s", err, rawQuery)
Logger.Errorf("error calling lbrynet: %v, query: %s", err, rawQuery)
return c.marshalError(err)
}
serialized, err := c.marshal(r)
if err != nil {
monitor.CaptureException(err)
c.service.logger.Errorf("error marshaling response: %v", err)
Logger.Errorf("error marshaling response: %v", err)
return c.marshalError(err)
}
return serialized
Expand Down
Loading