forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
213 lines (184 loc) · 6.08 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"context"
"crypto/tls"
"crypto/x509"
"time"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// GRPCClient holds the settings used to create outbound gRPC connections:
// an optional TLS configuration, base dial options, a dial timeout and the
// maximum message sizes applied to calls.
type GRPCClient struct {
	// TLS configuration used by the grpc.ClientConn; nil when TLS is not enabled
	tlsConfig *tls.Config
	// Options for setting up new connections
	dialOpts []grpc.DialOption
	// Duration for which to block while establishing a new connection
	timeout time.Duration
	// Maximum message size the client can receive
	maxRecvMsgSize int
	// Maximum message size the client can send
	maxSendMsgSize int
}
// NewGRPCClient builds a GRPCClient from the supplied client configuration.
//
// The security options are parsed first; if that fails, the error is returned
// together with the partially initialized client. Otherwise the keepalive
// parameters, the blocking-dial behavior (unless config.AsyncConnect is set),
// the dial timeout and the package default message sizes are applied.
func NewGRPCClient(config ClientConfig) (*GRPCClient, error) {
	client := new(GRPCClient)

	// TLS settings come first because they are the only step that can fail.
	if err := client.parseSecureOptions(config.SecOpts); err != nil {
		return client, err
	}

	// Keepalive pings are permitted even when no streams are active.
	keepaliveParams := keepalive.ClientParameters{
		Time:                config.KaOpts.ClientInterval,
		Timeout:             config.KaOpts.ClientTimeout,
		PermitWithoutStream: true,
	}
	opts := []grpc.DialOption{grpc.WithKeepaliveParams(keepaliveParams)}

	// Connection establishment is blocking unless the caller asked for an
	// asynchronous connect.
	if !config.AsyncConnect {
		opts = append(opts, grpc.WithBlock(), grpc.FailOnNonTempDialError(true))
	}
	client.dialOpts = append(client.dialOpts, opts...)

	client.timeout = config.Timeout

	// Message size limits start at the package defaults.
	client.maxRecvMsgSize, client.maxSendMsgSize = MaxRecvMsgSize, MaxSendMsgSize

	return client, nil
}
// parseSecureOptions translates opts into the client's TLS configuration.
//
// When opts.UseTLS is false nothing is configured and client.tlsConfig stays
// nil. Otherwise a tls.Config is installed on the client and populated with
// the server root CAs, the client key pair (when opts.RequireClientCert is
// set) and an optional clock shifted into the past by opts.TimeShift.
//
// An error is returned if a root certificate cannot be added to the pool, if
// a client certificate is required but Key or Certificate is missing, or if
// the key pair cannot be loaded. The TLS configuration assigned before the
// failure remains set on the client.
func (client *GRPCClient) parseSecureOptions(opts SecureOptions) error {
	if !opts.UseTLS {
		return nil
	}

	// TLS 1.2 is the minimum accepted protocol version.
	tlsConfig := &tls.Config{
		VerifyPeerCertificate: opts.VerifyCertificate,
		MinVersion:            tls.VersionTLS12,
	}
	client.tlsConfig = tlsConfig

	if len(opts.ServerRootCAs) > 0 {
		pool := x509.NewCertPool()
		tlsConfig.RootCAs = pool
		for _, pemBytes := range opts.ServerRootCAs {
			if err := AddPemToCertPool(pemBytes, pool); err != nil {
				commLogger.Debugf("error adding root certificate: %v", err)
				return errors.WithMessage(err, "error adding root certificate")
			}
		}
	}

	if opts.RequireClientCert {
		// Mutual TLS needs both halves of the key pair.
		if opts.Key == nil || opts.Certificate == nil {
			return errors.New("both Key and Certificate are required when using mutual TLS")
		}
		keyPair, err := tls.X509KeyPair(opts.Certificate, opts.Key)
		if err != nil {
			return errors.WithMessage(err, "failed to load client certificate")
		}
		tlsConfig.Certificates = append(tlsConfig.Certificates, keyPair)
	}

	if shift := opts.TimeShift; shift > 0 {
		// Report a current time that lies shift in the past to the TLS stack.
		tlsConfig.Time = func() time.Time {
			return time.Now().Add(-shift)
		}
	}

	return nil
}
// Certificate returns the first tls.Certificate configured for the client,
// i.e. the one presented when a server requires client certificates.
// A zero-value tls.Certificate is returned when TLS is not configured or no
// client certificate has been loaded.
func (client *GRPCClient) Certificate() tls.Certificate {
	if client.tlsConfig == nil || len(client.tlsConfig.Certificates) == 0 {
		return tls.Certificate{}
	}
	return client.tlsConfig.Certificates[0]
}
// TLSEnabled reports whether the client holds a TLS configuration, which is
// what decides whether connections it creates use TLS transport credentials
func (client *GRPCClient) TLSEnabled() bool {
	return client.tlsConfig != nil
}
// MutualTLSRequired reports whether the client is set up for mutual TLS:
// it returns true only when TLS is configured and at least one client
// certificate has been loaded
func (client *GRPCClient) MutualTLSRequired() bool {
	if client.tlsConfig == nil {
		return false
	}
	return len(client.tlsConfig.Certificates) > 0
}
// SetMaxRecvMsgSize sets the maximum message size the client can receive.
// The value is read each time NewConnection builds its dial options, so it
// applies to connections created after this call.
func (client *GRPCClient) SetMaxRecvMsgSize(size int) {
	client.maxRecvMsgSize = size
}
// SetMaxSendMsgSize sets the maximum message size the client can send.
// The value is read each time NewConnection builds its dial options, so it
// applies to connections created after this call.
func (client *GRPCClient) SetMaxSendMsgSize(size int) {
	client.maxSendMsgSize = size
}
// SetServerRootCAs replaces the set of authorities used to verify server
// certificates with a pool built from a list of PEM-encoded X509 certificate
// authorities.
//
// If serverRoots is empty, the current cert pool is replaced with an empty
// one. If any entry cannot be added to the new pool, an error is returned and
// the existing pool is left untouched. An error is also returned when the
// client has no TLS configuration (TLS was not enabled), because there is
// nothing to update; previously this case caused a nil pointer dereference.
func (client *GRPCClient) SetServerRootCAs(serverRoots [][]byte) error {
	if client.tlsConfig == nil {
		return errors.New("cannot set server root CAs: TLS is not enabled")
	}

	// Build the replacement pool completely before swapping it in, so a bad
	// certificate never leaves the client with a half-populated pool.
	certPool := x509.NewCertPool()
	for _, root := range serverRoots {
		if err := AddPemToCertPool(root, certPool); err != nil {
			return errors.WithMessage(err, "error adding root certificate")
		}
	}
	client.tlsConfig.RootCAs = certPool
	return nil
}
// TLSOption is a function that adjusts a tls.Config. NewConnection accepts
// TLSOptions as per-connection settings.
type TLSOption func(tlsConfig *tls.Config)

// ServerNameOverride returns a TLSOption that sets the ServerName field of
// the tls.Config it is applied to.
func ServerNameOverride(name string) TLSOption {
	return TLSOption(func(cfg *tls.Config) {
		cfg.ServerName = name
	})
}

// CertPoolOverride returns a TLSOption that sets the RootCAs field of the
// tls.Config it is applied to.
func CertPoolOverride(pool *x509.CertPool) TLSOption {
	return TLSOption(func(cfg *tls.Config) {
		cfg.RootCAs = pool
	})
}
// NewConnection dials address and returns the resulting grpc.ClientConn.
//
// The transport credentials and the max send/recv message sizes are resolved
// at call time rather than at client construction, so that SetServerRootCAs,
// SetMaxRecvMsgSize and SetMaxSendMsgSize take effect on a per connection
// basis. When the client has a TLS configuration, it is handed to
// DynamicClientCredentials together with tlsOptions (for example
// ServerNameOverride); otherwise the connection is made without transport
// security. The dial is bounded by the client's timeout.
func (client *GRPCClient) NewConnection(address string, tlsOptions ...TLSOption) (*grpc.ClientConn, error) {
	// Pick the transport security for this connection.
	var transport grpc.DialOption
	if client.tlsConfig == nil {
		transport = grpc.WithInsecure()
	} else {
		transport = grpc.WithTransportCredentials(&DynamicClientCredentials{
			TLSConfig:  client.tlsConfig,
			TLSOptions: tlsOptions,
		})
	}

	// Per-call message size limits.
	sizeLimits := grpc.WithDefaultCallOptions(
		grpc.MaxCallRecvMsgSize(client.maxRecvMsgSize),
		grpc.MaxCallSendMsgSize(client.maxSendMsgSize),
	)

	// Copy the base options so the client's own slice is never modified.
	dialOpts := make([]grpc.DialOption, 0, len(client.dialOpts)+2)
	dialOpts = append(dialOpts, client.dialOpts...)
	dialOpts = append(dialOpts, transport, sizeLimits)

	ctx, cancel := context.WithTimeout(context.Background(), client.timeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, address, dialOpts...)
	if err != nil {
		return nil, errors.WithMessage(errors.WithStack(err), "failed to create new connection")
	}
	return conn, nil
}