forked from go-kit/kit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
grpc.go
239 lines (211 loc) · 8.9 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package addtransport
import (
"context"
"errors"
"time"
"google.golang.org/grpc"
stdopentracing "github.com/opentracing/opentracing-go"
stdzipkin "github.com/openzipkin/zipkin-go"
"github.com/sony/gobreaker"
oldcontext "golang.org/x/net/context"
"golang.org/x/time/rate"
"github.com/go-kit/kit/circuitbreaker"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
"github.com/go-kit/kit/tracing/zipkin"
grpctransport "github.com/go-kit/kit/transport/grpc"
"github.com/go-kit/kit/examples/addsvc/pb"
"github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
"github.com/go-kit/kit/examples/addsvc/pkg/addservice"
)
// grpcServer bundles one Go kit gRPC handler per RPC method. NewGRPCServer
// returns a pointer to it as the pb.AddServer implementation.
type grpcServer struct {
	// sum serves the Sum RPC and concat serves the Concat RPC.
	sum, concat grpctransport.Handler
}
// NewGRPCServer exposes the given endpoint set as a gRPC AddServer. Each
// method handler receives the shared error-logger and Zipkin server-trace
// options, plus an OpenTracing before-hook named after the method.
func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer {
	// A Zipkin gRPC server trace can be created per gRPC method with an
	// explicit operation name, or created once without a name and handed to
	// every Go kit gRPC server as a ServerOption. In the latter case the
	// operation name is the endpoint's gRPC method path, provided the Go kit
	// gRPC Interceptor is used alongside it.
	//
	// This example demonstrates the second approach: a single global Zipkin
	// tracing option combined with the Go kit gRPC Interceptor.
	zipkinTrace := zipkin.GRPCServerTrace(zipkinTracer)
	shared := []grpctransport.ServerOption{
		grpctransport.ServerErrorLogger(logger),
		zipkinTrace,
	}

	// optionsFor yields the shared options followed by the OpenTracing
	// before-hook for the named operation. shared comes from a slice literal
	// and so has no spare capacity; append therefore copies rather than
	// writing into shared's backing array.
	optionsFor := func(operation string) []grpctransport.ServerOption {
		before := grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, operation, logger))
		return append(shared, before)
	}

	sumHandler := grpctransport.NewServer(
		endpoints.SumEndpoint,
		decodeGRPCSumRequest,
		encodeGRPCSumResponse,
		optionsFor("Sum")...,
	)
	concatHandler := grpctransport.NewServer(
		endpoints.ConcatEndpoint,
		decodeGRPCConcatRequest,
		encodeGRPCConcatResponse,
		optionsFor("Concat")...,
	)

	return &grpcServer{
		sum:    sumHandler,
		concat: concatHandler,
	}
}
// Sum handles the Sum RPC by delegating to the sum handler. On success the
// handler's reply is asserted to *pb.SumReply (a reply of any other type
// panics); on failure the handler's error is returned with a nil reply.
func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) {
	_, reply, err := s.sum.ServeGRPC(ctx, req)
	if err == nil {
		return reply.(*pb.SumReply), nil
	}
	return nil, err
}
// Concat handles the Concat RPC by delegating to the concat handler. On
// success the handler's reply is asserted to *pb.ConcatReply (a reply of any
// other type panics); on failure the handler's error is returned with a nil
// reply.
func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) {
	_, reply, err := s.concat.ServeGRPC(ctx, req)
	if err == nil {
		return reply.(*pb.ConcatReply), nil
	}
	return nil, err
}
// NewGRPCClient builds an addservice.Service whose methods are carried out by
// the gRPC server at the other end of conn. The caller owns conn: it
// constructs it and is responsible for eventually closing the underlying
// transport. A fixed set of middlewares is baked in, following the client
// library pattern.
func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) addservice.Service {
	// A single rate-limiter middleware is shared by every method, so it caps
	// the total outgoing request rate from this client to the remote
	// instance. Circuit breakers, by contrast, are built per endpoint to show
	// how that is done; they could just as easily be merged into one breaker
	// for the whole remote instance.
	throttle := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))

	// A Zipkin gRPC client trace can be created per gRPC method with an
	// explicit operation name, or created once without a name and handed to
	// every Go kit client as a ClientOption. In the latter case the operation
	// name is the endpoint's gRPC method path.
	//
	// This example demonstrates a global tracing client, used as the one
	// client option common to all methods.
	clientOpts := []grpctransport.ClientOption{
		zipkin.GRPCClientTrace(zipkinTracer),
	}

	// Each endpoint starts life as a grpc/transport.Client (which provides an
	// endpoint.Endpoint) and is then wrapped with middlewares. If you wrote
	// your own client library, this is where that work would live, so your
	// server could rely on a consistent set of client behavior.
	sumClient := grpctransport.NewClient(
		conn,
		"pb.Add",
		"Sum",
		encodeGRPCSumRequest,
		decodeGRPCSumResponse,
		pb.SumReply{},
		append(clientOpts, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))...,
	)
	var sumEndpoint endpoint.Endpoint = sumClient.Endpoint()
	sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint)
	sumEndpoint = throttle(sumEndpoint)
	sumBreaker := gobreaker.NewCircuitBreaker(gobreaker.Settings{
		Name:    "Sum",
		Timeout: 30 * time.Second,
	})
	sumEndpoint = circuitbreaker.Gobreaker(sumBreaker)(sumEndpoint)

	// The Concat endpoint is assembled the same way, with slightly different
	// breaker settings to demonstrate per-endpoint specialization.
	concatClient := grpctransport.NewClient(
		conn,
		"pb.Add",
		"Concat",
		encodeGRPCConcatRequest,
		decodeGRPCConcatResponse,
		pb.ConcatReply{},
		append(clientOpts, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))...,
	)
	var concatEndpoint endpoint.Endpoint = concatClient.Endpoint()
	concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint)
	concatEndpoint = throttle(concatEndpoint)
	concatBreaker := gobreaker.NewCircuitBreaker(gobreaker.Settings{
		Name:    "Concat",
		Timeout: 10 * time.Second,
	})
	concatEndpoint = circuitbreaker.Gobreaker(concatBreaker)(concatEndpoint)

	// Handing back an addendpoint.Set as the service relies on the Set
	// implementing the Service methods; that is just a small piece of glue
	// code.
	return addendpoint.Set{
		SumEndpoint:    sumEndpoint,
		ConcatEndpoint: concatEndpoint,
	}
}
// decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that turns a gRPC
// sum request into a user-domain addendpoint.SumRequest, converting both
// operands to int. Primarily useful in a server. It panics if grpcReq is not a
// *pb.SumRequest and otherwise always returns a nil error.
func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
	wire := grpcReq.(*pb.SumRequest)
	domain := addendpoint.SumRequest{
		A: int(wire.A),
		B: int(wire.B),
	}
	return domain, nil
}
// decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that turns a
// gRPC concat request into a user-domain addendpoint.ConcatRequest, copying
// both operands unchanged. Primarily useful in a server. It panics if grpcReq
// is not a *pb.ConcatRequest and otherwise always returns a nil error.
func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
	wire := grpcReq.(*pb.ConcatRequest)
	domain := addendpoint.ConcatRequest{
		A: wire.A,
		B: wire.B,
	}
	return domain, nil
}
// decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that turns a
// gRPC sum reply into a user-domain addendpoint.SumResponse. Primarily useful
// in a client. The reply's Err string is mapped back to an error via str2err.
// It panics if grpcReply is not a *pb.SumReply and otherwise always returns a
// nil error.
func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
	wire := grpcReply.(*pb.SumReply)
	domain := addendpoint.SumResponse{
		V:   int(wire.V),
		Err: str2err(wire.Err),
	}
	return domain, nil
}
// decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that turns a
// gRPC concat reply into a user-domain addendpoint.ConcatResponse. Primarily
// useful in a client. The reply's Err string is mapped back to an error via
// str2err. It panics if grpcReply is not a *pb.ConcatReply and otherwise
// always returns a nil error.
func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
	wire := grpcReply.(*pb.ConcatReply)
	domain := addendpoint.ConcatResponse{
		V:   wire.V,
		Err: str2err(wire.Err),
	}
	return domain, nil
}
// encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that turns a
// user-domain addendpoint.SumResponse into a gRPC *pb.SumReply. Primarily
// useful in a server. The response's error is flattened to a string via
// err2str. It panics if response is not an addendpoint.SumResponse value and
// otherwise always returns a nil error.
func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
	domain := response.(addendpoint.SumResponse)
	wire := &pb.SumReply{
		V:   int64(domain.V),
		Err: err2str(domain.Err),
	}
	return wire, nil
}
// encodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that turns a
// user-domain addendpoint.ConcatResponse into a gRPC *pb.ConcatReply.
// Primarily useful in a server. The response's error is flattened to a string
// via err2str. It panics if response is not an addendpoint.ConcatResponse
// value and otherwise always returns a nil error.
func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
	domain := response.(addendpoint.ConcatResponse)
	wire := &pb.ConcatReply{
		V:   domain.V,
		Err: err2str(domain.Err),
	}
	return wire, nil
}
// encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that turns a
// user-domain addendpoint.SumRequest into a gRPC *pb.SumRequest, widening both
// operands to int64. Primarily useful in a client. It panics if request is not
// an addendpoint.SumRequest value and otherwise always returns a nil error.
func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
	domain := request.(addendpoint.SumRequest)
	wire := &pb.SumRequest{
		A: int64(domain.A),
		B: int64(domain.B),
	}
	return wire, nil
}
// encodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that turns a
// user-domain addendpoint.ConcatRequest into a gRPC *pb.ConcatRequest, copying
// both operands unchanged. Primarily useful in a client. It panics if request
// is not an addendpoint.ConcatRequest value and otherwise always returns a nil
// error.
func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
	domain := request.(addendpoint.ConcatRequest)
	wire := &pb.ConcatRequest{
		A: domain.A,
		B: domain.B,
	}
	return wire, nil
}
// str2err translates the string representation of an error, which is the type
// used in the IDLs to represent errors, back into a Go error. The empty string
// is special-cased to mean "no error" and yields nil; any other string becomes
// a new error carrying exactly that message.
func str2err(s string) error {
	if len(s) > 0 {
		return errors.New(s)
	}
	return nil
}
// err2str translates a Go error into the string form used in the IDLs to
// represent errors. A nil error becomes the empty string; any other error
// becomes the text returned by its Error method.
func err2str(err error) string {
	if err != nil {
		return err.Error()
	}
	return ""
}