/
rpc_client_pool.go
152 lines (132 loc) · 3.81 KB
/
rpc_client_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package backend_utils
import (
"google.golang.org/grpc"
"errors"
"log"
"io"
)
// Identification strings for this package. initLogger joins them as
// PKG_NAME + ":" + VERSION to build the prefix of every log line.
const (
// PKG_NAME is the component name that appears in log prefixes.
PKG_NAME = "RpcClientPool"
// VERSION is the version string that appears in log prefixes.
VERSION = "1.1"
)
var (
// ERR_FATAL is the sentinel error returned by createPool when its arguments
// are unusable or when no connection at all could be established.
ERR_FATAL error = errors.New("Fatal error.")
)
// ConnEndpointInfo describes one RPC endpoint. It is one of the two endpoint
// descriptor types that newRPCConn accepts; connections built from it never
// use JWT (newRPCConn sets UseJwt to false for this type).
type ConnEndpointInfo struct {
// Tls is copied into GrpcClientConfig.UseTls when dialling.
Tls bool
// CertFile is not read by newRPCConn in this file.
// NOTE(review): presumably a TLS certificate path - confirm whether it
// should be forwarded to GrpcClientConfig.
CertFile string
// ServerHostOverride is copied into GrpcClientConfig.ServerHostOverride.
ServerHostOverride string
// ServerAddr is copied into GrpcClientConfig.ServerAddr and is the address
// reported in the "Established new RPC connection" log line.
ServerAddr string
}
// RpcClientPool hands out gRPC client connections from a fixed-capacity pool
// and replaces connections that fail a caller-supplied heartbeat check.
// Instances are built with NewRpcClientPool.
type RpcClientPool struct {
// doHeartBeat is invoked by Get on each connection taken from the pool; a
// non-nil error makes Get replace that connection with a freshly dialled one.
doHeartBeat func(*grpc.ClientConn) error
// conn_pool is the buffered channel holding idle connections. createPool
// sizes it to conn_per_ep * len(endpoints).
conn_pool chan *grpc.ClientConn
// conn_endpoints maps every tracked connection to the index (key in
// endpoints_map) of the endpoint it was dialled to.
conn_endpoints map[*grpc.ClientConn] int
// endpoints_map stores each endpoint descriptor under its index in the
// slice passed to createPool, so Get can re-dial the same endpoint.
endpoints_map map[int] interface{}
// elog is the error logger created by initLogger.
elog *log.Logger
// ilog is the info logger created by initLogger.
ilog *log.Logger
// pool_created is set to true by createPool once at least one connection
// exists. It is not read anywhere in the code visible in this file.
pool_created bool
}
// createPool dials conn_per_ep connections to every endpoint and seeds the
// pool with the ones that succeed.
//
// Each endpoint descriptor is remembered in endpoints_map under its index in
// endpoints so that a broken connection can later be re-dialled to the same
// endpoint. A failed dial is logged and skipped; it does not abort the pool.
//
// ERR_FATAL is returned when the arguments are unusable (no endpoints, or a
// non-positive conn_per_ep) or when not a single connection could be
// established. On success pool_created is set and nil is returned.
func (r *RpcClientPool) createPool(endpoints []interface{}, conn_per_ep int) error {
	// Reject negative counts too: a negative value would otherwise be passed
	// to make(chan ...) below as the capacity and panic at runtime.
	if len(endpoints) == 0 || conn_per_ep <= 0 {
		r.elog.Println("Failed creating conn pool.")
		return ERR_FATAL
	}

	capacity := conn_per_ep * len(endpoints)
	r.conn_endpoints = make(map[*grpc.ClientConn]int, capacity)
	r.conn_pool = make(chan *grpc.ClientConn, capacity)
	r.endpoints_map = make(map[int]interface{}, len(endpoints))

	for i, ep := range endpoints {
		r.endpoints_map[i] = ep
		for j := 0; j < conn_per_ep; j++ {
			new_conn, err := r.newRPCConn(ep)
			if err != nil {
				r.elog.Printf("Failed creating connection Ep: %+v. Err:%s\n", ep, err.Error())
				continue
			}
			r.conn_endpoints[new_conn] = i
			r.Put(new_conn)
			r.ilog.Printf("Successfully created new connection to Ep:%+v\n", ep)
		}
	}

	// A pool without any live connection is useless to callers.
	if len(r.conn_endpoints) == 0 {
		r.elog.Println("Failed creating any connection.")
		return ERR_FATAL
	}
	r.pool_created = true
	return nil
}
// newRPCConn dials a single connection to the endpoint described by ep.
//
// ep must be a ConnEndpointInfo or a GrpcClientConfig value. Its fields are
// copied into a fresh GrpcClientConfig, whose NewRPCConn method (defined
// outside this file) performs the actual dial. A ConnEndpointInfo never
// enables JWT.
//
// An error is returned when ep has any other dynamic type or when the dial
// fails; in both cases the returned connection is nil.
func (r *RpcClientPool) newRPCConn(ep interface{}) (*grpc.ClientConn, error) {
	var cli *GrpcClientConfig
	switch v := ep.(type) {
	case ConnEndpointInfo:
		cli = &GrpcClientConfig{
			UseTls:             v.Tls,
			ServerHostOverride: v.ServerHostOverride,
			ServerAddr:         v.ServerAddr,
			UseJwt:             false,
		}
	case GrpcClientConfig:
		cli = &GrpcClientConfig{
			UseTls:             v.UseTls,
			ServerHostOverride: v.ServerHostOverride,
			ServerAddr:         v.ServerAddr,
			UseJwt:             v.UseJwt,
			JwtToken:           v.JwtToken,
		}
	default:
		// Without this branch cli would stay nil and the calls below would
		// dereference a nil *GrpcClientConfig.
		r.elog.Printf("Unsupported endpoint type %T.\n", ep)
		return nil, errors.New("unsupported endpoint type")
	}

	conn, err := cli.NewRPCConn()
	if err != nil {
		r.elog.Printf("Failed to dial. ERR:%s\n", err.Error())
		return nil, err
	}
	r.ilog.Printf("Established new RPC connection to %s.\n", cli.ServerAddr)
	return conn, nil
}
// NewRpcClientPool builds a connection pool over the given endpoints.
//
// do_heartbeat is stored and later used by Get to probe connections,
// endpoints and conn_per_ep are forwarded to createPool, and logr_op is the
// destination of both the error and the info logger.
//
// It returns nil when createPool reports an error; the failure is written to
// the error logger first.
func NewRpcClientPool(do_heartbeat func(*grpc.ClientConn) error, endpoints []interface{},
	conn_per_ep int, logr_op io.Writer) *RpcClientPool {
	pool := &RpcClientPool{doHeartBeat: do_heartbeat}
	// Loggers must exist before createPool runs, because it logs as it goes.
	pool.initLogger(logr_op)

	err := pool.createPool(endpoints, conn_per_ep)
	if err == nil {
		return pool
	}
	pool.elog.Printf("Failed to create RPC pool. ERR:%s\n", err.Error())
	return nil
}
// initLogger creates the error and info loggers, both writing to logger_op.
// Every line is prefixed with "<PKG_NAME>:<VERSION>", a tab-separated level
// tag (ERROR or INFO), and carries date, time and short file name.
func (r *RpcClientPool) initLogger(logger_op io.Writer) {
	const flags = log.Ldate | log.Ltime | log.Lshortfile
	tag := PKG_NAME + ":" + VERSION

	r.elog = log.New(logger_op, tag+"\tERROR\t", flags)
	r.ilog = log.New(logger_op, tag+"\tINFO\t", flags)
}
// Get takes a connection out of the pool without blocking.
//
// It returns nil when no connection is tracked any more or when the pool is
// currently empty. A connection taken from the pool is probed with the
// heartbeat callback first; if the probe fails, the connection is forgotten
// and closed, and a replacement is dialled to the same endpoint. Should that
// dial fail as well, the next pooled connection is tried.
//
// NOTE(review): conn_endpoints is read and written here without any locking,
// so concurrent calls to Get race on that map.
func (r *RpcClientPool) Get() *grpc.ClientConn {
	for {
		if len(r.conn_endpoints) == 0 {
			r.elog.Println("No more connections in map.")
			return nil
		}

		var conn *grpc.ClientConn
		select {
		case conn = <-r.conn_pool:
		default:
			// Pool is empty right now; do not block the caller.
			return nil
		}

		if err := r.doHeartBeat(conn); err == nil {
			return conn
		}

		// The connection is dead: stop tracking it and release its resources.
		// Dropping the reference alone would leak the underlying ClientConn.
		ep := r.conn_endpoints[conn]
		delete(r.conn_endpoints, conn)
		if conn != nil {
			if cerr := conn.Close(); cerr != nil {
				r.elog.Printf("Failed to close stale connection. ERR:%s\n", cerr.Error())
			}
		}

		fresh, err := r.newRPCConn(r.endpoints_map[ep])
		if err != nil {
			r.elog.Printf("Failed to re-establish connection. Ep:%+v ERR:%s\n", ep, err.Error())
			// Try to get another connection.
			continue
		}
		r.conn_endpoints[fresh] = ep
		return fresh
	}
}
// Put returns conn to the pool without blocking. If the pool is already full
// the connection is not enqueued.
//
// A nil conn is ignored. Get returns nil when the pool is empty, so a caller
// that unconditionally hands its result back would otherwise enqueue a nil
// connection that a later Get passes on to the heartbeat callback.
func (r *RpcClientPool) Put(conn *grpc.ClientConn) {
	if conn == nil {
		return
	}
	select {
	case r.conn_pool <- conn:
	default:
		// Pool is full; drop the send rather than block.
	}
}