Skip to content

Commit

Permalink
fix the testMaxMsgSizeServerAPI failure
Browse files Browse the repository at this point in the history
  • Loading branch information
lyuxuan committed Apr 6, 2017
1 parent f1bb70f commit 6f8b553
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 129 deletions.
14 changes: 4 additions & 10 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
return invoke(ctx, method, args, reply, cc, opts...)
}

const defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
const defaultClientMaxSendMessageSize = 1024 * 1024 * 4

func min(a, b int) int {
if a < b {
return a
}
return b
}

func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo
maxReceiveMessageSize := defaultClientMaxReceiveMessageSize
Expand All @@ -179,12 +169,16 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
maxSendMessageSize = min(*mc.MaxReqSize, cc.dopts.maxSendMessageSize)
} else if mc.MaxReqSize != nil {
maxSendMessageSize = *mc.MaxReqSize
} else if mc.MaxReqSize == nil && cc.dopts.maxSendMessageSize >= 0 {
maxSendMessageSize = cc.dopts.maxSendMessageSize
}

if mc.MaxRespSize != nil && cc.dopts.maxReceiveMessageSize >= 0 {
maxReceiveMessageSize = min(*mc.MaxRespSize, cc.dopts.maxReceiveMessageSize)
} else if mc.MaxRespSize != nil {
maxReceiveMessageSize = *mc.MaxRespSize
} else if mc.MaxRespSize == nil && cc.dopts.maxReceiveMessageSize >= 0 {
maxReceiveMessageSize = cc.dopts.maxReceiveMessageSize
}
} else {
if cc.dopts.maxSendMessageSize >= 0 {
Expand Down
14 changes: 8 additions & 6 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ package grpc
import (
"errors"
"fmt"
"math"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -103,16 +102,19 @@ type dialOptions struct {
maxSendMessageSize int
}

const defaultClientMaxMsgSize = math.MaxInt32
const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
defaultClientMaxSendMessageSize = 1024 * 1024 * 4
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = 1024 * 1024 * 4
)

// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)

// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. This function is for backward API compatibility. It has essentially the same functionality as WithMaxReceiveMessageSize.
// WithMaxMsgSize Deprecated: use WithMaxReceiveMessageSize instead.
func WithMaxMsgSize(s int) DialOption {
return func(o *dialOptions) {
o.maxReceiveMessageSize = s
}
return WithMaxReceiveMessageSize(s)
}

// WithMaxReceiveMessageSize returns a DialOption which sets the maximum message size the client can receive. Negative input is invalid and has the same effect as not setting the field.
Expand Down
10 changes: 10 additions & 0 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ type ServiceConfig struct {
// via grpc.WithBalancer will override this.
LB Balancer
// Methods contains a map for the methods in this service.
// If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig.
// If there's no exact match, look for the default config for all methods under the service (/service/) and use the corresponding MethodConfig.
// Otherwise, the method has no MethodConfig to use.
Methods map[string]MethodConfig
}

Expand All @@ -498,3 +501,10 @@ const SupportPackageIsVersion4 = true

// Version is the current grpc version.
const Version = "1.3.0-dev"

func min(a, b int) int {
if a < b {
return a
}
return b
}
21 changes: 11 additions & 10 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ type options struct {
keepalivePolicy keepalive.EnforcementPolicy
}

const defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 // Use 4MB as the default receive message size limit.
const defaultServerMaxSendMessageSize = 1024 * 1024 * 4 // Use 4MB as the default send message size limit.

// A ServerOption sets options.
type ServerOption func(*options)

Expand Down Expand Up @@ -166,12 +163,9 @@ func RPCDecompressor(dc Decompressor) ServerOption {
}
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
// If this is not set, gRPC uses the default 4MB. This function is for backward compatability. It has essentially the same functionality as MaxReceiveMessageSize.
// MaxMsgSize Deprecated: use MaxReceiveMessageSize instead.
func MaxMsgSize(m int) ServerOption {
return func(o *options) {
o.maxReceiveMessageSize = m
}
return MaxReceiveMessageSize(m)
}

// MaxReceiveMessageSize returns a ServerOption to set the max message size in bytes for inbound mesages.
Expand Down Expand Up @@ -650,7 +644,7 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
grpclog.Fatalf("grpc: Server failed to encode response %v", err)
}
if len(p) > s.opts.maxSendMessageSize {
return Errorf(codes.InvalidArgument, "Sent message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize)
return status.Errorf(codes.InvalidArgument, "Sent message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize)
}
err = t.Write(stream, p, opts)
if err == nil && outPayload != nil {
Expand Down Expand Up @@ -755,7 +749,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if len(req) > s.opts.maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with
// java implementation.
return status.Errorf(codes.InvalidArgument, "grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxReceiveMessageSize)
return status.Errorf(codes.InvalidArgument, "Received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
}
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
Expand Down Expand Up @@ -798,6 +792,13 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
// TODO: Translate error into a status.Status error if necessary?
// TODO: Write status when appropriate.
switch e := err.(type) {
case status.Status:
if se := t.WriteStatus(stream, e); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", se)
}
default:
}
return err
}
if trInfo != nil {
Expand Down
6 changes: 4 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
c := defaultCallInfo
maxReceiveMessageSize := defaultClientMaxReceiveMessageSize
maxSendMessageSize := defaultClientMaxSendMessageSize

if mc, ok := cc.GetMethodConfig(method); ok {
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
Expand All @@ -131,12 +130,16 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
maxSendMessageSize = min(*mc.MaxReqSize, cc.dopts.maxSendMessageSize)
} else if mc.MaxReqSize != nil {
maxSendMessageSize = *mc.MaxReqSize
} else if mc.MaxReqSize == nil && cc.dopts.maxSendMessageSize >= 0 {
maxSendMessageSize = cc.dopts.maxSendMessageSize
}

if mc.MaxRespSize != nil && cc.dopts.maxReceiveMessageSize >= 0 {
maxReceiveMessageSize = min(*mc.MaxRespSize, cc.dopts.maxReceiveMessageSize)
} else if mc.MaxRespSize != nil {
maxReceiveMessageSize = *mc.MaxRespSize
} else if mc.MaxRespSize == nil && cc.dopts.maxReceiveMessageSize >= 0 {
maxReceiveMessageSize = cc.dopts.maxReceiveMessageSize
}
} else {
if cc.dopts.maxSendMessageSize >= 0 {
Expand All @@ -146,7 +149,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
maxReceiveMessageSize = cc.dopts.maxReceiveMessageSize
}
}

for _, o := range opts {
if err := o.before(&c); err != nil {
return nil, toRPCErr(err)
Expand Down

0 comments on commit 6f8b553

Please sign in to comment.