-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscriber.go
182 lines (148 loc) · 3.93 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package sqstransport
import (
"context"
"sync"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/transport"
)
// Subscriber is a go-kit sqs transport.
// Before, DecodeRequest, Handler, and ResponseHandler run inside the same anonymous function.
// This anonymous function creates a context, which uses the BaseContext as the parent,
// and is canceled when the function execution is finished.
// This anonymous function is passed to the runner.
// AfterBatch is run after each batch of messages.
type Subscriber struct {
// Message processing callbacks. New panics unless decodeRequest, handler and at
// least one responseHandler are set; before is optional (New does not check it).
before []RequestFunc
decodeRequest DecodeRequestFunc
handler endpoint.Endpoint
responseHandler []ResponseFunc
// afterBatch, when non-nil, is called by Serve after the messages of one
// receive call have been handed to the runner.
afterBatch AfterBatchFunc
// inputFactory supplies the input and options for every ReceiveMessage call.
// New panics if it is nil.
inputFactory InputFactory
// baseContext returns the parent context for each message's processing.
// New defaults it to context.Background.
baseContext func() context.Context
// runner executes the per-message processing function.
// New defaults it to newDefaultRunner().
runner Runner
// errorHandler receives receive, decode and handler errors. It may be nil,
// in which case those errors are dropped.
errorHandler transport.ErrorHandler
// cancel cancels the context Serve loops on; it is assigned by Serve and
// invoked by Shutdown.
cancel context.CancelFunc
// initLock is held by init while it checks and sets started.
initLock sync.Mutex
// started records that Serve has been called; a later call gets ErrAlreadyStarted.
started bool
// onExit, when non-nil, is deferred by Serve and therefore runs when Serve returns.
onExit func()
}
// New builds a Subscriber by applying the given options in order.
//
// It panics when, after all options have run, any of the following is still
// missing: the handler, the input factory, the request decoder, or at least
// one response handler. Per the original note on this constructor, the options
// that supply mandatory settings are the ones whose names start with "Use...".
//
// Optional settings fall back to defaults: context.Background as the base
// context and newDefaultRunner() as the runner.
func New(options ...Option) *Subscriber {
	sub := new(Subscriber)
	for i := range options {
		options[i](sub)
	}

	// Mandatory settings are checked in a fixed order, so the first missing
	// one is the one that gets reported.
	switch {
	case sub.handler == nil:
		panic("Handler is required")
	case sub.inputFactory == nil:
		panic("InputFactory is required")
	case sub.decodeRequest == nil:
		panic("DecodeRequest is required")
	case len(sub.responseHandler) == 0:
		panic("ResponseHandler is required")
	}

	if sub.baseContext == nil {
		sub.baseContext = context.Background
	}
	if sub.runner == nil {
		sub.runner = newDefaultRunner()
	}
	return sub
}
// Serve starts receiving messages from the queue and calling the handler on each.
//
// It loops until ctx is canceled, either by the caller or through Shutdown,
// and then returns ctx.Err(). Every iteration asks inputFactory for the
// receive input, calls l.ReceiveMessage, hands each received message to
// runHandler with a context obtained from baseContext, and finally calls
// afterBatch if one is configured.
//
// Receive errors are passed to the error handler and the loop continues.
// An error observed after ctx has been canceled is treated as a consequence
// of the shutdown and is not reported.
//
// If init fails (a second call to Serve yields ErrAlreadyStarted) that error is
// returned immediately. onExit, when set, runs whenever Serve returns,
// including on that early return.
func (obj *Subscriber) Serve(ctx context.Context, l Client) error {
	if obj.onExit != nil {
		defer obj.onExit()
	}
	if err := obj.init(); err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Shutdown reads obj.cancel from another goroutine, so publish it under
	// the lock instead of with a bare write.
	obj.initLock.Lock()
	obj.cancel = cancel
	obj.initLock.Unlock()

	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		input, opts := obj.inputFactory()
		output, err := l.ReceiveMessage(ctx, input, opts...)
		if err != nil {
			// A receive interrupted by cancellation is not a failure worth
			// reporting; leave the loop right away.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			obj.notifyError(ctx, err)
			continue
		}

		// Guard against clients that return a nil output together with a nil error.
		if output != nil {
			for _, msg := range output.Messages {
				obj.runHandler(obj.baseContext(), msg)
			}
		}
		if obj.afterBatch != nil {
			obj.afterBatch(ctx)
		}
	}
}

// Shutdown cancels the context of a running Serve call, which makes Serve
// return ctx.Err() once it observes the cancellation.
//
// Calling Shutdown before Serve has installed its cancel function is a no-op
// rather than a nil function call.
func (obj *Subscriber) Shutdown() {
	obj.initLock.Lock()
	cancel := obj.cancel
	obj.initLock.Unlock()

	if cancel != nil {
		cancel()
	}
}
// runHandler hands the processing of a single message to the runner.
//
// The function given to the runner derives a cancelable context from ctx
// (canceled when the function returns), runs the before functions, decodes the
// message, calls the handler and, on success, runs the response handlers.
// A decode failure is reported as *DecoderError and a handler failure as
// *HandlerError through notifyError; either one ends processing of the message.
// How and when the function is executed is up to the runner.
func (obj *Subscriber) runHandler(ctx context.Context, msg types.Message) {
	process := func() {
		msgCtx, done := context.WithCancel(ctx)
		defer done()

		msgCtx = obj.runBefore(msgCtx, msg)

		request, decodeErr := obj.decodeRequest(msgCtx, msg)
		if decodeErr != nil {
			obj.notifyError(msgCtx, &DecoderError{Err: decodeErr, Msg: msg})
			return
		}

		response, handleErr := obj.handler(msgCtx, request)
		if handleErr != nil {
			obj.notifyError(msgCtx, &HandlerError{Err: handleErr, Request: request, Msg: msg})
			return
		}

		obj.runResponseHandler(msgCtx, msg, response)
	}

	obj.runner.Run(process)
}
// runBefore passes ctx through every before function in registration order
// and returns the context produced by the last one (or ctx itself when no
// before functions are registered).
//
// It panics if a before function returns a nil context.
func (obj *Subscriber) runBefore(ctx context.Context, msg types.Message) context.Context {
	current := ctx
	for i := range obj.before {
		if current = obj.before[i](current, msg); current == nil {
			panic("before function returned a nil context. it must return a non-nil context")
		}
	}
	return current
}
// runResponseHandler calls every registered response handler in order with
// the message and the handler's response, feeding the context returned by
// each handler into the next one.
//
// It panics if a response handler returns a nil context.
func (obj *Subscriber) runResponseHandler(ctx context.Context, msg types.Message, resp interface{}) {
	for _, fn := range obj.responseHandler {
		ctx = fn(ctx, msg, resp)
		if ctx == nil {
			// Name the real culprit: the message used to blame a "before function".
			panic("response handler function returned a nil context. it must return a non-nil context")
		}
	}
}
// notifyError forwards err to the configured error handler.
// Without an error handler the error is dropped.
func (obj *Subscriber) notifyError(ctx context.Context, err error) {
	if h := obj.errorHandler; h != nil {
		h.Handle(ctx, err)
	}
}
// init marks the subscriber as started while holding initLock.
// The first call returns nil; every later call returns ErrAlreadyStarted.
func (obj *Subscriber) init() error {
	obj.initLock.Lock()
	defer obj.initLock.Unlock()

	alreadyStarted := obj.started
	obj.started = true
	if alreadyStarted {
		return ErrAlreadyStarted
	}
	return nil
}