Skip to content

Commit

Permalink
Make MaxCallRecvMsgSize and MaxCallSendMsgSize configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
bancek committed Jun 8, 2020
1 parent 648708f commit feb158a
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -16,6 +16,7 @@ require (
github.com/onsi/gomega v1.10.1
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/objx v0.1.1 // indirect
github.com/stretchr/testify v1.6.1
go.etcd.io/etcd v0.0.0-20200520232829-54ba9589114f
go.uber.org/zap v1.15.0 // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Expand Up @@ -172,11 +172,14 @@ github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3
github.com/spf13/pflag v1.0.1 h1:aCvUg6QPl3ibpQUxyLkrEkCHtPqYJL4x9AuhqVqFis4=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
Expand Down Expand Up @@ -305,6 +308,8 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
Expand Down
8 changes: 8 additions & 0 deletions pkg/lockproxy/config.go
Expand Up @@ -33,6 +33,14 @@ type Config struct {
// (LOCKPROXY_PROXYREQUESTABORTTIMEOUT)
ProxyRequestAbortTimeout time.Duration `default:"10s"`

// ProxyGrpcMaxCallRecvMsgSize is the maximum message size in bytes the proxy can receive.
// (LOCKPROXY_PROXYGRPCMAXCALLRECVMSGSIZE)
ProxyGrpcMaxCallRecvMsgSize int `default:"4194304"`

// ProxyGrpcMaxCallSendMsgSize is the maximum message size in bytes the proxy can send.
// (LOCKPROXY_PROXYGRPCMAXCALLSENDMSGSIZE)
ProxyGrpcMaxCallSendMsgSize int `default:"4194304"`

// HealthListenAddr is the address of the gRPC Health server. It should
// only be used internally. Health probes should be directed to ListenAddr.
// (LOCKPROXY_HEALTHLISTENADDR)
Expand Down
2 changes: 1 addition & 1 deletion pkg/lockproxy/health_server.go
Expand Up @@ -5,7 +5,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
)

func NewHealthServer(healthService *HealthService) *grpc.Server {
func NewHealthServer(healthService grpc_health_v1.HealthServer) *grpc.Server {
server := grpc.NewServer()

grpc_health_v1.RegisterHealthServer(server, healthService)
Expand Down
17 changes: 17 additions & 0 deletions pkg/lockproxy/helpers_test.go
@@ -1,11 +1,14 @@
package lockproxy_test

import (
"context"
"time"

"github.com/google/uuid"
"github.com/onsi/ginkgo"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/health/grpc_health_v1"
)

func NewLogger() *logrus.Logger {
Expand All @@ -29,3 +32,17 @@ func NewLoggerEntry() *logrus.Entry {
func Rand() string {
return uuid.New().String()[:8]
}

type healthServiceMock struct {
mock.Mock
}

func (m *healthServiceMock) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
args := m.Called()
return args.Get(0).(*grpc_health_v1.HealthCheckResponse), args.Error(1)
}

func (m *healthServiceMock) Watch(req *grpc_health_v1.HealthCheckRequest, w grpc_health_v1.Health_WatchServer) error {
args := m.Called()
return args.Error(0)
}
2 changes: 2 additions & 0 deletions pkg/lockproxy/lockproxy.go
Expand Up @@ -104,6 +104,8 @@ func (p *LockProxy) Init(ctx context.Context) error {
p.upstreamAddrProvider,
p.config.HealthListenAddr,
grpcDialTransportSecurity,
p.config.ProxyGrpcMaxCallRecvMsgSize,
p.config.ProxyGrpcMaxCallSendMsgSize,
p.config.ProxyRequestAbortTimeout,
p.logger,
)
Expand Down
12 changes: 11 additions & 1 deletion pkg/lockproxy/proxy_director.go
Expand Up @@ -26,6 +26,8 @@ type ProxyDirector struct {
upstreamAddrProvider UpstreamAddrProvider
healthAddr string
grpcDialTransportSecurity grpc.DialOption
grpcMaxCallRecvMsgSize int
grpcMaxCallSendMsgSize int
abortTimeout time.Duration
logger *logrus.Entry
}
Expand All @@ -35,6 +37,8 @@ func NewProxyDirector(
upstreamAddrProvider UpstreamAddrProvider,
healthAddr string,
grpcDialTransportSecurity grpc.DialOption,
grpcMaxCallRecvMsgSize int,
grpcMaxCallSendMsgSize int,
abortTimeout time.Duration,
logger *logrus.Entry,
) *ProxyDirector {
Expand All @@ -43,6 +47,8 @@ func NewProxyDirector(
upstreamAddrProvider: upstreamAddrProvider,
healthAddr: healthAddr,
grpcDialTransportSecurity: grpcDialTransportSecurity,
grpcMaxCallRecvMsgSize: grpcMaxCallRecvMsgSize,
grpcMaxCallSendMsgSize: grpcMaxCallSendMsgSize,
abortTimeout: abortTimeout,
logger: logger,
}
Expand Down Expand Up @@ -81,7 +87,11 @@ func (d *ProxyDirector) Director(ctx context.Context, fullMethodName string) (co
clientConn, err := grpc.DialContext(
outCtx,
addr,
grpc.WithDefaultCallOptions(grpc.CustomCodecCallOption{Codec: proxy.Codec()}),
grpc.WithDefaultCallOptions(
grpc.CustomCodecCallOption{Codec: proxy.Codec()},
grpc.MaxCallRecvMsgSize(d.grpcMaxCallRecvMsgSize),
grpc.MaxCallSendMsgSize(d.grpcMaxCallSendMsgSize),
),
d.grpcDialTransportSecurity,
)
if err != nil {
Expand Down
170 changes: 170 additions & 0 deletions pkg/lockproxy/proxy_director_test.go
@@ -0,0 +1,170 @@
package lockproxy_test

import (
"net"
"strings"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

. "github.com/bancek/lockproxy/pkg/lockproxy"
)

var _ = Describe("ProxyDirector", func() {
createHealthServer := func() (string, *healthServiceMock, func()) {
healthListener, err := net.Listen("tcp", "127.0.0.1:0")
Expect(err).NotTo(HaveOccurred())

healthAddr := healthListener.Addr().String()

healthService := new(healthServiceMock)
healthService.On("Check").Return(&grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_SERVING,
}, nil)
healthServer := NewHealthServer(healthService)
go healthServer.Serve(healthListener)

return healthAddr, healthService, func() {
healthServer.Stop()
healthListener.Close()
}
}

createDirector := func(
upstreamAddrProvider UpstreamAddrProvider,
healthAddr string,
grpcMaxCallRecvMsgSize int,
grpcMaxCallSendMsgSize int,
abortTimeout time.Duration,
) (grpc_health_v1.HealthClient, func()) {
grpcDialTransportSecurity := grpc.WithInsecure()

proxyListener, err := net.Listen("tcp", "127.0.0.1:0")
Expect(err).NotTo(HaveOccurred())

proxyAddr := proxyListener.Addr().String()

proxyDirector := NewProxyDirector(
TestCtx,
upstreamAddrProvider,
healthAddr,
grpcDialTransportSecurity,
grpcMaxCallRecvMsgSize,
grpcMaxCallSendMsgSize,
abortTimeout,
Logger,
)
proxyServer := NewProxyServer(proxyDirector)
go proxyServer.Serve(proxyListener)

conn, err := grpc.DialContext(TestCtx, proxyAddr, grpc.WithInsecure())
Expect(err).NotTo(HaveOccurred())

healthClient := grpc_health_v1.NewHealthClient(conn)

return healthClient, func() {
conn.Close()
proxyServer.Stop()
proxyListener.Close()
}
}

It("should proxy health to upstream if leader", func() {
healthAddr1, healthServiceMock1, healthStop1 := createHealthServer()
defer healthStop1()
healthAddr2, healthServiceMock2, healthStop2 := createHealthServer()
defer healthStop2()
upstreamAddrProvider := func() (addr string, isLeader bool) {
return healthAddr1, true
}
healthClient, stop := createDirector(
upstreamAddrProvider,
healthAddr2,
4*1024*1024,
4*1024*1024,
10*time.Second,
)
defer stop()

resp, err := healthClient.Check(TestCtx, &grpc_health_v1.HealthCheckRequest{})
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(grpc_health_v1.HealthCheckResponse_SERVING))

healthServiceMock1.AssertCalled(GinkgoT(), "Check")
healthServiceMock2.AssertNumberOfCalls(GinkgoT(), "Check", 0)
})

It("should proxy health to local server if not leader", func() {
healthAddr1, healthServiceMock1, healthStop1 := createHealthServer()
defer healthStop1()
healthAddr2, healthServiceMock2, healthStop2 := createHealthServer()
defer healthStop2()
upstreamAddrProvider := func() (addr string, isLeader bool) {
return healthAddr1, false
}
healthClient, stop := createDirector(
upstreamAddrProvider,
healthAddr2,
4*1024*1024,
4*1024*1024,
10*time.Second,
)
defer stop()

resp, err := healthClient.Check(TestCtx, &grpc_health_v1.HealthCheckRequest{})
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(grpc_health_v1.HealthCheckResponse_SERVING))

healthServiceMock1.AssertNumberOfCalls(GinkgoT(), "Check", 0)
healthServiceMock2.AssertCalled(GinkgoT(), "Check")
})

It("should fail if response is larger than grpcMaxCallRecvMsgSize", func() {
healthAddr, healthServiceMock, healthStop := createHealthServer()
defer healthStop()
upstreamAddrProvider := func() (addr string, isLeader bool) {
return healthAddr, true
}
healthClient, stop := createDirector(
upstreamAddrProvider,
healthAddr,
1,
4*1024*1024,
10*time.Second,
)
defer stop()

_, err := healthClient.Check(TestCtx, &grpc_health_v1.HealthCheckRequest{})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("received message larger than max"))

healthServiceMock.AssertNumberOfCalls(GinkgoT(), "Check", 1)
})

It("should fail if request is larger than grpcMaxCallSendMsgSize", func() {
healthAddr, healthServiceMock, healthStop := createHealthServer()
defer healthStop()
upstreamAddrProvider := func() (addr string, isLeader bool) {
return healthAddr, true
}
healthClient, stop := createDirector(
upstreamAddrProvider,
healthAddr,
4*1024*1024,
1,
10*time.Second,
)
defer stop()

_, err := healthClient.Check(TestCtx, &grpc_health_v1.HealthCheckRequest{
Service: strings.Repeat("a", 1*1024*1024),
})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("trying to send message larger than max"))

healthServiceMock.AssertNumberOfCalls(GinkgoT(), "Check", 0)
})
})

0 comments on commit feb158a

Please sign in to comment.