Skip to content

Commit

Permalink
proxyd: Add frontend rate limiting
Browse files Browse the repository at this point in the history
To give us more flexibiltiy with rate limiting, proxyd now supports rate limiting of client (frontend) requests in addition to upstream (backend) requests. This PR also gives us the ability to exempt certain user agents/origins from rate limiting.
  • Loading branch information
mslipper committed Aug 3, 2022
1 parent b1cc033 commit e339ba5
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 69 deletions.
5 changes: 5 additions & 0 deletions .changeset/giant-gifts-attend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@eth-optimism/proxyd': minor
---

Add frontend rate limiting
21 changes: 12 additions & 9 deletions proxyd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
)

const (
JSONRPCVersion = "2.0"
JSONRPCErrorInternal = -32000
JSONRPCVersion = "2.0"
JSONRPCErrorInternal = -32000
JSONRPCErrorRateLimited = JSONRPCErrorInternal - 16
)

var (
Expand Down Expand Up @@ -56,7 +57,7 @@ var (
HTTPErrorCode: 503,
}
ErrBackendOverCapacity = &RPCErr{
Code: JSONRPCErrorInternal - 12,
Code: JSONRPCErrorRateLimited,
Message: "backend is over capacity",
HTTPErrorCode: 429,
}
Expand Down Expand Up @@ -92,7 +93,7 @@ type Backend struct {
wsURL string
authUsername string
authPassword string
rateLimiter RateLimiter
rateLimiter BackendRateLimiter
client *LimitedHTTPClient
dialer *websocket.Dialer
maxRetries int
Expand Down Expand Up @@ -174,7 +175,7 @@ func NewBackend(
name string,
rpcURL string,
wsURL string,
rateLimiter RateLimiter,
rateLimiter BackendRateLimiter,
rpcSemaphore *semaphore.Weighted,
opts ...BackendOpt,
) *Backend {
Expand Down Expand Up @@ -362,10 +363,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool

xForwardedFor := GetXForwardedFor(ctx)
if b.stripTrailingXFF {
ipList := strings.Split(xForwardedFor, ", ")
if len(ipList) > 0 {
xForwardedFor = ipList[0]
}
xForwardedFor = stripXFF(xForwardedFor)
} else if b.proxydIP != "" {
xForwardedFor = fmt.Sprintf("%s, %s", xForwardedFor, b.proxydIP)
}
Expand Down Expand Up @@ -835,3 +833,8 @@ func RecordBatchRPCForward(ctx context.Context, backendName string, reqs []*RPCR
RecordRPCForward(ctx, backendName, req.Method, source)
}
}

func stripXFF(xff string) string {
ipList := strings.Split(xff, ", ")
return strings.TrimSpace(ipList[0])
}
8 changes: 8 additions & 0 deletions proxyd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ type MetricsConfig struct {
Port int `toml:"port"`
}

type RateLimitConfig struct {
RatePerSecond int `toml:"rate_per_second"`
ExemptOrigins []string `toml:"exempt_origins"`
ExemptUserAgents []string `toml:"exempt_user_agents"`
ErrorMessage string `toml:"error_message"`
}

type BackendOptions struct {
ResponseTimeoutSeconds int `toml:"response_timeout_seconds"`
MaxResponseSizeBytes int64 `toml:"max_response_size_bytes"`
Expand Down Expand Up @@ -75,6 +82,7 @@ type Config struct {
Cache CacheConfig `toml:"cache"`
Redis RedisConfig `toml:"redis"`
Metrics MetricsConfig `toml:"metrics"`
RateLimit RateLimitConfig `toml:"rate_limit"`
BackendOptions BackendOptions `toml:"backend"`
Backends BackendsConfig `toml:"backends"`
Authentication map[string]string `toml:"authentication"`
Expand Down
3 changes: 2 additions & 1 deletion proxyd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/prometheus/client_golang v1.11.0
github.com/rs/cors v1.8.2
github.com/sethvargo/go-limiter v0.7.2
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
Expand Down Expand Up @@ -59,7 +60,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.0.0-20220307211146-efcb8507fb70 // indirect
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
Expand Down
6 changes: 4 additions & 2 deletions proxyd/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sethvargo/go-limiter v0.7.2 h1:FgC4N7RMpV5gMrUdda15FaFTkQ/L4fEqM7seXMs4oO8=
github.com/sethvargo/go-limiter v0.7.2/go.mod h1:C0kbSFbiriE5k2FFOe18M1YZbAR2Fiwf72uGu0CXCcU=
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
Expand Down Expand Up @@ -701,8 +703,8 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ=
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
79 changes: 72 additions & 7 deletions proxyd/integration_tests/rate_limit_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package integration_tests

import (
"fmt"
"net/http"
"os"
"testing"
"time"

"github.com/ethereum-optimism/optimism/proxyd"
"github.com/stretchr/testify/require"
Expand All @@ -13,18 +16,83 @@ type resWithCode struct {
res []byte
}

func TestMaxRPSLimit(t *testing.T) {
const frontendOverLimitResponse = `{"error":{"code":-32016,"message":"over rate limit"},"id":null,"jsonrpc":"2.0"}`

func TestBackendMaxRPSLimit(t *testing.T) {
goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse))
defer goodBackend.Close()

require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))

config := ReadConfig("rate_limit")
config := ReadConfig("backend_rate_limit")
client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

limitedRes, codes := spamReqs(t, client, 503)
require.Equal(t, 2, codes[200])
require.Equal(t, 1, codes[503])
RequireEqualJSON(t, []byte(noBackendsResponse), limitedRes)
}

func TestFrontendMaxRPSLimit(t *testing.T) {
goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse))
defer goodBackend.Close()

require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))

config := ReadConfig("frontend_rate_limit")
shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

t.Run("non-exempt over limit", func(t *testing.T) {
client := NewProxydClient("http://127.0.0.1:8545")
limitedRes, codes := spamReqs(t, client, 429)
require.Equal(t, 1, codes[429])
require.Equal(t, 2, codes[200])
RequireEqualJSON(t, []byte(frontendOverLimitResponse), limitedRes)
})

t.Run("exempt user agent over limit", func(t *testing.T) {
h := make(http.Header)
h.Set("User-Agent", "exempt_agent")
client := NewProxydClientWithHeaders("http://127.0.0.1:8545", h)
_, codes := spamReqs(t, client, 429)
require.Equal(t, 3, codes[200])
})

t.Run("exempt origin over limit", func(t *testing.T) {
h := make(http.Header)
h.Set("Origin", "exempt_origin")
client := NewProxydClientWithHeaders("http://127.0.0.1:8545", h)
_, codes := spamReqs(t, client, 429)
fmt.Println(codes)
require.Equal(t, 3, codes[200])
})

t.Run("multiple xff", func(t *testing.T) {
h1 := make(http.Header)
h1.Set("X-Forwarded-For", "0.0.0.0")
h2 := make(http.Header)
h2.Set("X-Forwarded-For", "1.1.1.1")
client1 := NewProxydClientWithHeaders("http://127.0.0.1:8545", h1)
client2 := NewProxydClientWithHeaders("http://127.0.0.1:8545", h2)
_, codes := spamReqs(t, client1, 429)
require.Equal(t, 1, codes[429])
require.Equal(t, 2, codes[200])
_, code, err := client2.SendRPC("eth_chainId", nil)
require.Equal(t, 200, code)
require.NoError(t, err)
time.Sleep(time.Second)
_, code, err = client2.SendRPC("eth_chainId", nil)
require.Equal(t, 200, code)
require.NoError(t, err)
})
}

func spamReqs(t *testing.T, client *ProxydHTTPClient, limCode int) ([]byte, map[int]int) {
resCh := make(chan *resWithCode)
for i := 0; i < 3; i++ {
go func() {
Expand All @@ -48,13 +116,10 @@ func TestMaxRPSLimit(t *testing.T) {
codes[code] += 1
}

// 503 because there's only one backend available
if code == 503 {
if code == limCode {
limitedRes = res.res
}
}

require.Equal(t, 2, codes[200])
require.Equal(t, 1, codes[503])
RequireEqualJSON(t, []byte(noBackendsResponse), limitedRes)
return limitedRes, codes
}
23 changes: 23 additions & 0 deletions proxyd/integration_tests/testdata/frontend_rate_limit.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[server]
rpc_port = 8545

[backend]
response_timeout_seconds = 1

[backends]
[backends.good]
rpc_url = "$GOOD_BACKEND_RPC_URL"
ws_url = "$GOOD_BACKEND_RPC_URL"

[backend_groups]
[backend_groups.main]
backends = ["good"]

[rpc_method_mappings]
eth_chainId = "main"

[rate_limit]
rate_per_second = 2
exempt_origins = ["exempt_origin"]
exempt_user_agents = ["exempt_agent"]
error_message = "over rate limit"
22 changes: 19 additions & 3 deletions proxyd/integration_tests/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,21 @@ import (
)

type ProxydHTTPClient struct {
url string
url string
headers http.Header
}

func NewProxydClient(url string) *ProxydHTTPClient {
return &ProxydHTTPClient{url: url}
return NewProxydClientWithHeaders(url, make(http.Header))
}

func NewProxydClientWithHeaders(url string, headers http.Header) *ProxydHTTPClient {
clonedHeaders := headers.Clone()
clonedHeaders.Set("Content-Type", "application/json")
return &ProxydHTTPClient{
url: url,
headers: clonedHeaders,
}
}

func (p *ProxydHTTPClient) SendRPC(method string, params []interface{}) ([]byte, int, error) {
Expand All @@ -45,7 +55,13 @@ func (p *ProxydHTTPClient) SendBatchRPC(reqs ...*proxyd.RPCReq) ([]byte, int, er
}

func (p *ProxydHTTPClient) SendRequest(body []byte) ([]byte, int, error) {
res, err := http.Post(p.url, "application/json", bytes.NewReader(body))
req, err := http.NewRequest("POST", p.url, bytes.NewReader(body))
if err != nil {
panic(err)
}
req.Header = p.headers

res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, -1, err
}
Expand Down
10 changes: 7 additions & 3 deletions proxyd/proxyd.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ func Start(config *Config) (func(), error) {
redisURL = rURL
}

var lim RateLimiter
var lim BackendRateLimiter
var err error
if redisURL == "" {
log.Warn("redis is not configured, using local rate limiter")
lim = NewLocalRateLimiter()
lim = NewLocalBackendRateLimiter()
} else {
lim, err = NewRedisRateLimiter(redisURL)
if err != nil {
Expand Down Expand Up @@ -212,7 +212,7 @@ func Start(config *Config) (func(), error) {
rpcCache = newRPCCache(newCacheWithCompression(cache), blockNumFn, gasPriceFn, config.Cache.NumBlockConfirmations)
}

srv := NewServer(
srv, err := NewServer(
backendGroups,
wsBackendGroup,
NewStringSetFromStrings(config.WSMethodWhitelist),
Expand All @@ -222,9 +222,13 @@ func Start(config *Config) (func(), error) {
secondsToDuration(config.Server.TimeoutSeconds),
config.Server.MaxUpstreamBatchSize,
rpcCache,
config.RateLimit,
config.Server.EnableRequestLog,
config.Server.MaxRequestBodyLogLen,
)
if err != nil {
return nil, fmt.Errorf("error creating server: %w", err)
}

if config.Metrics.Enabled {
addr := fmt.Sprintf("%s:%d", config.Metrics.Host, config.Metrics.Port)
Expand Down

0 comments on commit e339ba5

Please sign in to comment.