Skip to content

Commit

Permalink
Add configurable JSON-RPC timeout to proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Jan 21, 2020
1 parent 5e1b08a commit 8ce9269
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 29 deletions.
8 changes: 6 additions & 2 deletions api/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/lbryio/lbrytv/app/users"
"github.com/lbryio/lbrytv/config"
"github.com/lbryio/lbrytv/internal/lbrynet"
"github.com/lbryio/lbrytv/internal/storage"
"github.com/lbryio/lbrytv/internal/responses"
"github.com/lbryio/lbrytv/internal/storage"
"github.com/lbryio/lbrytv/models"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -110,7 +110,11 @@ func BenchmarkWalletCommands(b *testing.B) {
wallets[i] = u
}

handler := proxy.NewRequestHandler(proxy.NewService(router.New(config.GetLbrynetServers())))
handler := proxy.NewRequestHandler(
proxy.NewService(
proxy.Opts{SDKRouter: router.New(config.GetLbrynetServers())},
),
)

b.SetParallelism(30)
b.ResetTimer()
Expand Down
6 changes: 3 additions & 3 deletions api/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func TestRoutesProxy(t *testing.T) {
r := mux.NewRouter()
proxy := proxy.NewService(router.New(config.GetLbrynetServers()))
proxy := proxy.NewService(proxy.Opts{SDKRouter: router.New(config.GetLbrynetServers())})

req, err := http.NewRequest("POST", "/api/v1/proxy", bytes.NewBuffer([]byte(`{"method": "status"}`)))
require.Nil(t, err)
Expand All @@ -33,7 +33,7 @@ func TestRoutesProxy(t *testing.T) {

func TestRoutesPublish(t *testing.T) {
r := mux.NewRouter()
proxy := proxy.NewService(router.New(config.GetLbrynetServers()))
proxy := proxy.NewService(proxy.Opts{SDKRouter: router.New(config.GetLbrynetServers())})

req := publish.CreatePublishRequest(t, []byte("test file"))
rr := httptest.NewRecorder()
Expand All @@ -49,7 +49,7 @@ func TestRoutesPublish(t *testing.T) {

func TestRoutesOptions(t *testing.T) {
r := mux.NewRouter()
proxy := proxy.NewService(router.New(config.GetLbrynetServers()))
proxy := proxy.NewService(proxy.Opts{SDKRouter: router.New(config.GetLbrynetServers())})

req, err := http.NewRequest("OPTIONS", "/api/v1/proxy", nil)
require.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion app/proxy/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var svc *ProxyService
func TestMain(m *testing.M) {
rand.Seed(time.Now().UnixNano())

svc = NewService(router.New(config.GetLbrynetServers()))
svc = NewService(Opts{SDKRouter: router.New(config.GetLbrynetServers())})

dbConfig := config.GetDatabase()
params := storage.ConnParams{
Expand Down
27 changes: 21 additions & 6 deletions app/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"time"

"github.com/lbryio/lbrytv/app/router"
Expand All @@ -27,14 +28,16 @@ import (
)

const walletLoadRetries = 3
const defaultRPCTimeout = time.Second * 30

type Preprocessor func(q *Query)

// ProxyService generates Caller objects and keeps execution time metrics
// for all calls proxied through those objects.
type ProxyService struct {
Router router.SDKRouter
logger monitor.QueryMonitor
Router router.SDKRouter
rpcTimeout time.Duration
logger monitor.QueryMonitor
}

// Caller patches through JSON-RPC requests from clients, doing pre/post-processing,
Expand All @@ -56,12 +59,22 @@ type Query struct {
walletID string
}

// Opts is initialization parameters for NewService / proxy.ProxyService
type Opts struct {
SDKRouter router.SDKRouter
RPCTimeout time.Duration
}

// NewService is the entry point to proxy module.
// Normally only one instance of ProxyService should be created per running server.
func NewService(sdkRouter router.SDKRouter) *ProxyService {
func NewService(opts Opts) *ProxyService {
s := ProxyService{
Router: sdkRouter,
logger: monitor.NewProxyLogger(),
Router: opts.SDKRouter,
rpcTimeout: opts.RPCTimeout,
logger: monitor.NewProxyLogger(),
}
if s.rpcTimeout == 0 {
s.rpcTimeout = defaultRPCTimeout
}
return &s
}
Expand All @@ -70,9 +83,11 @@ func NewService(sdkRouter router.SDKRouter) *ProxyService {
// Note that `SetWalletID` needs to be called if an authenticated user is making this call.
func (ps *ProxyService) NewCaller(walletID string) *Caller {
endpoint := ps.Router.GetSDKServerAddress(walletID)
client := jsonrpc.NewClientWithOpts(endpoint, &jsonrpc.RPCClientOpts{
HTTPClient: &http.Client{Timeout: time.Second * ps.rpcTimeout}})
c := Caller{
walletID: walletID,
client: jsonrpc.NewClient(endpoint),
client: client,
endpoint: endpoint,
service: ps,
retries: walletLoadRetries,
Expand Down
26 changes: 13 additions & 13 deletions app/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestNewCaller(t *testing.T) {
"default": "http://lbrynet1",
"second": "http://lbrynet2",
}
svc := NewService(router.New(servers))
svc := NewService(Opts{SDKRouter: router.New(servers)})
c := svc.NewCaller("")
assert.Equal(t, svc, c.service)

Expand All @@ -107,7 +107,7 @@ func TestNewCaller(t *testing.T) {
}

func TestCallerSetWalletID(t *testing.T) {
svc := NewService(router.NewDefault())
svc := NewService(Opts{SDKRouter: router.NewDefault()})
c := svc.NewCaller("abc")
assert.Equal(t, "abc", c.walletID)
}
Expand All @@ -118,7 +118,7 @@ func TestCallerCallResolve(t *testing.T) {
resolveResponse ljsonrpc.ResolveResponse
)

svc := NewService(router.NewDefault())
svc := NewService(Opts{SDKRouter: router.NewDefault()})
c := svc.NewCaller("")

resolvedURL := "what#6769855a9aa43b67086f9ff3c1a5bacb5698a27a"
Expand All @@ -143,7 +143,7 @@ func TestCallerCallWalletBalance(t *testing.T) {
_, wid, err := lbrynet.InitializeWallet(dummyUserID)
require.NoError(t, err)

svc := NewService(router.NewDefault())
svc := NewService(Opts{SDKRouter: router.NewDefault()})
request := newRawRequest(t, "wallet_balance", nil)

c := svc.NewCaller("")
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestCallerCallDoesReloadWallet(t *testing.T) {
_, err := lbrynet.WalletRemove(dummyUserID)
require.NoError(t, err)

svc := NewService(router.NewDefault())
svc := NewService(Opts{SDKRouter: router.NewDefault()})
c := svc.NewCaller(wid)

request := newRawRequest(t, "wallet_balance", nil)
Expand All @@ -191,7 +191,7 @@ func TestCallerCallRelaxedMethods(t *testing.T) {
return
}
mockClient := &ClientMock{}
svc := NewService(router.NewDefault())
svc := NewService(Opts{SDKRouter: router.NewDefault()})
c := Caller{
client: mockClient,
service: svc,
Expand All @@ -211,7 +211,7 @@ func TestCallerCallRelaxedMethods(t *testing.T) {
func TestCallerCallNonRelaxedMethods(t *testing.T) {
for _, m := range walletSpecificMethods {
mockClient := &ClientMock{}
svc := NewService(router.NewDefault())
svc := NewService(Opts{SDKRouter: router.NewDefault()})
c := Caller{
client: mockClient,
service: svc,
Expand All @@ -224,7 +224,7 @@ func TestCallerCallNonRelaxedMethods(t *testing.T) {

func TestCallerCallForbiddenMethod(t *testing.T) {
mockClient := &ClientMock{}
svc := NewService(router.NewDefault())
svc := NewService(Opts{SDKRouter: router.NewDefault()})
c := Caller{
client: mockClient,
service: svc,
Expand All @@ -240,7 +240,7 @@ func TestCallerCallAttachesWalletID(t *testing.T) {
rand.Seed(time.Now().UnixNano())
dummyWalletID := "abc123321"

svc := NewService(router.NewDefault())
svc := NewService(Opts{SDKRouter: router.NewDefault()})
c := Caller{
walletID: dummyWalletID,
client: mockClient,
Expand All @@ -260,7 +260,7 @@ func TestCallerCallAttachesWalletID(t *testing.T) {
}

func TestCallerSetPreprocessor(t *testing.T) {
svc := NewService(router.NewDefault())
svc := NewService(Opts{SDKRouter: router.NewDefault()})
client := &ClientMock{}
c := Caller{
client: client,
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestCallerCallSDKError(t *testing.T) {
}
`))
}))
svc := NewService(router.New(router.SingleLbrynetServer(ts.URL)))
svc := NewService(Opts{SDKRouter: router.New(router.SingleLbrynetServer(ts.URL))})
c := svc.NewCaller("")

hook := logrus_test.NewLocal(svc.logger.Logger())
Expand All @@ -335,7 +335,7 @@ func TestCallerCallClientJSONError(t *testing.T) {
responses.PrepareJSONWriter(w)
w.Write([]byte(`{"method":"version}`))
}))
svc := NewService(router.New(router.SingleLbrynetServer(ts.URL)))
svc := NewService(Opts{SDKRouter: router.New(router.SingleLbrynetServer(ts.URL))})
c := svc.NewCaller("")

hook := logrus_test.NewLocal(svc.logger.Logger())
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestQueryParamsAsMap(t *testing.T) {
func TestSDKMethodStatus(t *testing.T) {
var rpcResponse jsonrpc.RPCResponse

svc := NewService(router.NewDefault())
svc := NewService(Opts{SDKRouter: router.NewDefault()})
c := svc.NewCaller("")
request := newRawRequest(t, "status", nil)
callResult := c.Call(request)
Expand Down
2 changes: 1 addition & 1 deletion app/publish/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestLbrynetPublisher(t *testing.T) {
config.Override("InternalAPIHost", ts.URL)
defer config.RestoreOverridden()

p := &LbrynetPublisher{proxy.NewService(router.New(config.GetLbrynetServers()))}
p := &LbrynetPublisher{proxy.NewService(proxy.Opts{SDKRouter: router.New(config.GetLbrynetServers())})}

walletSvc := users.NewWalletService()
u, err := walletSvc.Retrieve(users.Query{Token: authToken})
Expand Down
2 changes: 1 addition & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var rootCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
s := server.NewServer(server.Options{
Address: config.GetAddress(),
ProxyService: proxy.NewService(router.New(config.GetLbrynetServers())),
ProxyService: proxy.NewService(proxy.Opts{SDKRouter: router.New(config.GetLbrynetServers())}),
})
err := s.Start()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func TestStartAndServeUntilShutdown(t *testing.T) {
server := NewServer(Options{
Address: "localhost:40080",
ProxyService: proxy.NewService(router.NewDefault()),
ProxyService: proxy.NewService(proxy.Opts{SDKRouter: router.NewDefault()}),
})
server.Start()
go server.ServeUntilShutdown()
Expand Down Expand Up @@ -49,7 +49,7 @@ func TestHeaders(t *testing.T) {

server := NewServer(Options{
Address: "localhost:40080",
ProxyService: proxy.NewService(router.NewDefault()),
ProxyService: proxy.NewService(proxy.Opts{SDKRouter: router.NewDefault()}),
})
server.Start()
go server.ServeUntilShutdown()
Expand Down

0 comments on commit 8ce9269

Please sign in to comment.