/
client.go
124 lines (104 loc) · 3.78 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package grpcclient
import (
"fmt"
"time"
"github.com/dbunion/com/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
// grpcDialOptions is a registry of hook functions that contribute
// grpc.DialOption values when dialing a service. Each hook receives the
// options accumulated so far together with the configured static auth
// password, and returns the (possibly extended) option slice or an error.
// Hooks are added through RegisterGRPCDialOptions and invoked, in
// registration order, by (*Conn).Dial.
var grpcDialOptions []func(opts []grpc.DialOption, grpcAuthStaticPassword []byte) ([]grpc.DialOption, error)
// RegisterGRPCDialOptions adds grpcDialOptionsFunc to the package-level
// registry of dial-option hooks. Every registered hook is called by
// (*Conn).Dial, in registration order, with the options built so far and the
// configured static auth password; it must return the updated option slice.
//
// A nil grpcDialOptionsFunc is ignored, because calling it later from Dial
// would panic.
//
// The registry is a plain slice with no locking, so registration is expected
// to happen before any concurrent use (e.g. during program initialization).
func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption, grpcAuthStaticPassword []byte) ([]grpc.DialOption, error)) {
	if grpcDialOptionsFunc == nil {
		return
	}
	grpcDialOptions = append(grpcDialOptions, grpcDialOptionsFunc)
}
// DefaultConfig is the default configuration for dialing a gRPC client.
//
// It uses the package-wide default maximum message size and a 10 second
// keepalive interval and timeout. Both window sizes are left at zero, which
// makes (*Conn).Dial skip the corresponding window-size dial options.
var DefaultConfig = rpc.Config{
	MaxMessageSize:            rpc.DefaultMaxMessageSize,
	GRPCKeepAliveTime:         10 * time.Second,
	GRPCKeepAliveTimeout:      10 * time.Second,
	GRPCInitialConnWindowSize: 0,
	GRPCInitialWindowSize:     0,
}
// Conn is a gRPC client connection bundled with the configuration that is
// used to dial it.
type Conn struct {
// ClientConn is the embedded underlying gRPC connection. It is assigned
// by Dial and stays nil until a dial has been attempted.
*grpc.ClientConn
// cfg is the copy of the configuration taken by NewConn; Dial reads all
// of its dial settings from here.
cfg rpc.Config
}
// NewConn creates a Conn configured from cfg and immediately dials target.
//
// The configuration is copied, so later changes to *cfg do not affect the
// returned connection. A nil cfg selects DefaultConfig instead of causing a
// nil pointer dereference.
//
// It returns the dialed connection, or the error reported by Dial.
func NewConn(target string, cfg *rpc.Config) (*Conn, error) {
	if cfg == nil {
		cfg = &DefaultConfig
	}
	conn := &Conn{
		cfg: *cfg,
	}
	return conn.Dial(target)
}
// Close releases the underlying gRPC connection and returns the error, if
// any, reported by grpc.ClientConn.Close.
//
// Calling Close on a nil Conn, or on a Conn that has no underlying
// connection (never dialed, or the dial failed), is a no-op that returns nil
// rather than dereferencing a nil *grpc.ClientConn.
func (c *Conn) Close() error {
	if c == nil || c.ClientConn == nil {
		return nil
	}
	return c.ClientConn.Close()
}
// Dial creates a grpc connection to the given target and stores it on c.
//
// The dial options are assembled in this order:
//  1. default call options limiting send/receive message size to
//     cfg.MaxMessageSize;
//  2. keepalive parameters, when either keepalive setting is non-zero;
//  3. initial connection / stream window sizes, when non-zero;
//  4. options contributed by every hook registered through
//     RegisterGRPCDialOptions;
//  5. the transport security option derived from the TLS settings in cfg;
//  6. the caller-supplied opts, appended last.
//
// It returns c on success. If a registered hook fails, the transport security
// option cannot be built, or grpc.Dial fails, it returns a nil *Conn and the
// error; in that case the connection previously held by c (if any) is left
// untouched.
func (c *Conn) Dial(target string, opts ...grpc.DialOption) (*Conn, error) {
	newOpts := []grpc.DialOption{
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(int(c.cfg.MaxMessageSize)),
			grpc.MaxCallSendMsgSize(int(c.cfg.MaxMessageSize)),
		),
	}

	if c.cfg.GRPCKeepAliveTime != 0 || c.cfg.GRPCKeepAliveTimeout != 0 {
		kp := keepalive.ClientParameters{
			// After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive.
			Time: c.cfg.GRPCKeepAliveTime,
			// After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that
			// the connection is closed. (This will eagerly fail inflight grpc requests even if they don't have timeouts.)
			Timeout:             c.cfg.GRPCKeepAliveTimeout,
			PermitWithoutStream: true,
		}
		newOpts = append(newOpts, grpc.WithKeepaliveParams(kp))
	}

	if c.cfg.GRPCInitialConnWindowSize != 0 {
		newOpts = append(newOpts, grpc.WithInitialConnWindowSize(c.cfg.GRPCInitialConnWindowSize))
	}

	if c.cfg.GRPCInitialWindowSize != 0 {
		newOpts = append(newOpts, grpc.WithInitialWindowSize(c.cfg.GRPCInitialWindowSize))
	}

	// Let every registered hook extend or replace the option list.
	var err error
	for _, grpcDialOptionInitializer := range grpcDialOptions {
		newOpts, err = grpcDialOptionInitializer(newOpts, []byte(c.cfg.GRPCAuthStaticPassword))
		if err != nil {
			return nil, fmt.Errorf("there was an error initializing client grpc.DialOption: %v", err)
		}
	}

	// Transport security. A failure here must abort the dial: continuing
	// without the option would hide the real cause (e.g. an unreadable
	// certificate) behind a later, unrelated dial error.
	secOpt, err := secureDialOption(c.cfg)
	if err != nil {
		return nil, fmt.Errorf("there was an error initializing client secure grpc.DialOption: %v", err)
	}
	newOpts = append(newOpts, secOpt)

	// Caller-supplied options go last, after all options built above.
	newOpts = append(newOpts, opts...)

	// Only publish the connection on success so a failed dial does not
	// overwrite a connection c may already hold.
	cc, err := grpc.Dial(target, newOpts...)
	if err != nil {
		return nil, err
	}
	c.ClientConn = cc

	return c, nil
}
// secureDialOption picks the transport security dial option for cfg.
//
// When neither a complete certificate/key pair nor a CA is configured it
// returns grpc.WithInsecure(). Otherwise it builds a TLS configuration via
// rpc.ClientConfig from the cert, key, CA and server name in cfg and returns
// it wrapped as transport credentials; an error from rpc.ClientConfig is
// returned unchanged together with a nil option.
func secureDialOption(cfg rpc.Config) (grpc.DialOption, error) {
	hasKeyPair := cfg.GRPCCert != "" && cfg.GRPCKey != ""
	hasCA := cfg.GRPCCA != ""

	// Nothing security related configured: plain-text connection.
	if !hasKeyPair && !hasCA {
		return grpc.WithInsecure(), nil
	}

	// Build the TLS client configuration from the configured material.
	tlsCfg, err := rpc.ClientConfig(cfg.GRPCCert, cfg.GRPCKey, cfg.GRPCCA, cfg.GRPCServerName)
	if err != nil {
		return nil, err
	}

	// Wrap the TLS configuration as gRPC transport credentials.
	return grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg)), nil
}