package conductor

import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/gofrs/uuid/v5"
"github.com/element84/swoop-go/pkg/config"
"github.com/element84/swoop-go/pkg/db"
"github.com/element84/swoop-go/pkg/errors"
)

const (
// TODO: make this a top-level config param
pollInterval = 600 * time.Second
// TODO: handler parameter, related to rate limiting and max concurrency
batchSize = 100
)

// nothing is a zero-size type used for signaling channels.
type nothing struct{}
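
// HandleActionWrapper runs handleFn for a thread inside the standard event
// bookkeeping: it stages a queued (async) or successful (sync) event in a
// transaction, commits it only if handleFn succeeds, and otherwise records
// a backoff event (retryable errors) or a failed event (non-retryable).
// The thread lock is always released before returning.
//
// A hypothetical caller might look like the following, where submitRequest
// is illustrative only:
//
//	err := HandleActionWrapper(ctx, conn, thread, true, func() error {
//		return submitRequest(ctx, thread)
//	})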
func HandleActionWrapper(
ctx context.Context,
conn db.Conn,
thread *db.Thread,
isAsyncAction bool,
handleFn func() error,
) error {
var err error
// TODO: need a test to verify we don't leak locks
defer thread.Unlock(ctx, conn)
tx, err := conn.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
if isAsyncAction {
err = thread.InsertQueuedEvent(ctx, tx)
} else {
err = thread.InsertSuccessfulEvent(ctx, tx)
}
if err != nil {
return err
}
handleError := func(_err error, retryable bool) error {
err := tx.Rollback(ctx)
if err != nil {
return err
}
if retryable {
// TODO: we don't have a way to track attempts
// -> can't calculate backoff
// -> can't calculate retries exhausted
// Once we can, maybe put this logic in a higher-level function on the thread type?
// TODO: need to schedule handler poll once this backoff is due, else we'll miss it
return thread.InsertBackoffEvent(ctx, conn, 60, _err.Error())
}
return thread.InsertFailedEvent(ctx, conn, _err.Error())
	}

	err = handleFn()
if err != nil {
retryable := true
re, ok := err.(*errors.RequestError)
if ok {
retryable = re.Retryable
}
_err := handleError(err, retryable)
if _err != nil {
return fmt.Errorf(
"error running action: %s; while handling that error encountered another: %s",
err,
_err,
)
}
return err
}
return tx.Commit(ctx)
}
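
// HandlerClient executes the action for a single thread.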
type HandlerClient interface {
HandleAction(ctx context.Context, conn db.Conn, thread *db.Thread) error
}
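
// Handler consumes processable threads for a named handler and dispatches
// each to its HandlerClient.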
type Handler struct {
name string
isNotified chan nothing
conf *config.Handler
client HandlerClient
}

/*
SOME INCOHERENT AND LIKELY MISLEADING NOTES:
rate limit (configuredLimit) is actions per second
- failure halves limit to Min(limit, Every(maxBackoff))
- success doubles limit to Max(limit, configuredLimit)
- we don't actually want to update the rate on every completion
- periodic rate update method takes successCount - failureCount to derive new limit
also need a maxConcurrency specifying how many actions can be in progress at any given time
- defaults to Max(int(configuredLimit), 1)
- queryLimit is Min(maxConcurrency - currentActions, availableTokens)
- we can work out the rate limiting with https://pkg.go.dev/golang.org/x/time/rate
- we can work out the concurrency limit with https://pkg.go.dev/golang.org/x/sync/semaphore
- except we can't see the capacity, so we have a gap here...
- query is not thread-safe, but is a private method and is only called internally in one place
- Bad example code:
	queryLimit := 0
	for {
		// This is dumb; we only need to wake when capacity frees up,
		// not poll on a timer.
		queryLimit = Min(maxConLimiter.Tokens(), maxConcurrency - currentActions)
		if queryLimit >= 1 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	err := Limiter.WaitN(ctx, queryLimit)
MAYBE BETTER IDEA:
We have a "semaphore" and a token bucket. The semaphore needs to return us
the max number of "resources" available, same with the bucket and number of
tokens, but we block on either until that number is at least 1. Then we
take the min of those values and request that allocation from each.
See the following resources:
- https://cs.opensource.google/go/x/sync/+/refs/tags/v0.3.0:semaphore/semaphore.go
- https://cs.opensource.google/go/x/time/+/master:rate/rate.go
Can we combine the bucket and semaphore into a single object? Like the
limit is adjusted based on the number of reserved, i.e., limit =
max_limit - allocated? Like a token bucket that you have to release as
well as request? <-- YES, THIS (see the sketch below)
*/
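
// combinedLimiter is a minimal sketch of the combined bucket/semaphore idea
// from the notes above: a token bucket whose tokens must be released as well
// as acquired, so the refill rate bounds throughput while capacity bounds
// concurrency (available = capacity - held). Hypothetical and unused; all
// names are illustrative.
type combinedLimiter struct {
	mu       sync.Mutex
	capacity int           // max tokens outstanding (concurrency bound)
	held     int           // tokens currently checked out
	tokens   int           // tokens available to acquire
	refill   time.Duration // time to mint one token (rate bound); must be > 0
	last     time.Time     // last time tokens were minted
}

func newCombinedLimiter(capacity int, refill time.Duration) *combinedLimiter {
	return &combinedLimiter{capacity: capacity, tokens: capacity, refill: refill, last: time.Now()}
}

// Acquire blocks until a token is available or ctx is done.
func (l *combinedLimiter) Acquire(ctx context.Context) error {
	for {
		l.mu.Lock()
		// mint tokens for elapsed time, then cap at the free capacity
		if n := int(time.Since(l.last) / l.refill); n > 0 {
			l.tokens += n
			l.last = l.last.Add(time.Duration(n) * l.refill)
		}
		if avail := l.capacity - l.held; l.tokens > avail {
			l.tokens = avail
		}
		if l.tokens > 0 {
			l.tokens--
			l.held++
			l.mu.Unlock()
			return nil
		}
		l.mu.Unlock()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(l.refill):
		}
	}
}

// Release returns a held token, freeing capacity for another Acquire.
func (l *combinedLimiter) Release() {
	l.mu.Lock()
	l.held--
	l.mu.Unlock()
}

// GetName returns the handler's name.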
func (h *Handler) GetName() string {
return h.name
}
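
// Notify signals the handler that there may be work to process. For now it
// notifies immediately; see the batching TODO below.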
func (h *Handler) Notify() {
// TODO: batch requests until either count or timer reaches threshold before notifying
h.NotifyNow()
}
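
// NotifyNow queues a notification immediately, unless one is already pending.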
func (h *Handler) NotifyNow() {
select {
case h.isNotified <- nothing{}:
log.Printf("handler %s: received notification", h.name)
default:
// notification already pending, nothing to do
}
}
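
// batchNotifier is a minimal sketch of the batching TODO in Notify above.
// It is hypothetical and not wired in: it coalesces notifications and
// flushes once a count threshold is reached or a timer elapses, whichever
// comes first.
type batchNotifier struct {
	mu      sync.Mutex
	pending int
	timer   *time.Timer
	max     int           // flush once this many notifications accumulate
	wait    time.Duration // or once this much time has passed
	flush   func()        // e.g. a Handler's NotifyNow
}

func (b *batchNotifier) Notify() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending++
	if b.pending >= b.max {
		b.fire()
		return
	}
	if b.timer == nil {
		b.timer = time.AfterFunc(b.wait, func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			if b.pending > 0 {
				b.fire()
			}
		})
	}
}

// fire flushes pending notifications; callers must hold b.mu.
func (b *batchNotifier) fire() {
	b.pending = 0
	if b.timer != nil {
		b.timer.Stop()
		b.timer = nil
	}
	b.flush()
}

// query fetches up to limit processable threads for this handler. It is not
// safe for concurrent use, but is only ever called from Run.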
func (h *Handler) query(ctx context.Context, conn db.Conn, limit int) ([]*db.Thread, error) {
return db.GetProcessableThreads(ctx, conn, h.name, limit, []uuid.UUID{})
}

func (h *Handler) poller(ctx context.Context) {
// polling is implemented simply by periodic "notification"
for {
select {
case <-ctx.Done():
return
case <-time.After(pollInterval):
h.NotifyNow()
}
}
}
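
// Run waits for a notification, queries a batch of processable threads, and
// processes them concurrently, blocking until the whole batch completes.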
func (h *Handler) Run(ctx context.Context, conn db.Conn) error {
// escape hatch when context is done
select {
case <-ctx.Done():
return nil
default:
}
select {
case <-ctx.Done():
return nil
case <-h.isNotified:
}
threads, err := h.query(ctx, conn, batchSize)
if err != nil {
return err
}
log.Printf("handler %s: got %d threads", h.name, len(threads))
// TODO: test case for this condition
if len(threads) == batchSize {
// if we got as many records as we asked for then we suspect we have
// more records to process and we notify so we'll immediately query again
h.NotifyNow()
}
	// We could handle the threads in a goroutine to unblock and increase
	// throughput. But then we would have to track all actionUuids using this
	// connection and ensure they are filtered from queries. Queries would
	// then become more expensive, and at increasing volumes that would
	// actually begin to reduce throughput.
	//
	// Perhaps the best solution is ultimately to have a worker pool that can
	// do the heavy lifting for handlers. That could be a worker pool per
	// handler, to prevent other handlers from being blocked, or a global
	// pool to keep load more predictable. Either way, such a model would
	// offer a means to process a block of records to completion in a
	// blocking fashion, while allowing multiple blocks to execute in
	// parallel to help mitigate the impact of slow requests.
var wg sync.WaitGroup
for _, thread := range threads {
		thread := thread // capture the loop variable for the goroutine
wg.Add(1)
go func() {
defer wg.Done()
err := h.client.HandleAction(ctx, conn, thread)
if err != nil {
log.Printf("handler %s: failed to process thread %s: %s", h.name, thread.Uuid, err)
return
}
log.Printf("handler %s: successfully processed thread %s", h.name, thread.Uuid)
}()
}
wg.Wait()
return nil
}
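
// Start connects to the database, starts the background poller, and runs
// the handler loop until the context is cancelled or an error occurs.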
func (h *Handler) Start(ctx context.Context, dbConf *db.ConnectConfig) error {
// TODO: need some way to re-enter this, say when we get a db connection error or something
// wait.Until maybe?
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// force polling on start
h.NotifyNow()
conn, err := dbConf.Connect(ctx)
if err != nil {
return err
}
defer conn.Close(ctx)
go h.poller(ctx)
for {
select {
case <-ctx.Done():
return nil
default:
}
err = h.Run(ctx, conn)
if err != nil {
return err
}
}
}