/
dispatcher.go
129 lines (114 loc) · 3.7 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package grpc
import (
"bytes"
"context"
"errors"
"strings"
"time"
"github.com/go-coldbrew/grpcpool"
"github.com/gojek/fiber"
fiberError "github.com/gojek/fiber/errors"
"github.com/gojek/fiber/protocol"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
func init() {
encoding.RegisterCodec(NewFiberCodec())
}
const (
TimeoutDefault = time.Second
// Number of gRPC connection in a pool
ConnPoolCount = 10
)
type Dispatcher struct {
timeout time.Duration
// serviceMethod is the service and method of server point in the format "{grpc_service_name}/{method_name}"
serviceMethod string
// endpoint is the host+port of the grpc server, eg "127.0.0.1:50050"
endpoint string
// conn is the grpc connection dialed upon creation of dispatcher
conn grpc.ClientConnInterface
}
type DispatcherConfig struct {
ServiceMethod string
Endpoint string
Timeout time.Duration
}
func (d *Dispatcher) Do(request fiber.Request) fiber.Response {
grpcRequest, ok := request.(*Request)
if !ok {
return fiber.NewErrorResponse(
fiberError.FiberError{
Code: int(codes.InvalidArgument),
Message: "fiber: grpc dispatcher: only grpc.Request type of requests are supported",
})
}
ctx, cancel := context.WithTimeout(context.Background(), d.timeout)
defer cancel()
ctx = metadata.NewOutgoingContext(ctx, grpcRequest.Metadata)
response := new(bytes.Buffer)
var responseHeader metadata.MD
// Dispatcher will send both request and payload as bytes, with the use of codec
// to prevent marshaling. The codec content type will be sent with request and
// the server will attempt to unmarshal with the codec.
err := d.conn.Invoke(
ctx,
d.serviceMethod,
grpcRequest.Payload(),
response,
grpc.Header(&responseHeader),
grpc.CallContentSubtype(codecName),
grpc.WaitForReady(true),
)
if err != nil {
// if ok is false, unknown codes.Unknown and Status msg is returned in Status
responseStatus, _ := status.FromError(err)
return fiber.NewErrorResponse(
fiberError.FiberError{
Code: int(responseStatus.Code()),
Message: responseStatus.String(),
})
}
return &Response{
Metadata: responseHeader,
Message: response.Bytes(),
Status: *status.New(codes.OK, "Success"),
}
}
// NewDispatcher is the constructor to create a dispatcher. It will create the clientconn and set defaults.
// Endpoint, serviceMethod and response proto are required minimally to work.
func NewDispatcher(config DispatcherConfig) (*Dispatcher, error) {
configuredTimeout := TimeoutDefault
if config.Timeout != 0 {
configuredTimeout = config.Timeout
}
if config.Endpoint == "" || config.ServiceMethod == "" {
return nil, fiberError.ErrInvalidInput(
protocol.GRPC,
errors.New("grpc dispatcher: missing config (endpoint/serviceMethod)"))
}
var serviceMethodStringBuilder strings.Builder
if !strings.HasPrefix(config.ServiceMethod, "/") {
serviceMethodStringBuilder.WriteString("/")
}
serviceMethodStringBuilder.WriteString(config.ServiceMethod)
conn, err := grpcpool.DialContext(context.Background(), config.Endpoint, ConnPoolCount, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
// if ok is false, unknown codes.Unknown and Status msg is returned in Status
responseStatus, _ := status.FromError(err)
return nil, fiberError.ErrRequestFailed(
protocol.GRPC,
errors.New("grpc dispatcher: "+responseStatus.String()))
}
dispatcher := &Dispatcher{
timeout: configuredTimeout,
serviceMethod: serviceMethodStringBuilder.String(),
endpoint: config.Endpoint,
conn: conn,
}
return dispatcher, nil
}