From feb158a32e66402d62bcea3bb248881238cccd08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luka=20Zakraj=C5=A1ek?= Date: Tue, 9 Jun 2020 00:39:09 +0200 Subject: [PATCH] Make MaxCallRecvMsgSize and MaxCallSendMsgSize configurable --- go.mod | 1 + go.sum | 5 + pkg/lockproxy/config.go | 8 ++ pkg/lockproxy/health_server.go | 2 +- pkg/lockproxy/helpers_test.go | 17 +++ pkg/lockproxy/lockproxy.go | 2 + pkg/lockproxy/proxy_director.go | 12 +- pkg/lockproxy/proxy_director_test.go | 170 +++++++++++++++++++++++++++ 8 files changed, 215 insertions(+), 2 deletions(-) create mode 100644 pkg/lockproxy/proxy_director_test.go diff --git a/go.mod b/go.mod index f882a24..436b099 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/onsi/gomega v1.10.1 github.com/sirupsen/logrus v1.6.0 github.com/stretchr/objx v0.1.1 // indirect + github.com/stretchr/testify v1.6.1 go.etcd.io/etcd v0.0.0-20200520232829-54ba9589114f go.uber.org/zap v1.15.0 // indirect golang.org/x/sync v0.0.0-20190423024810-112230192c58 diff --git a/go.sum b/go.sum index f0acc95..ca528d0 100644 --- a/go.sum +++ b/go.sum @@ -172,11 +172,14 @@ github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3 github.com/spf13/pflag v1.0.1 h1:aCvUg6QPl3ibpQUxyLkrEkCHtPqYJL4x9AuhqVqFis4= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= @@ -305,6 +308,8 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= diff --git a/pkg/lockproxy/config.go b/pkg/lockproxy/config.go index a96ecf0..84ebd1f 100644 --- a/pkg/lockproxy/config.go +++ b/pkg/lockproxy/config.go @@ -33,6 +33,14 @@ type Config struct { // (LOCKPROXY_PROXYREQUESTABORTTIMEOUT) ProxyRequestAbortTimeout time.Duration `default:"10s"` + // ProxyGrpcMaxCallRecvMsgSize is the maximum message size in bytes the proxy can receive. + // (LOCKPROXY_PROXYGRPCMAXCALLRECVMSGSIZE) + ProxyGrpcMaxCallRecvMsgSize int `default:"4194304"` + + // ProxyGrpcMaxCallSendMsgSize is the maximum message size in bytes the proxy can send. + // (LOCKPROXY_PROXYGRPCMAXCALLSENDMSGSIZE) + ProxyGrpcMaxCallSendMsgSize int `default:"4194304"` + // HealthListenAddr is the address of the gRPC Health server. It should // only be used internally. Health probes should be directed to ListenAddr. // (LOCKPROXY_HEALTHLISTENADDR) diff --git a/pkg/lockproxy/health_server.go b/pkg/lockproxy/health_server.go index 2ce0683..ced884b 100644 --- a/pkg/lockproxy/health_server.go +++ b/pkg/lockproxy/health_server.go @@ -5,7 +5,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" ) -func NewHealthServer(healthService *HealthService) *grpc.Server { +func NewHealthServer(healthService grpc_health_v1.HealthServer) *grpc.Server { server := grpc.NewServer() grpc_health_v1.RegisterHealthServer(server, healthService) diff --git a/pkg/lockproxy/helpers_test.go b/pkg/lockproxy/helpers_test.go index a430205..5593279 100644 --- a/pkg/lockproxy/helpers_test.go +++ b/pkg/lockproxy/helpers_test.go @@ -1,11 +1,14 @@ package lockproxy_test import ( + "context" "time" "github.com/google/uuid" "github.com/onsi/ginkgo" "github.com/sirupsen/logrus" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/health/grpc_health_v1" ) func NewLogger() *logrus.Logger { @@ -29,3 +32,17 @@ func NewLoggerEntry() *logrus.Entry { func Rand() string { return uuid.New().String()[:8] } + +type healthServiceMock struct { + mock.Mock +} + +func (m *healthServiceMock) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + args := m.Called() + return args.Get(0).(*grpc_health_v1.HealthCheckResponse), args.Error(1) +} + +func (m *healthServiceMock) Watch(req *grpc_health_v1.HealthCheckRequest, w grpc_health_v1.Health_WatchServer) error { + args := m.Called() + return args.Error(0) +} diff --git a/pkg/lockproxy/lockproxy.go b/pkg/lockproxy/lockproxy.go index f5f94eb..74762da 100644 --- a/pkg/lockproxy/lockproxy.go +++ b/pkg/lockproxy/lockproxy.go @@ -104,6 +104,8 @@ func (p *LockProxy) Init(ctx context.Context) error { p.upstreamAddrProvider, p.config.HealthListenAddr, grpcDialTransportSecurity, + p.config.ProxyGrpcMaxCallRecvMsgSize, + p.config.ProxyGrpcMaxCallSendMsgSize, p.config.ProxyRequestAbortTimeout, p.logger, ) diff --git a/pkg/lockproxy/proxy_director.go b/pkg/lockproxy/proxy_director.go index e649aeb..d3de45b 100644 --- a/pkg/lockproxy/proxy_director.go +++ b/pkg/lockproxy/proxy_director.go @@ -26,6 +26,8 @@ type ProxyDirector struct { upstreamAddrProvider UpstreamAddrProvider healthAddr string grpcDialTransportSecurity grpc.DialOption + grpcMaxCallRecvMsgSize int + grpcMaxCallSendMsgSize int abortTimeout time.Duration logger *logrus.Entry } @@ -35,6 +37,8 @@ func NewProxyDirector( upstreamAddrProvider UpstreamAddrProvider, healthAddr string, grpcDialTransportSecurity grpc.DialOption, + grpcMaxCallRecvMsgSize int, + grpcMaxCallSendMsgSize int, abortTimeout time.Duration, logger *logrus.Entry, ) *ProxyDirector { @@ -43,6 +47,8 @@ func NewProxyDirector( upstreamAddrProvider: upstreamAddrProvider, healthAddr: healthAddr, grpcDialTransportSecurity: grpcDialTransportSecurity, + grpcMaxCallRecvMsgSize: grpcMaxCallRecvMsgSize, + grpcMaxCallSendMsgSize: grpcMaxCallSendMsgSize, abortTimeout: abortTimeout, logger: logger, } @@ -81,7 +87,11 @@ func (d *ProxyDirector) Director(ctx context.Context, fullMethodName string) (co clientConn, err := grpc.DialContext( outCtx, addr, - grpc.WithDefaultCallOptions(grpc.CustomCodecCallOption{Codec: proxy.Codec()}), + grpc.WithDefaultCallOptions( + grpc.CustomCodecCallOption{Codec: proxy.Codec()}, + grpc.MaxCallRecvMsgSize(d.grpcMaxCallRecvMsgSize), + grpc.MaxCallSendMsgSize(d.grpcMaxCallSendMsgSize), + ), d.grpcDialTransportSecurity, ) if err != nil { diff --git a/pkg/lockproxy/proxy_director_test.go b/pkg/lockproxy/proxy_director_test.go new file mode 100644 index 0000000..fd37147 --- /dev/null +++ b/pkg/lockproxy/proxy_director_test.go @@ -0,0 +1,170 @@ +package lockproxy_test + +import ( + "net" + "strings" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + + . "github.com/bancek/lockproxy/pkg/lockproxy" +) + +var _ = Describe("ProxyDirector", func() { + createHealthServer := func() (string, *healthServiceMock, func()) { + healthListener, err := net.Listen("tcp", "127.0.0.1:0") + Expect(err).NotTo(HaveOccurred()) + + healthAddr := healthListener.Addr().String() + + healthService := new(healthServiceMock) + healthService.On("Check").Return(&grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_SERVING, + }, nil) + healthServer := NewHealthServer(healthService) + go healthServer.Serve(healthListener) + + return healthAddr, healthService, func() { + healthServer.Stop() + healthListener.Close() + } + } + + createDirector := func( + upstreamAddrProvider UpstreamAddrProvider, + healthAddr string, + grpcMaxCallRecvMsgSize int, + grpcMaxCallSendMsgSize int, + abortTimeout time.Duration, + ) (grpc_health_v1.HealthClient, func()) { + grpcDialTransportSecurity := grpc.WithInsecure() + + proxyListener, err := net.Listen("tcp", "127.0.0.1:0") + Expect(err).NotTo(HaveOccurred()) + + proxyAddr := proxyListener.Addr().String() + + proxyDirector := NewProxyDirector( + TestCtx, + upstreamAddrProvider, + healthAddr, + grpcDialTransportSecurity, + grpcMaxCallRecvMsgSize, + grpcMaxCallSendMsgSize, + abortTimeout, + Logger, + ) + proxyServer := NewProxyServer(proxyDirector) + go proxyServer.Serve(proxyListener) + + conn, err := grpc.DialContext(TestCtx, proxyAddr, grpc.WithInsecure()) + Expect(err).NotTo(HaveOccurred()) + + healthClient := grpc_health_v1.NewHealthClient(conn) + + return healthClient, func() { + conn.Close() + proxyServer.Stop() + proxyListener.Close() + } + } + + It("should proxy health to upstream if leader", func() { + healthAddr1, healthServiceMock1, healthStop1 := createHealthServer() + defer healthStop1() + healthAddr2, healthServiceMock2, healthStop2 := createHealthServer() + defer healthStop2() + upstreamAddrProvider := func() (addr string, isLeader bool) { + return healthAddr1, true + } + healthClient, stop := createDirector( + upstreamAddrProvider, + healthAddr2, + 4*1024*1024, + 4*1024*1024, + 10*time.Second, + ) + defer stop() + + resp, err := healthClient.Check(TestCtx, &grpc_health_v1.HealthCheckRequest{}) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.Status).To(Equal(grpc_health_v1.HealthCheckResponse_SERVING)) + + healthServiceMock1.AssertCalled(GinkgoT(), "Check") + healthServiceMock2.AssertNumberOfCalls(GinkgoT(), "Check", 0) + }) + + It("should proxy health to local server if not leader", func() { + healthAddr1, healthServiceMock1, healthStop1 := createHealthServer() + defer healthStop1() + healthAddr2, healthServiceMock2, healthStop2 := createHealthServer() + defer healthStop2() + upstreamAddrProvider := func() (addr string, isLeader bool) { + return healthAddr1, false + } + healthClient, stop := createDirector( + upstreamAddrProvider, + healthAddr2, + 4*1024*1024, + 4*1024*1024, + 10*time.Second, + ) + defer stop() + + resp, err := healthClient.Check(TestCtx, &grpc_health_v1.HealthCheckRequest{}) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.Status).To(Equal(grpc_health_v1.HealthCheckResponse_SERVING)) + + healthServiceMock1.AssertNumberOfCalls(GinkgoT(), "Check", 0) + healthServiceMock2.AssertCalled(GinkgoT(), "Check") + }) + + It("should fail if response is larger than grpcMaxCallRecvMsgSize", func() { + healthAddr, healthServiceMock, healthStop := createHealthServer() + defer healthStop() + upstreamAddrProvider := func() (addr string, isLeader bool) { + return healthAddr, true + } + healthClient, stop := createDirector( + upstreamAddrProvider, + healthAddr, + 1, + 4*1024*1024, + 10*time.Second, + ) + defer stop() + + _, err := healthClient.Check(TestCtx, &grpc_health_v1.HealthCheckRequest{}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("received message larger than max")) + + healthServiceMock.AssertNumberOfCalls(GinkgoT(), "Check", 1) + }) + + It("should fail if request is larger than grpcMaxCallSendMsgSize", func() { + healthAddr, healthServiceMock, healthStop := createHealthServer() + defer healthStop() + upstreamAddrProvider := func() (addr string, isLeader bool) { + return healthAddr, true + } + healthClient, stop := createDirector( + upstreamAddrProvider, + healthAddr, + 4*1024*1024, + 1, + 10*time.Second, + ) + defer stop() + + _, err := healthClient.Check(TestCtx, &grpc_health_v1.HealthCheckRequest{ + Service: strings.Repeat("a", 1*1024*1024), + }) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("trying to send message larger than max")) + + healthServiceMock.AssertNumberOfCalls(GinkgoT(), "Check", 0) + }) +})