-
Notifications
You must be signed in to change notification settings - Fork 8
/
manager.go
87 lines (75 loc) · 2.14 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main
import (
"crypto/tls"
"io"
"sync"
"github.com/baetyl/baetyl-go/log"
"github.com/valyala/fasthttp"
"google.golang.org/grpc"
)
// Manager hands out the network clients shared by the process: gRPC
// connections looked up by address and a fasthttp client. It embeds io.Closer
// so the owner can release the gRPC connections when done.
type Manager interface {
	io.Closer

	// GetGRPCConnection returns a gRPC connection for address. When
	// recreateIfExists is true a new connection is dialed even if one is
	// already held for that address.
	GetGRPCConnection(address string, recreateIfExists bool) (*grpc.ClientConn, error)

	// GetHttpClient returns the fasthttp client.
	GetHttpClient() *fasthttp.Client
}
type manager struct {
log *log.Logger
cfg *ClientConfig
lock *sync.Mutex
httpClient *fasthttp.Client
connectionPool map[string]*grpc.ClientConn
}
// NewManager builds a Manager from cfg. The returned manager starts with an
// empty gRPC connection pool and a fasthttp client configured from cfg.Http
// (connection limit, read timeout, idempotent retry attempts and maximum
// connection duration).
func NewManager(cfg ClientConfig) Manager {
	httpCfg := cfg.Http

	client := new(fasthttp.Client)
	client.MaxConnsPerHost = httpCfg.MaxConnsPerHost
	// TODO: support tls
	// NOTE(review): certificate verification is disabled for every HTTPS
	// request made through this client until real TLS settings are wired in.
	client.TLSConfig = &tls.Config{InsecureSkipVerify: true}
	client.ReadTimeout = httpCfg.ReadTimeout
	client.MaxIdemponentCallAttempts = httpCfg.MaxIdemponentCallAttempts
	client.MaxConnDuration = httpCfg.MaxConnDuration

	m := new(manager)
	m.log = log.With(log.Any("main", "manager"))
	// cfg is a by-value parameter, so the manager keeps a pointer to its own
	// copy rather than to the caller's struct.
	m.cfg = &cfg
	m.lock = new(sync.Mutex)
	m.httpClient = client
	m.connectionPool = make(map[string]*grpc.ClientConn)
	return m
}
// GetGRPCConnection returns the pooled gRPC connection for address, dialing
// and pooling a new one when none exists yet or when recreateIfExists is true.
//
// When a connection is recreated, the one it replaces is closed after the new
// connection has been dialed successfully, so it is not leaked. If dialing
// fails the pool is left untouched, the failure is logged and the error is
// returned with a nil connection.
//
// The connection is dialed without transport security (see the TODO below).
func (g *manager) GetGRPCConnection(address string, recreateIfExists bool) (*grpc.ClientConn, error) {
	// Take the lock before the lookup: reading the map while another
	// goroutine stores into it below is a data race on a Go map.
	g.lock.Lock()
	defer g.lock.Unlock()

	previous, pooled := g.connectionPool[address]
	if pooled && !recreateIfExists {
		return previous, nil
	}

	opts := []grpc.DialOption{
		// TODO: tls support
		grpc.WithInsecure(),
	}
	conn, err := grpc.Dial(address, opts...)
	if err != nil {
		g.log.Error("failed to create connection to server", log.Error(err), log.Any("address", address))
		return nil, err
	}
	g.connectionPool[address] = conn

	// Release the connection being replaced; without this every recreate
	// would leave the old ClientConn and its resources open. A close failure
	// is logged only, since the new connection is already usable.
	if pooled && previous != nil {
		if closeErr := previous.Close(); closeErr != nil {
			g.log.Warn("failed to close connection", log.Error(closeErr), log.Any("address", address))
		}
	}
	return conn, nil
}
// GetHttpClient returns the manager's fasthttp client. Every call returns the
// same client instance stored on the manager.
func (g *manager) GetHttpClient() *fasthttp.Client {
	client := g.httpClient
	return client
}
// Close closes every pooled gRPC connection and removes it from the pool.
//
// Closing is best-effort: a connection that fails to close is logged as a
// warning and the remaining connections are still closed. Close always
// returns nil.
func (g *manager) Close() error {
	// Hold the same lock GetGRPCConnection takes when it stores connections,
	// so the map is not iterated while it is being written.
	g.lock.Lock()
	defer g.lock.Unlock()

	for address, conn := range g.connectionPool {
		if err := conn.Close(); err != nil {
			g.log.Warn("failed to close connection", log.Error(err), log.Any("address", address))
		}
		// Drop the entry so a later lookup dials a fresh connection instead
		// of handing out one that has been closed. Deleting during range is
		// permitted for Go maps.
		delete(g.connectionPool, address)
	}
	return nil
}