-
Notifications
You must be signed in to change notification settings - Fork 0
/
grpc.go
230 lines (191 loc) · 5.82 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package grpc
import (
"context"
"errors"
"sync"
"github.com/easeq/go-service/client"
"github.com/easeq/go-service/component"
"github.com/easeq/go-service/logger"
"github.com/easeq/go-service/pool"
"github.com/easeq/go-service/registry"
"google.golang.org/grpc"
)
const (
// Default registry connection scheme
defaultScheme = "http"
)
var (
// ErrInvalidAddress returned when the address provided is invalid
ErrInvalidAddress = errors.New("invalid service address")
// ErrInvalidRegistry returned when the registry provided does not implement regsitry.ServiceRegsitry
ErrInvalidRegistry = errors.New("registry provided is invalid")
// ErrTooFewArgs returned when the args provided is less the args required
ErrTooFewArgs = errors.New("too few arguments. Required 3")
// ErrTooFewFactoryArgs returned when the args provided is less the args required
ErrTooFewFactoryArgs = errors.New("too few arguments for the factory. Required 1 address")
// ErrInvalidDialOptions returned when the dial options provided are not valid
ErrInvalidDialOptions = errors.New("dial options provided are invalid")
// ErrInvalidGrpcClient returned when type assertion to GrpcClient fails
ErrInvalidGrpcClient = errors.New("invalid GrpcClient")
// ErrInvalidStreamDescription returned when the variable passed is not of grpc.StreamDesc type
ErrInvalidStreamDescription = errors.New("invalid stream description")
// ErrInvalidFactoryConn returned when factory conn is invalid
ErrInvalidFactoryConn = errors.New("invalid factory client connection")
)
// ClientOption to pass as arg while creating new service
type ClientOption func(*Grpc)
// Grpc client that holds the reference to the pool,
// along with other configuration required to create the pool
// It holds a reference to the service registry used by the service.
type Grpc struct {
i component.Initializer
logger logger.Logger
pool *pool.ConnectionPool
factory pool.Factory
closeFunc pool.CloseFunc
Registry registry.ServiceRegistry
sync.RWMutex
}
// NewGrpc creates a new gRPC client
func NewGrpc(opts ...ClientOption) *Grpc {
c := new(Grpc)
for _, opt := range opts {
opt(c)
}
c.i = NewInitializer(c)
c.pool = pool.NewPool(
pool.WithFactory(c.factory),
pool.WithSize(10),
pool.WithCloseFunc(c.closeFunc),
pool.WithLogger(c.logger),
)
return c
}
// WithRegistry passes services registry externally
func WithRegistry(registry registry.ServiceRegistry) ClientOption {
return func(c *Grpc) {
c.Registry = registry
}
}
// WithFactory defines the client connection creation factory
func WithFactory(factory pool.Factory) ClientOption {
return func(c *Grpc) {
c.factory = factory
}
}
// WithCloseFunc passes the callback function to close the gRPC connection in the pool
func WithCloseFunc(closeFunc pool.CloseFunc) ClientOption {
return func(c *Grpc) {
c.closeFunc = closeFunc
}
}
// Dial creates/gets a connection from the pool using the address from the service registry
func (c *Grpc) Dial(name string, opts ...client.DialOption) (pool.Connection, error) {
address := c.Registry.ConnectionString(name, defaultScheme)
c.logger.Debugf("dial: %s", address)
return c.pool.Get(address)
}
// Get client conn
func (c *Grpc) GetConnFromPool(serviceName string) (pool.Connection, *grpc.ClientConn, error) {
pcc, err := c.Dial(serviceName)
if err != nil {
return nil, nil, err
}
cc, ok := pcc.Conn().(*grpc.ClientConn)
if !ok {
return nil, nil, ErrInvalidFactoryConn
}
return pcc, cc, nil
}
// Call gRPC method
func (c *Grpc) Call(
ctx context.Context,
sc client.ServiceClient,
method string,
req interface{},
res interface{},
opts ...client.CallOption,
) error {
pcc, cc, err := c.GetConnFromPool(sc.GetServiceName())
if err != nil {
c.logger.Errorf("conn error: %s", err)
return err
}
defer pcc.Close()
callOpts := make([]grpc.CallOption, len(opts))
for i, opt := range opts {
callOpts[i] = opt.(grpc.CallOption)
}
return cc.Invoke(ctx, method, req, res, callOpts...)
}
// Stream gRPC method
func (c *Grpc) Stream(
ctx context.Context,
sc client.ServiceClient,
desc interface{},
method string,
req interface{},
opts ...client.CallOption,
) (client.StreamClient, error) {
pcc, cc, err := c.GetConnFromPool(sc.GetServiceName())
if err != nil {
c.logger.Errorf("conn error: %s", err)
return nil, err
}
callOpts := make([]grpc.CallOption, len(opts))
for i, opt := range opts {
callOpts[i] = opt.(grpc.CallOption)
}
serviceDesc, ok := desc.(*grpc.StreamDesc)
if !ok {
return nil, ErrInvalidStreamDescription
}
stream, err := cc.NewStream(ctx, serviceDesc, method, callOpts...)
if err != nil {
return nil, err
}
gs := &GrpcStreamClient{stream, pcc}
if req == nil {
return gs, nil
}
if err := gs.stream.SendMsg(req); err != nil {
return nil, err
}
return gs, nil
}
func (c *Grpc) HasInitializer() bool {
return true
}
func (g *Grpc) Initializer() component.Initializer {
return g.i
}
// GrpcStreamClient is the gRPC client that allows streaming. It holds the stream and the connection to the gRPC server.
type GrpcStreamClient struct {
stream grpc.ClientStream
conn pool.Connection
}
// Recv receive a message from the stream
func (sc *GrpcStreamClient) Recv(res interface{}) error {
if err := sc.stream.RecvMsg(res); err != nil {
return err
}
return nil
}
// Send sends a message
func (sc *GrpcStreamClient) Send(req interface{}) error {
return sc.stream.SendMsg(req)
}
// CloseAndRecv first close the server stream and receives messages on the client stream
func (sc *GrpcStreamClient) CloseAndRecv(res interface{}) error {
if err := sc.stream.CloseSend(); err != nil {
return err
}
if err := sc.stream.RecvMsg(res); err != nil {
return err
}
return nil
}
// CloseConn closes the connection in the pool if the pool is full or adds it back to the pool
func (sc *GrpcStreamClient) CloseConn() error {
return sc.conn.Close()
}