forked from go-kit/kit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscriber.go
197 lines (166 loc) · 5.88 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package nats
import (
"context"
"encoding/json"
"github.com/AshleyDumaine/kit/endpoint"
"github.com/AshleyDumaine/kit/log"
"github.com/AshleyDumaine/kit/transport"
"github.com/nats-io/nats.go"
)
// Subscriber wraps an endpoint and provides nats.MsgHandler.
type Subscriber struct {
e endpoint.Endpoint
dec DecodeRequestFunc
enc EncodeResponseFunc
before []RequestFunc
after []SubscriberResponseFunc
errorEncoder ErrorEncoder
finalizer []SubscriberFinalizerFunc
errorHandler transport.ErrorHandler
}
// NewSubscriber constructs a new subscriber, which provides nats.MsgHandler and wraps
// the provided endpoint.
func NewSubscriber(
e endpoint.Endpoint,
dec DecodeRequestFunc,
enc EncodeResponseFunc,
options ...SubscriberOption,
) *Subscriber {
s := &Subscriber{
e: e,
dec: dec,
enc: enc,
errorEncoder: DefaultErrorEncoder,
errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
}
for _, option := range options {
option(s)
}
return s
}
// SubscriberOption sets an optional parameter for subscribers.
type SubscriberOption func(*Subscriber)
// SubscriberBefore functions are executed on the publisher request object before the
// request is decoded.
func SubscriberBefore(before ...RequestFunc) SubscriberOption {
return func(s *Subscriber) { s.before = append(s.before, before...) }
}
// SubscriberAfter functions are executed on the subscriber reply after the
// endpoint is invoked, but before anything is published to the reply.
func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption {
return func(s *Subscriber) { s.after = append(s.after, after...) }
}
// SubscriberErrorEncoder is used to encode errors to the subscriber reply
// whenever they're encountered in the processing of a request. Clients can
// use this to provide custom error formatting. By default,
// errors will be published with the DefaultErrorEncoder.
func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption {
return func(s *Subscriber) { s.errorEncoder = ee }
}
// SubscriberErrorLogger is used to log non-terminal errors. By default, no errors
// are logged. This is intended as a diagnostic measure. Finer-grained control
// of error handling, including logging in more detail, should be performed in a
// custom SubscriberErrorEncoder which has access to the context.
// Deprecated: Use SubscriberErrorHandler instead.
func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
return func(s *Subscriber) { s.errorHandler = transport.NewLogErrorHandler(logger) }
}
// SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors
// are ignored. This is intended as a diagnostic measure. Finer-grained control
// of error handling, including logging in more detail, should be performed in a
// custom SubscriberErrorEncoder which has access to the context.
func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption {
return func(s *Subscriber) { s.errorHandler = errorHandler }
}
// SubscriberFinalizer is executed at the end of every request from a publisher through NATS.
// By default, no finalizer is registered.
func SubscriberFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption {
return func(s *Subscriber) { s.finalizer = f }
}
// ServeMsg provides nats.MsgHandler.
func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) {
return func(msg *nats.Msg) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if len(s.finalizer) > 0 {
defer func() {
for _, f := range s.finalizer {
f(ctx, msg)
}
}()
}
for _, f := range s.before {
ctx = f(ctx, msg)
}
request, err := s.dec(ctx, msg)
if err != nil {
s.errorHandler.Handle(ctx, err)
if msg.Reply == "" {
return
}
s.errorEncoder(ctx, err, msg.Reply, nc)
return
}
response, err := s.e(ctx, request)
if err != nil {
s.errorHandler.Handle(ctx, err)
if msg.Reply == "" {
return
}
s.errorEncoder(ctx, err, msg.Reply, nc)
return
}
for _, f := range s.after {
ctx = f(ctx, nc)
}
if msg.Reply == "" {
return
}
if err := s.enc(ctx, msg.Reply, nc, response); err != nil {
s.errorHandler.Handle(ctx, err)
s.errorEncoder(ctx, err, msg.Reply, nc)
return
}
}
}
// ErrorEncoder is responsible for encoding an error to the subscriber reply.
// Users are encouraged to use custom ErrorEncoders to encode errors to
// their replies, and will likely want to pass and check for their own error
// types.
type ErrorEncoder func(ctx context.Context, err error, reply string, nc *nats.Conn)
// ServerFinalizerFunc can be used to perform work at the end of an request
// from a publisher, after the response has been written to the publisher. The principal
// intended use is for request logging.
type SubscriberFinalizerFunc func(ctx context.Context, msg *nats.Msg)
// NopRequestDecoder is a DecodeRequestFunc that can be used for requests that do not
// need to be decoded, and simply returns nil, nil.
func NopRequestDecoder(_ context.Context, _ *nats.Msg) (interface{}, error) {
return nil, nil
}
// EncodeJSONResponse is a EncodeResponseFunc that serializes the response as a
// JSON object to the subscriber reply. Many JSON-over services can use it as
// a sensible default.
func EncodeJSONResponse(_ context.Context, reply string, nc *nats.Conn, response interface{}) error {
b, err := json.Marshal(response)
if err != nil {
return err
}
return nc.Publish(reply, b)
}
// DefaultErrorEncoder writes the error to the subscriber reply.
func DefaultErrorEncoder(_ context.Context, err error, reply string, nc *nats.Conn) {
logger := log.NewNopLogger()
type Response struct {
Error string `json:"err"`
}
var response Response
response.Error = err.Error()
b, err := json.Marshal(response)
if err != nil {
logger.Log("err", err)
return
}
if err := nc.Publish(reply, b); err != nil {
logger.Log("err", err)
}
}