Skip to content

Commit

Permalink
fix:Handle gRPC messages larger than max size (#399)
Browse files Browse the repository at this point in the history
* fix:Handle gRPC messages larger than max size
  • Loading branch information
acceleratorssr authored Apr 26, 2024
1 parent d95d2ff commit 42c4eb2
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 10 deletions.
5 changes: 4 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func createServerCmd(execer fakeruntime.Execer, httpServer server.HTTPServer) (c
flags.IntVarP(&opt.httpPort, "http-port", "", 8080, "The HTTP server port")
flags.BoolVarP(&opt.printProto, "print-proto", "", false, "Print the proto content and exit")
flags.StringArrayVarP(&opt.localStorage, "local-storage", "", []string{"*.yaml"}, "The local storage path")
flags.IntVarP(&opt.grpcMaxRecvMsgSize, "grpc-max-recv-msg-size", "", 4*1024*1024, "The maximum received message size for gRPC clients")
flags.StringVarP(&opt.consolePath, "console-path", "", "", "The path of the console")
flags.StringVarP(&opt.configDir, "config-dir", "", os.ExpandEnv("$HOME/.config/atest"), "The config directory")
flags.StringVarP(&opt.secretServer, "secret-server", "", "", "The secret server URL")
Expand Down Expand Up @@ -129,6 +130,8 @@ type serverOption struct {

dryRun bool

grpcMaxRecvMsgSize int

// inner fields, not as command flags
provider oauth.OAuthProvider
}
Expand Down Expand Up @@ -218,7 +221,7 @@ func (o *serverOption) runE(cmd *cobra.Command, args []string) (err error) {
}

storeExtMgr := server.NewStoreExtManager(o.execer)
remoteServer := server.NewRemoteServer(loader, remote.NewGRPCloaderFromStore(), secretServer, storeExtMgr, o.configDir)
remoteServer := server.NewRemoteServer(loader, remote.NewGRPCloaderFromStore(), secretServer, storeExtMgr, o.configDir, o.grpcMaxRecvMsgSize)
kinds, storeKindsErr := remoteServer.GetStoreKinds(ctx, nil)
if storeKindsErr != nil {
cmd.PrintErrf("failed to get store kinds, error: %p\n", storeKindsErr)
Expand Down
2 changes: 0 additions & 2 deletions pkg/runner/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,6 @@ func invokeRPC(ctx context.Context, conn grpc.ClientConnInterface, method protor
resp *dynamicpb.Message, err error) {
resp = dynamicpb.NewMessage(method.Output())
md, _ := metadata.FromIncomingContext(ctx)

err = conn.Invoke(ctx, getMethodName(method), request, resp, grpc.Header(&md))
return
}
Expand All @@ -584,7 +583,6 @@ func invokeRPCStream(ctx context.Context, conn grpc.ClientConnInterface, method
ServerStreams: method.IsStreamingServer(),
ClientStreams: method.IsStreamingClient(),
}

s, err := conn.NewStream(ctx, sd, getMethodName(method))
if err != nil {
return nil, err
Expand Down
21 changes: 19 additions & 2 deletions pkg/server/remote_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
"os"
reflect "reflect"
"regexp"
Expand All @@ -45,6 +46,7 @@ import (

var (
remoteServerLogger = logging.DefaultLogger(logging.LogLevelInfo).WithName("remote_server")
GrpcMaxRecvMsgSize int
)

type server struct {
Expand All @@ -55,6 +57,8 @@ type server struct {
storeExtMgr ExtManager

secretServer SecretServiceServer

grpcMaxRecvMsgSize int
}

type SecretServiceServer interface {
Expand Down Expand Up @@ -93,17 +97,18 @@ func (f *fakeSecretServer) UpdateSecret(ctx context.Context, in *Secret) (reply
}

// NewRemoteServer creates a remote server instance
func NewRemoteServer(loader testing.Writer, storeWriterFactory testing.StoreWriterFactory, secretServer SecretServiceServer, storeExtMgr ExtManager, configDir string) RunnerServer {
func NewRemoteServer(loader testing.Writer, storeWriterFactory testing.StoreWriterFactory, secretServer SecretServiceServer, storeExtMgr ExtManager, configDir string, grpcMaxRecvMsgSize int) RunnerServer {
if secretServer == nil {
secretServer = &fakeSecretServer{}
}

GrpcMaxRecvMsgSize = grpcMaxRecvMsgSize
return &server{
loader: loader,
storeWriterFactory: storeWriterFactory,
configDir: configDir,
secretServer: secretServer,
storeExtMgr: storeExtMgr,
grpcMaxRecvMsgSize: grpcMaxRecvMsgSize,
}
}

Expand Down Expand Up @@ -438,6 +443,18 @@ func (s *server) RunTestCase(ctx context.Context, in *TestCaseIdentity) (result
lastIndex := len(reply.TestCaseResult) - 1
lastItem := reply.TestCaseResult[lastIndex]

if len(lastItem.Body) > GrpcMaxRecvMsgSize {
e := "the HTTP response body exceeded the maximum message size limit received by the gRPC client"
result = &TestCaseResult{
Output: reply.Message,
Error: e,
Body: "",
Header: lastItem.Header,
StatusCode: http.StatusOK,
}
return
}

result = &TestCaseResult{
Output: reply.Message,
Error: reply.Error,
Expand Down
10 changes: 5 additions & 5 deletions pkg/server/remote_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestRemoteServer(t *testing.T) {

loader := atest.NewFileWriter("")
loader.Put("testdata/simple.yaml")
server := NewRemoteServer(loader, nil, nil, nil, "")
server := NewRemoteServer(loader, nil, nil, nil, "", 1024*1024*4)
_, err := server.Run(ctx, &TestTask{
Kind: "fake",
})
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestRemoteServer(t *testing.T) {
func TestRunTestCase(t *testing.T) {
loader := atest.NewFileWriter("")
loader.Put("testdata/simple.yaml")
server := NewRemoteServer(loader, nil, nil, nil, "")
server := NewRemoteServer(loader, nil, nil, nil, "", 1024*1024*4)

defer gock.Clean()
gock.New(urlFoo).Get("/").MatchHeader("key", "value").
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestUpdateTestCase(t *testing.T) {
assert.NoError(t, err)

ctx := context.Background()
server := NewRemoteServer(writer, nil, nil, nil, "")
server := NewRemoteServer(writer, nil, nil, nil, "", 1024*1024*4)
_, err = server.UpdateTestCase(ctx, &TestCaseWithSuite{
SuiteName: "simple",
Data: &TestCase{
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestListTestCase(t *testing.T) {
writer := atest.NewFileWriter(os.TempDir())
writer.Put(tmpFile.Name())

server := NewRemoteServer(writer, nil, nil, nil, "")
server := NewRemoteServer(writer, nil, nil, nil, "", 1024*1024*4)
ctx := context.Background()

t.Run("get two testcases", func(t *testing.T) {
Expand Down Expand Up @@ -811,7 +811,7 @@ func getRemoteServerInTempDir() (server RunnerServer, call func()) {
call = func() { os.RemoveAll(dir) }

writer := atest.NewFileWriter(dir)
server = NewRemoteServer(writer, newLocalloaderFromStore(), nil, nil, dir)
server = NewRemoteServer(writer, newLocalloaderFromStore(), nil, nil, dir, 1024*1024*4)
return
}

Expand Down

0 comments on commit 42c4eb2

Please sign in to comment.