Skip to content

Commit

Permalink
Merge "[FAB-7043] Refactor gRPC keepalive code"
Browse files Browse the repository at this point in the history
  • Loading branch information
C0rWin authored and Gerrit Code Review committed Nov 20, 2017
2 parents e9d532c + d59acdb commit a2ebd1b
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 172 deletions.
86 changes: 38 additions & 48 deletions core/comm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,12 @@ var (
maxRecvMsgSize = 100 * 1024 * 1024
maxSendMsgSize = 100 * 1024 * 1024
// Default peer keepalive options
keepaliveOptions = KeepaliveOptions{
ClientKeepaliveTime: 60, // 1 min
ClientKeepaliveTimeout: 20, // 20 sec - gRPC default
ServerKeepaliveTime: 7200, // 2 hours - gRPC default
ServerKeepaliveTimeout: 20, // 20 sec - gRPC default
}
// chaincode keepalive options separate from peer keepalive
// options above (for flexibility)
chaincodeKeepaliveOptions = KeepaliveOptions{
ClientKeepaliveTime: 60, // 1 min
ClientKeepaliveTimeout: 20, // 20 sec - gRPC default
ServerKeepaliveTime: 60, // 1 min
ServerKeepaliveTimeout: 20, // 20 sec - gRPC default
keepaliveOptions = &KeepaliveOptions{
ClientInterval: time.Duration(1) * time.Minute, // 1 min
ClientTimeout: time.Duration(20) * time.Second, // 20 sec - gRPC default
ServerInterval: time.Duration(2) * time.Hour, // 2 hours - gRPC default
ServerTimeout: time.Duration(20) * time.Second, // 20 sec - gRPC default
ServerMinInterval: time.Duration(1) * time.Minute, // match ClientInterval
}
)

Expand Down Expand Up @@ -69,20 +62,21 @@ type SecureOptions struct {
// KeepAliveOptions is used to set the gRPC keepalive settings for both
// clients and servers
type KeepaliveOptions struct {
// ClientKeepaliveTime is the duration in seconds after which if the client
// does not see any activity from the server it pings the server to see
// if it is alive
ClientKeepaliveTime int
// ClientKeepaliveTimeout is the duration the client waits for a response
// ClientInterval is the duration after which if the client does not see
// any activity from the server it pings the server to see if it is alive
ClientInterval time.Duration
// ClientTimeout is the duration the client waits for a response
// from the server after sending a ping before closing the connection
ClientKeepaliveTimeout int
// ServerKeepaliveTime is the duration in seconds after which if the server
// does not see any activity from the client it pings the client to see
// if it is alive
ServerKeepaliveTime int
// ServerKeepaliveTimeout is the duration the server waits for a response
ClientTimeout time.Duration
// ServerInterval is the duration after which if the server does not see
// any activity from the client it pings the client to see if it is alive
ServerInterval time.Duration
// ServerTimeout is the duration the server waits for a response
// from the client after sending a ping before closing the connection
ServerKeepaliveTimeout int
ServerTimeout time.Duration
// ServerMinInterval is the minimum permitted time between client pings.
// If clients send pings more frequently, the server will disconnect them
ServerMinInterval time.Duration
}

// cacheConfiguration caches common package scoped variables
Expand Down Expand Up @@ -125,44 +119,40 @@ func SetMaxSendMsgSize(size int) {
maxSendMsgSize = size
}

// SetKeepaliveOptions sets the gRPC keepalive options for both clients and
// servers
func SetKeepaliveOptions(ka KeepaliveOptions) {
keepaliveOptions = ka
}

// ServerKeepaliveOptions returns the gRPC keepalive options for servers
func ServerKeepaliveOptions() []grpc.ServerOption {
return serverKeepaliveOptionsWithKa(&keepaliveOptions)
}

func serverKeepaliveOptionsWithKa(ka *KeepaliveOptions) []grpc.ServerOption {
// ServerKeepaliveOptions returns gRPC keepalive options for server. If
// opts is nil, the default keepalive options are returned
func ServerKeepaliveOptions(ka *KeepaliveOptions) []grpc.ServerOption {
// use default keepalive options if nil
if ka == nil {
ka = keepaliveOptions
}
var serverOpts []grpc.ServerOption
kap := keepalive.ServerParameters{
Time: time.Duration(ka.ServerKeepaliveTime) * time.Second,
Timeout: time.Duration(ka.ServerKeepaliveTimeout) * time.Second,
Time: ka.ServerInterval,
Timeout: ka.ServerTimeout,
}
serverOpts = append(serverOpts, grpc.KeepaliveParams(kap))
kep := keepalive.EnforcementPolicy{
// needs to match clientKeepalive
MinTime: time.Duration(ka.ClientKeepaliveTime) * time.Second,
MinTime: ka.ServerMinInterval,
// allow keepalive w/o rpc
PermitWithoutStream: true,
}
serverOpts = append(serverOpts, grpc.KeepaliveEnforcementPolicy(kep))
return serverOpts
}

// ClientKeepaliveOptions returns the gRPC keepalive options for clients
func ClientKeepaliveOptions() []grpc.DialOption {
return clientKeepaliveOptionsWithKa(&keepaliveOptions)
}
// ClientKeepaliveOptions returns gRPC keepalive options for clients. If
// opts is nil, the default keepalive options are returned
func ClientKeepaliveOptions(ka *KeepaliveOptions) []grpc.DialOption {
// use default keepalive options if nil
if ka == nil {
ka = keepaliveOptions
}

func clientKeepaliveOptionsWithKa(ka *KeepaliveOptions) []grpc.DialOption {
var dialOpts []grpc.DialOption
kap := keepalive.ClientParameters{
Time: time.Duration(ka.ClientKeepaliveTime) * time.Second,
Timeout: time.Duration(ka.ClientKeepaliveTimeout) * time.Second,
Time: ka.ClientInterval,
Timeout: ka.ClientTimeout,
PermitWithoutStream: true,
}
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kap))
Expand Down
16 changes: 0 additions & 16 deletions core/comm/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,6 @@ func TestConfig(t *testing.T) {
assert.EqualValues(t, size, MaxRecvMsgSize())
assert.EqualValues(t, size, MaxSendMsgSize())

// set keepalive options
timeout := 1000
ka := KeepaliveOptions{
ClientKeepaliveTime: timeout,
ClientKeepaliveTimeout: timeout + 1,
ServerKeepaliveTime: timeout + 2,
ServerKeepaliveTimeout: timeout + 3,
}
SetKeepaliveOptions(ka)
assert.EqualValues(t, timeout, keepaliveOptions.ClientKeepaliveTime)
assert.EqualValues(t, timeout+1, keepaliveOptions.ClientKeepaliveTimeout)
assert.EqualValues(t, timeout+2, keepaliveOptions.ServerKeepaliveTime)
assert.EqualValues(t, timeout+3, keepaliveOptions.ServerKeepaliveTimeout)
assert.EqualValues(t, 2, len(ServerKeepaliveOptions()))
assert.Equal(t, 1, len(ClientKeepaliveOptions()))

// reset cache
configurationCached = false
viper.Set("peer.tls.enabled", true)
Expand Down
15 changes: 6 additions & 9 deletions core/comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,18 +181,15 @@ func GetPeerTestingAddress(port string) string {
}

// NewClientConnectionWithAddress Returns a new grpc.ClientConn to the given address
func NewClientConnectionWithAddress(peerAddress string, block bool, tslEnabled bool, creds credentials.TransportCredentials) (*grpc.ClientConn, error) {
return newClientConnectionWithAddressWithKa(peerAddress, block, tslEnabled, creds, nil)
func NewClientConnectionWithAddress(peerAddress string, block bool, tslEnabled bool,
creds credentials.TransportCredentials, ka *KeepaliveOptions) (*grpc.ClientConn, error) {
return newClientConnectionWithAddressWithKa(peerAddress, block, tslEnabled, creds, ka)
}

// NewChaincodeClientConnectionWithAddress Returns a new chaincode type grpc.ClientConn to the given address
func NewChaincodeClientConnectionWithAddress(peerAddress string, block bool, tslEnabled bool, creds credentials.TransportCredentials) (*grpc.ClientConn, error) {
ka := chaincodeKeepaliveOptions
//client side's keepalive parameter better be greater than EnforcementPolicies MinTime
//to prevent server killing the connection due to timing issues. Just increase by a min
ka.ClientKeepaliveTime += 60

return newClientConnectionWithAddressWithKa(peerAddress, block, tslEnabled, creds, &ka)
ka := &KeepaliveOptions{}
return newClientConnectionWithAddressWithKa(peerAddress, block, tslEnabled, creds, ka)
}

// newClientConnectionWithAddressWithKa Returns a new grpc.ClientConn to the given address using specied keepalive options
Expand All @@ -203,7 +200,7 @@ func newClientConnectionWithAddressWithKa(peerAddress string, block bool, tslEna
//want to change this in future to have peer client
//send keepalives too
if ka != nil {
opts = clientKeepaliveOptionsWithKa(ka)
opts = ClientKeepaliveOptions(ka)
}

if tslEnabled {
Expand Down
12 changes: 8 additions & 4 deletions core/comm/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ func TestConnection_Correct(t *testing.T) {
var tmpConn *grpc.ClientConn
var err error
if TLSEnabled() {
tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, true, InitTLSForPeer())
tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, true,
InitTLSForPeer(), nil)
}
tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, false, nil)
tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, false,
nil, nil)
if err != nil {
t.Fatalf("error connection to server at host:port = %s\n", peerAddress)
}
Expand Down Expand Up @@ -88,9 +90,11 @@ func TestConnection_WrongAddress(t *testing.T) {
var tmpConn *grpc.ClientConn
var err error
if TLSEnabled() {
tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, true, InitTLSForPeer())
tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, true,
InitTLSForPeer(), nil)
}
tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, false, nil)
tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, false,
nil, nil)
if err == nil {
fmt.Printf("error connection to server - at host:port = %s\n", peerAddress)
t.Error("error connection to server - connection should fail")
Expand Down
Loading

0 comments on commit a2ebd1b

Please sign in to comment.