forked from hyperledger/fabric
/
config.go
334 lines (304 loc) · 10.9 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"context"
"crypto/tls"
"crypto/x509"
"time"
"github.com/ZihuaZhang/fabric/common/flogging"
"github.com/ZihuaZhang/fabric/common/metrics"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
// Configuration defaults.
//
// Default maximum number of bytes that gRPC clients and servers may receive
// or send in a single message (100 MiB each).
const (
	DefaultMaxRecvMsgSize = 100 << 20
	DefaultMaxSendMsgSize = 100 << 20
)
var (
	// DefaultKeepaliveOptions holds the default peer keepalive options.
	DefaultKeepaliveOptions = KeepaliveOptions{
		ClientInterval:    1 * time.Minute,  // 1 min
		ClientTimeout:     20 * time.Second, // 20 sec - gRPC default
		ServerInterval:    2 * time.Hour,    // 2 hours - gRPC default
		ServerTimeout:     20 * time.Second, // 20 sec - gRPC default
		ServerMinInterval: 1 * time.Minute,  // match ClientInterval
	}

	// DefaultBackoffOptions holds the default connection backoff options.
	DefaultBackoffOptions = BackoffOptions{
		BaseDelay:  1 * time.Second,
		Multiplier: 1.6,
		MaxDelay:   2 * time.Minute,
	}

	// DefaultTLSCipherSuites lists the strong TLS cipher suites.
	DefaultTLSCipherSuites = []uint16{
		tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
		tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
		tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
		tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
		tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
		tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
	}

	// DefaultConnectionTimeout is the default connection timeout.
	DefaultConnectionTimeout = 5 * time.Second
)
// ServerConfig defines the parameters for configuring a GRPCServer instance.
type ServerConfig struct {
// ConnectionTimeout specifies the timeout for connection establishment
// for all new connections.
ConnectionTimeout time.Duration
// SecOpts defines the security parameters.
SecOpts SecureOptions
// KaOpts defines the keepalive parameters.
KaOpts KeepaliveOptions
// StreamInterceptors specifies a list of interceptors to apply to
// streaming RPCs. They are executed in order.
StreamInterceptors []grpc.StreamServerInterceptor
// UnaryInterceptors specifies a list of interceptors to apply to unary
// RPCs. They are executed in order.
UnaryInterceptors []grpc.UnaryServerInterceptor
// Logger specifies the logger the server will use.
Logger *flogging.FabricLogger
// HealthCheckEnabled enables the gRPC Health Checking Protocol for the server.
HealthCheckEnabled bool
// ServerStatsHandler should be set if metrics on connections are to be reported.
ServerStatsHandler *ServerStatsHandler
// MaxRecvMsgSize is the maximum message size the server can receive.
MaxRecvMsgSize int
// MaxSendMsgSize is the maximum message size the server can send.
MaxSendMsgSize int
}
// ClientConfig defines the parameters for configuring a GRPCClient instance.
type ClientConfig struct {
// SecOpts defines the security parameters.
SecOpts SecureOptions
// KaOpts defines the keepalive parameters.
KaOpts KeepaliveOptions
// BaOpts defines the backoff parameters. DialOptions applies them only
// when BaseDelay, MaxDelay and Multiplier are all non-zero.
BaOpts BackoffOptions
// DialTimeout controls how long the client can block when attempting to
// establish a connection to a server.
DialTimeout time.Duration
// AsyncConnect makes connection creation non-blocking. When it is false,
// DialOptions adds grpc.WithBlock and grpc.FailOnNonTempDialError(true).
AsyncConnect bool
// MaxRecvMsgSize is the maximum message size the client can receive.
// DialOptions falls back to DefaultMaxRecvMsgSize when it is zero.
MaxRecvMsgSize int
// MaxSendMsgSize is the maximum message size the client can send.
// DialOptions falls back to DefaultMaxSendMsgSize when it is zero.
MaxSendMsgSize int
}
// DialOptions converts the ClientConfig to the appropriate set of
// grpc.DialOptions.
//
// The returned options are, in order: keepalive parameters, connection
// backoff (only when every backoff field is set), blocking-dial options
// (unless AsyncConnect is set), per-call message size limits, and transport
// credentials. An error is returned when the TLS configuration cannot be
// built from SecOpts.
func (cc ClientConfig) DialOptions() ([]grpc.DialOption, error) {
	// Keepalive parameters are always applied.
	opts := []grpc.DialOption{
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                cc.KaOpts.ClientInterval,
			Timeout:             cc.KaOpts.ClientTimeout,
			PermitWithoutStream: true,
		}),
	}

	// Backoff is configured only when all of its fields are non-zero.
	if bo := cc.BaOpts; bo.BaseDelay != 0 && bo.MaxDelay != 0 && bo.Multiplier != 0 {
		opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{
			Backoff: backoff.Config{
				BaseDelay:  bo.BaseDelay,
				Multiplier: bo.Multiplier,
				Jitter:     0.2,
				MaxDelay:   bo.MaxDelay,
			},
			MinConnectTimeout: 20 * time.Second,
		}))
	}

	// Connection establishment blocks unless asynchronous connect is requested.
	if !cc.AsyncConnect {
		opts = append(opts, grpc.WithBlock(), grpc.FailOnNonTempDialError(true))
	}

	// Message size limits fall back to the package defaults when unset.
	recvLimit, sendLimit := cc.MaxRecvMsgSize, cc.MaxSendMsgSize
	if recvLimit == 0 {
		recvLimit = DefaultMaxRecvMsgSize
	}
	if sendLimit == 0 {
		sendLimit = DefaultMaxSendMsgSize
	}
	opts = append(opts, grpc.WithDefaultCallOptions(
		grpc.MaxCallRecvMsgSize(recvLimit),
		grpc.MaxCallSendMsgSize(sendLimit),
	))

	tlsCfg, err := cc.SecOpts.TLSConfig()
	if err != nil {
		return nil, err
	}

	// Without a TLS configuration the connection uses insecure credentials.
	if tlsCfg == nil {
		return append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())), nil
	}
	return append(opts, grpc.WithTransportCredentials(&DynamicClientCredentials{TLSConfig: tlsCfg})), nil
}
// Dial creates a gRPC client connection to address using the dial options
// derived from this configuration.
//
// The attempt is bounded by DialTimeout. context.WithTimeout with a
// non-positive duration returns a context whose deadline has already passed,
// which makes a blocking dial fail immediately; to keep a zero-valued
// DialTimeout usable, DefaultConnectionTimeout is used in that case.
//
// It returns the error from DialOptions unchanged, or the dial error wrapped
// with "failed to create new connection".
func (cc ClientConfig) Dial(address string) (*grpc.ClientConn, error) {
	dialOpts, err := cc.DialOptions()
	if err != nil {
		return nil, err
	}

	timeout := cc.DialTimeout
	if timeout <= 0 {
		timeout = DefaultConnectionTimeout
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, address, dialOpts...)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create new connection")
	}
	return conn, nil
}
// Clone returns a copy of this ClientConfig.
//
// The copy is shallow: the struct is duplicated by value, so slice and
// function fields (for example those inside SecOpts) still refer to the same
// underlying data as the original.
func (cc ClientConfig) Clone() ClientConfig {
	return cc
}
// SecureOptions defines the TLS security parameters for a GRPCServer or
// GRPCClient instance.
type SecureOptions struct {
// VerifyCertificate, if not nil, is called after normal
// certificate verification by either a TLS client or server.
// If it returns a non-nil error, the handshake is aborted and that error results.
VerifyCertificate func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error
// Certificate is the PEM-encoded X509 certificate (public key) to be used
// for TLS communication.
Certificate []byte
// Key is the PEM-encoded private key to be used for TLS communication.
Key []byte
// ServerRootCAs is the set of PEM-encoded X509 certificate authorities
// used by clients to verify server certificates.
ServerRootCAs [][]byte
// ClientRootCAs is the set of PEM-encoded X509 certificate authorities
// used by servers to verify client certificates.
ClientRootCAs [][]byte
// UseTLS indicates whether or not to use TLS for communication. When it is
// false, TLSConfig returns a nil configuration.
UseTLS bool
// RequireClientCert indicates whether or not TLS clients must present
// certificates for authentication. When it is true, TLSConfig loads the
// Certificate/Key pair via ClientCertificate.
RequireClientCert bool
// CipherSuites is a list of supported cipher suites for TLS.
// NOTE(review): TLSConfig in this file does not copy this list into the
// tls.Config it builds - confirm where it is consumed.
CipherSuites []uint16
// TimeShift makes TLS handshakes time sampling shift to the past by a given
// duration. TLSConfig applies it only when it is greater than zero.
TimeShift time.Duration
// ServerNameOverride is used to verify the hostname on the returned certificates. It
// is also included in the client's handshake to support virtual hosting
// unless it is an IP address.
ServerNameOverride string
}
// TLSConfig builds a tls.Config from these options.
//
// It returns (nil, nil) when UseTLS is false. Otherwise the configuration
// requires at least TLS 1.2 and carries ServerNameOverride and
// VerifyCertificate; ServerRootCAs populate the root pool, the client
// certificate is attached when RequireClientCert is set, and a positive
// TimeShift moves the handshake clock into the past. An error is returned if
// a root certificate cannot be parsed or the client certificate cannot be
// loaded.
func (so SecureOptions) TLSConfig() (*tls.Config, error) {
	if !so.UseTLS {
		// TLS is disabled; there is nothing to configure.
		return nil, nil
	}

	cfg := &tls.Config{
		MinVersion:            tls.VersionTLS12,
		ServerName:            so.ServerNameOverride,
		VerifyPeerCertificate: so.VerifyCertificate,
	}

	// Build the pool of authorities used to verify the remote certificate.
	if len(so.ServerRootCAs) != 0 {
		pool := x509.NewCertPool()
		for i := range so.ServerRootCAs {
			if ok := pool.AppendCertsFromPEM(so.ServerRootCAs[i]); !ok {
				return nil, errors.New("error adding root certificate")
			}
		}
		cfg.RootCAs = pool
	}

	// Attach the local key pair when mutual TLS is required.
	if so.RequireClientCert {
		clientCert, err := so.ClientCertificate()
		if err != nil {
			return nil, errors.WithMessage(err, "failed to load client certificate")
		}
		cfg.Certificates = append(cfg.Certificates, clientCert)
	}

	// Shift the clock used during handshakes into the past when requested.
	if shift := so.TimeShift; shift > 0 {
		cfg.Time = func() time.Time {
			return time.Now().Add(-shift)
		}
	}

	return cfg, nil
}
// ClientCertificate returns the client certificate that will be used
// for mutual TLS.
//
// Both Key and Certificate must be set. The check uses len so that empty but
// non-nil slices are reported as missing instead of being passed on to the
// PEM parser. An error is also returned when the pair cannot be parsed by
// tls.X509KeyPair.
func (so SecureOptions) ClientCertificate() (tls.Certificate, error) {
	if len(so.Key) == 0 || len(so.Certificate) == 0 {
		return tls.Certificate{}, errors.New("both Key and Certificate are required when using mutual TLS")
	}

	cert, err := tls.X509KeyPair(so.Certificate, so.Key)
	if err != nil {
		return tls.Certificate{}, errors.WithMessage(err, "failed to create key pair")
	}
	return cert, nil
}
// KeepaliveOptions is used to set the gRPC keepalive settings for both
// clients and servers.
type KeepaliveOptions struct {
// ClientInterval is the duration after which if the client does not see
// any activity from the server it pings the server to see if it is alive.
ClientInterval time.Duration
// ClientTimeout is the duration the client waits for a response
// from the server after sending a ping before closing the connection.
ClientTimeout time.Duration
// ServerInterval is the duration after which if the server does not see
// any activity from the client it pings the client to see if it is alive.
ServerInterval time.Duration
// ServerTimeout is the duration the server waits for a response
// from the client after sending a ping before closing the connection.
ServerTimeout time.Duration
// ServerMinInterval is the minimum permitted time between client pings.
// If clients send pings more frequently, the server will disconnect them.
ServerMinInterval time.Duration
}
// BackoffOptions defines the connection backoff options for a gRPC client.
// ClientConfig.DialOptions applies them only when all three fields are
// non-zero.
type BackoffOptions struct {
// BaseDelay is the amount of time to backoff after the first failure.
BaseDelay time.Duration
// Multiplier is the factor with which to multiply backoffs after a
// failed retry. Should ideally be greater than 1.
Multiplier float64
// MaxDelay is the upper bound of backoff delay.
MaxDelay time.Duration
}
// ServerKeepaliveOptions returns the gRPC server options for these keepalive
// settings: the keepalive parameters followed by the enforcement policy.
func (ka KeepaliveOptions) ServerKeepaliveOptions() []grpc.ServerOption {
	params := grpc.KeepaliveParams(keepalive.ServerParameters{
		Time:    ka.ServerInterval,
		Timeout: ka.ServerTimeout,
	})
	policy := grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
		MinTime: ka.ServerMinInterval,
		// allow keepalive pings without an active RPC
		PermitWithoutStream: true,
	})
	return []grpc.ServerOption{params, policy}
}
// ClientKeepaliveOptions returns the gRPC dial options that apply these
// keepalive settings to a client connection.
func (ka KeepaliveOptions) ClientKeepaliveOptions() []grpc.DialOption {
	return []grpc.DialOption{
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                ka.ClientInterval,
			Timeout:             ka.ClientTimeout,
			PermitWithoutStream: true,
		}),
	}
}
// Metrics holds the counters that track connection activity.
type Metrics struct {
// OpenConnCounter keeps track of the number of open connections.
OpenConnCounter metrics.Counter
// ClosedConnCounter keeps track of the number of connections closed.
ClosedConnCounter metrics.Counter
}