Skip to content

Commit

Permalink
Options for GRPC message size configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Parameswaran Selvam <parselva@in.ibm.com>
  • Loading branch information
Param-S authored and denyeart committed Aug 23, 2021
1 parent c91b546 commit b076bd7
Show file tree
Hide file tree
Showing 17 changed files with 176 additions and 11 deletions.
10 changes: 10 additions & 0 deletions core/peer/config.go
Expand Up @@ -407,6 +407,16 @@ func GetServerConfig() (comm.ServerConfig, error) {
if viper.IsSet("peer.keepalive.minInterval") {
serverConfig.KaOpts.ServerMinInterval = viper.GetDuration("peer.keepalive.minInterval")
}

serverConfig.MaxRecvMsgSize = comm.DefaultMaxRecvMsgSize
serverConfig.MaxSendMsgSize = comm.DefaultMaxSendMsgSize

if viper.IsSet("peer.maxRecvMsgSize") {
serverConfig.MaxRecvMsgSize = int(viper.GetInt("peer.maxRecvMsgSize"))
}
if viper.IsSet("peer.maxSendMsgSize") {
serverConfig.MaxSendMsgSize = int(viper.GetInt("peer.maxSendMsgSize"))
}
return serverConfig, nil
}

Expand Down
10 changes: 10 additions & 0 deletions core/peer/config_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCacheConfigurationNegative(t *testing.T) {
Expand Down Expand Up @@ -155,6 +156,15 @@ func TestGetServerConfig(t *testing.T) {
assert.Equal(t, true, sc.SecOpts.RequireClientCert, "ServerConfig.SecOpts.RequireClientCert should be true")
assert.Equal(t, 2, len(sc.SecOpts.ClientRootCAs), "ServerConfig.SecOpts.ClientRootCAs should contain 2 entries")

// GRPC max message size options
require.Equal(t, comm.DefaultMaxRecvMsgSize, sc.MaxRecvMsgSize, "ServerConfig.MaxRecvMsgSize should be set to default value %v", comm.DefaultMaxRecvMsgSize)
require.Equal(t, comm.DefaultMaxSendMsgSize, sc.MaxSendMsgSize, "ServerConfig.MaxSendMsgSize should be set to default value %v", comm.DefaultMaxSendMsgSize)
viper.Set("peer.maxRecvMsgSize", "1024")
viper.Set("peer.maxSendMsgSize", "1024")
sc, _ = GetServerConfig()
require.Equal(t, 1024, sc.MaxRecvMsgSize, "ServerConfig.MaxRecvMsgSize should be set to custom value 1024")
require.Equal(t, 1024, sc.MaxSendMsgSize, "ServerConfig.MaxSendMsgSize should be set to custom value 1024")

// bad config with TLS
viper.Set("peer.tls.rootcert.file", "non-existent-file.pem")
_, err = GetServerConfig()
Expand Down
4 changes: 2 additions & 2 deletions core/peer/peer_test.go
Expand Up @@ -67,8 +67,8 @@ func NewTestPeer(t *testing.T) (*Peer, func()) {
defaultDeliverClientDialOpts,
grpc.WithBlock(),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize),
grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize),
grpc.MaxCallRecvMsgSize(comm.DefaultMaxRecvMsgSize),
grpc.MaxCallSendMsgSize(comm.DefaultMaxSendMsgSize),
),
)
defaultDeliverClientDialOpts = append(
Expand Down
2 changes: 1 addition & 1 deletion core/scc/cscc/configure_test.go
Expand Up @@ -287,7 +287,7 @@ func TestConfigerInvokeJoinChainCorrectParams(t *testing.T) {
defaultDeliverClientDialOpts = append(
defaultDeliverClientDialOpts,
grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize), grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.DefaultMaxRecvMsgSize), grpc.MaxCallSendMsgSize(comm.DefaultMaxSendMsgSize)),
)
defaultDeliverClientDialOpts = append(
defaultDeliverClientDialOpts,
Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip/gossip_test.go
Expand Up @@ -670,7 +670,7 @@ func TestNoMessagesSelfLoop(t *testing.T) {
case msg := <-ch:
{
if protoext.IsDataMsg(msg.GetGossipMessage().GossipMessage) {
t.Fatal("Should not receive data message back, got", msg)
t.Errorf("Should not receive data message back, got %s", msg)
}
}
// Waiting for 2 seconds to make sure we won't
Expand Down
8 changes: 8 additions & 0 deletions internal/peer/common/common.go
Expand Up @@ -277,6 +277,14 @@ func configFromEnv(prefix string) (address, override string, clientConfig comm.C
secOpts.Certificate = certPEM
}
clientConfig.SecOpts = secOpts
clientConfig.MaxRecvMsgSize = comm.DefaultMaxRecvMsgSize
if viper.IsSet(prefix + ".maxRecvMsgSize") {
clientConfig.MaxRecvMsgSize = int(viper.GetInt(prefix + ".maxRecvMsgSize"))
}
clientConfig.MaxSendMsgSize = comm.DefaultMaxSendMsgSize
if viper.IsSet(prefix + ".maxSendMsgSize") {
clientConfig.MaxSendMsgSize = int(viper.GetInt(prefix + ".maxSendMsgSize"))
}
return
}

Expand Down
64 changes: 64 additions & 0 deletions internal/peer/common/common_test.go
Expand Up @@ -14,15 +14,19 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/config/configtest"
"github.com/hyperledger/fabric/internal/peer/common"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/msp"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestInitConfig(t *testing.T) {
Expand Down Expand Up @@ -247,3 +251,63 @@ func TestInitCmdWithoutInitCrypto(t *testing.T) {

common.InitCmd(packageCmd, nil)
}

func TestConfigFromEnv(t *testing.T) {
tempdir, err := ioutil.TempDir("", "peer-clientcert")
require.NoError(t, err)
defer os.RemoveAll(tempdir)

// peer client config
address, _, clientConfig, err := common.ConfigFromEnv("peer")
require.NoError(t, err)
require.Equal(t, "", address, "ClientConfig.address by default not set")
require.Equal(t, common.DefaultConnTimeout, clientConfig.Timeout, "ClientConfig.Timeout should be set to default value of %v", common.DefaultConnTimeout)
require.Equal(t, false, clientConfig.SecOpts.UseTLS, "ClientConfig.SecOpts.UseTLS default value should be false")
require.Equal(t, comm.DefaultMaxRecvMsgSize, clientConfig.MaxRecvMsgSize, "ServerConfig.MaxRecvMsgSize should be set to default value %v", comm.DefaultMaxRecvMsgSize)
require.Equal(t, comm.DefaultMaxSendMsgSize, clientConfig.MaxSendMsgSize, "ServerConfig.MaxSendMsgSize should be set to default value %v", comm.DefaultMaxSendMsgSize)

viper.Set("peer.address", "127.0.0.1")
viper.Set("peer.client.connTimeout", "30s")
viper.Set("peer.maxRecvMsgSize", "1024")
viper.Set("peer.maxSendMsgSize", "2048")
address, _, clientConfig, err = common.ConfigFromEnv("peer")
require.NoError(t, err)
require.Equal(t, "127.0.0.1", address, "ClientConfig.address should be set to 127.0.0.1")
require.Equal(t, 30*time.Second, clientConfig.Timeout, "ClientConfig.Timeout should be set to default value of 30s")
require.Equal(t, 1024, clientConfig.MaxRecvMsgSize, "ClientConfig.MaxRecvMsgSize should be set to 1024")
require.Equal(t, 2048, clientConfig.MaxSendMsgSize, "ClientConfig.maxSendMsgSize should be set to 2048")

viper.Set("peer.tls.enabled", true)
viper.Set("peer.tls.rootcert.file", "./filenotfound.pem")
_, _, _, err = common.ConfigFromEnv("peer")
require.Error(t, err, "ClientConfig should return with bad root cert file path")

viper.Set("peer.tls.enabled", false)
viper.Set("peer.tls.clientAuthRequired", true)
viper.Set("peer.tls.clientKey.file", "./filenotfound.pem")
_, _, clientConfig, err = common.ConfigFromEnv("peer")
require.Equal(t, false, clientConfig.SecOpts.UseTLS, "ClientConfig.SecOpts.UseTLS should be false")
require.Error(t, err, "ClientConfig should return with client key file path")

org1CA, err := tlsgen.NewCA()
require.NoError(t, err)
err = ioutil.WriteFile(filepath.Join(tempdir, "org1-ca-cert.pem"), org1CA.CertBytes(), 0o644)
require.NoError(t, err)
org1ServerKP, err := org1CA.NewServerCertKeyPair("localhost")
require.NoError(t, err)
err = ioutil.WriteFile(filepath.Join(tempdir, "org1-peer1-cert.pem"), org1ServerKP.Cert, 0o644)
require.NoError(t, err)
err = ioutil.WriteFile(filepath.Join(tempdir, "org1-peer1-key.pem"), org1ServerKP.Key, 0o600)
require.NoError(t, err)

viper.Set("peer.tls.enabled", true)
viper.Set("peer.tls.clientAuthRequired", true)
viper.Set("peer.tls.rootcert.file", filepath.Join(tempdir, "org1-ca-cert.pem"))
viper.Set("peer.tls.clientCert.file", filepath.Join(tempdir, "org1-peer1-cert.pem"))
viper.Set("peer.tls.clientKey.file", filepath.Join(tempdir, "org1-peer1-key.pem"))
_, _, clientConfig, err = common.ConfigFromEnv("peer")
require.NoError(t, err)
require.Equal(t, 1, len(clientConfig.SecOpts.ServerRootCAs), "ClientConfig.SecOpts.ServerRootCAs should contain 1 entries")
require.Equal(t, org1ServerKP.Key, clientConfig.SecOpts.Key, "Client.SecOpts.Key should be set to configured key")
require.Equal(t, org1ServerKP.Cert, clientConfig.SecOpts.Certificate, "Client.SecOpts.Certificate shoulbe bet set to configured certificate")
}
10 changes: 10 additions & 0 deletions internal/peer/common/export_test.go
@@ -0,0 +1,10 @@
/*
SPDX-License-Identifier: Apache-2.0
*/

package common

var (
ConfigFromEnv = configFromEnv
DefaultConnTimeout = defaultConnTimeout
)
10 changes: 10 additions & 0 deletions internal/peer/common/peerclient.go
Expand Up @@ -77,6 +77,16 @@ func NewPeerClientForAddress(address, tlsRootCertFile string) (*PeerClient, erro
}
clientConfig.SecOpts.ServerRootCAs = [][]byte{caPEM}
}

clientConfig.MaxRecvMsgSize = comm.DefaultMaxRecvMsgSize
if viper.IsSet("peer.maxRecvMsgSize") {
clientConfig.MaxRecvMsgSize = int(viper.GetInt("peer.maxRecvMsgSize"))
}
clientConfig.MaxSendMsgSize = comm.DefaultMaxSendMsgSize
if viper.IsSet("peer.maxSendMsgSize") {
clientConfig.MaxSendMsgSize = int(viper.GetInt("peer.maxSendMsgSize"))
}

return newPeerClientForClientConfig(address, override, clientConfig)
}

Expand Down
10 changes: 9 additions & 1 deletion internal/peer/node/start.go
Expand Up @@ -1127,9 +1127,17 @@ func secureDialOpts(credSupport *comm.CredentialSupport) func() []grpc.DialOptio
return func() []grpc.DialOption {
var dialOpts []grpc.DialOption
// set max send/recv msg sizes
maxRecvMsgSize := comm.DefaultMaxRecvMsgSize
if viper.IsSet("peer.maxRecvMsgSize") {
maxRecvMsgSize = int(viper.GetInt("peer.maxRecvMsgSize"))
}
maxSendMsgSize := comm.DefaultMaxSendMsgSize
if viper.IsSet("peer.maxSendMsgSize") {
maxSendMsgSize = int(viper.GetInt("peer.maxSendMsgSize"))
}
dialOpts = append(
dialOpts,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.DefaultMaxRecvMsgSize), grpc.MaxCallSendMsgSize(comm.DefaultMaxSendMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize)),
)
// set the keepalive options
kaOpts := comm.DefaultKeepaliveOptions
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/comm/config.go
Expand Up @@ -68,6 +68,10 @@ type ServerConfig struct {
HealthCheckEnabled bool
// ServerStatsHandler should be set if metrics on connections are to be reported.
ServerStatsHandler *ServerStatsHandler
// Maximum message size the server can receive
MaxRecvMsgSize int
// Maximum message size the server can send
MaxSendMsgSize int
}

// ClientConfig defines the parameters for configuring a GRPCClient instance
Expand Down
13 changes: 11 additions & 2 deletions internal/pkg/comm/server.go
Expand Up @@ -124,9 +124,18 @@ func NewGRPCServerFromListener(listener net.Listener, serverConfig ServerConfig)
return nil, errors.New("serverConfig.SecOpts must contain both Key and Certificate when UseTLS is true")
}
}

// set max send and recv msg sizes
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(DefaultMaxSendMsgSize))
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(DefaultMaxRecvMsgSize))
maxSendMsgSize := DefaultMaxSendMsgSize
if serverConfig.MaxSendMsgSize != 0 {
maxSendMsgSize = serverConfig.MaxSendMsgSize
}
maxRecvMsgSize := DefaultMaxRecvMsgSize
if serverConfig.MaxRecvMsgSize != 0 {
maxRecvMsgSize = serverConfig.MaxRecvMsgSize
}
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(maxSendMsgSize))
serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(maxRecvMsgSize))
// set the keepalive options
serverOpts = append(serverOpts, ServerKeepaliveOptions(serverConfig.KaOpts)...)
// set connection timeout
Expand Down
11 changes: 11 additions & 0 deletions orderer/common/localconfig/config.go
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/viperutil"
coreconfig "github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/spf13/viper"
)

Expand Down Expand Up @@ -57,6 +58,8 @@ type General struct {
LocalMSPID string
BCCSP *bccsp.FactoryOpts
Authentication Authentication
MaxRecvMsgSize int32
MaxSendMsgSize int32
}

type Cluster struct {
Expand Down Expand Up @@ -242,6 +245,8 @@ var Defaults = TopLevel{
Authentication: Authentication{
TimeWindow: time.Duration(15 * time.Minute),
},
MaxRecvMsgSize: comm.DefaultMaxRecvMsgSize,
MaxSendMsgSize: comm.DefaultMaxSendMsgSize,
},
FileLedger: FileLedger{
Location: "/var/hyperledger/production/orderer",
Expand Down Expand Up @@ -494,6 +499,12 @@ func (c *TopLevel) completeInitialization(configDir string) {
logger.Infof("Kafka.Version unset, setting to %v", Defaults.Kafka.Version)
c.Kafka.Version = Defaults.Kafka.Version

case c.General.MaxRecvMsgSize == 0:
logger.Infof("General.MaxRecvMsgSize is unset, setting to %v", Defaults.General.MaxRecvMsgSize)
c.General.MaxRecvMsgSize = Defaults.General.MaxRecvMsgSize
case c.General.MaxSendMsgSize == 0:
logger.Infof("General.MaxSendMsgSize is unset, setting to %v", Defaults.General.MaxSendMsgSize)
c.General.MaxSendMsgSize = Defaults.General.MaxSendMsgSize
default:
return
}
Expand Down
12 changes: 8 additions & 4 deletions orderer/common/server/main.go
Expand Up @@ -436,10 +436,12 @@ func configureClusterListener(conf *localconfig.TopLevel, generalConf comm.Serve

func initializeClusterClientConfig(conf *localconfig.TopLevel) comm.ClientConfig {
cc := comm.ClientConfig{
AsyncConnect: true,
KaOpts: comm.DefaultKeepaliveOptions,
Timeout: conf.General.Cluster.DialTimeout,
SecOpts: comm.SecureOptions{},
AsyncConnect: true,
KaOpts: comm.DefaultKeepaliveOptions,
Timeout: conf.General.Cluster.DialTimeout,
SecOpts: comm.SecureOptions{},
MaxRecvMsgSize: int(conf.General.MaxRecvMsgSize),
MaxSendMsgSize: int(conf.General.MaxSendMsgSize),
}

reuseGrpcListener := reuseListener(conf)
Expand Down Expand Up @@ -570,6 +572,8 @@ func initializeServerConfig(conf *localconfig.TopLevel, metricsProvider metrics.
grpclogging.WithLeveler(grpclogging.LevelerFunc(grpcLeveler)),
),
},
MaxRecvMsgSize: int(conf.General.MaxRecvMsgSize),
MaxSendMsgSize: int(conf.General.MaxSendMsgSize),
}
}

Expand Down
2 changes: 2 additions & 0 deletions sampleconfig/configtx.yaml
Expand Up @@ -290,6 +290,8 @@ Orderer: &OrdererDefaults
# this value will be rejected by ordering. If the "kafka" OrdererType is
# selected, set 'message.max.bytes' and 'replica.fetch.max.bytes' on
# the Kafka brokers to a value that is larger than this one.
# Based on networking configuration the value needs to be tuned. With
# default 100 MB node grpc msg configuration, 49 MB is the max can be set.
AbsoluteMaxBytes: 10 MB

# Preferred Max Bytes: The preferred maximum number of bytes allowed
Expand Down
7 changes: 7 additions & 0 deletions sampleconfig/core.yaml
Expand Up @@ -463,6 +463,13 @@ peer:
# deliverService limits concurrent event listeners registered to deliver service for blocks and transaction events.
deliverService: 2500

# Since all nodes should be consistent it is recommended to keep
# the default value of 100MB for MaxRecvMsgSize & MaxSendMsgSize
# Max message size in bytes GRPC server and client can receive
maxRecvMsgSize: 104857600
# Max message size in bytes GRPC server and client can send
maxSendMsgSize: 104857600

###############################################################################
#
# VM section
Expand Down
8 changes: 8 additions & 0 deletions sampleconfig/orderer.yaml
Expand Up @@ -40,6 +40,14 @@ General:
# ServerTimeout is the duration the server waits for a response from
# a client before closing the connection.
ServerTimeout: 20s

# Since all nodes should be consistent it is recommended to keep
# the default value of 100MB for MaxRecvMsgSize & MaxSendMsgSize
# Max message size in bytes the GRPC server and client can receive
MaxRecvMsgSize: 104857600
# Max message size in bytes the GRPC server and client can send
MaxSendMsgSize: 104857600

# Cluster settings for ordering service nodes that communicate with other ordering service nodes
# such as Raft based ordering service.
Cluster:
Expand Down

0 comments on commit b076bd7

Please sign in to comment.