-
Notifications
You must be signed in to change notification settings - Fork 15
/
batch.go
487 lines (397 loc) · 12.2 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
package processor
import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/ethpandaops/xatu/pkg/observability"
"github.com/sirupsen/logrus"
)
// ItemExporter is an interface for exporting items.
type ItemExporter[T any] interface {
	// ExportItems exports a batch of items.
	//
	// This function is called synchronously, so there is no concurrency
	// safety requirement. However, due to the synchronous calling pattern,
	// it is critical that all timeouts and cancellations contained in the
	// passed context must be honored.
	//
	// Any retry logic must be contained in this function. The SDK that
	// calls this function will not implement any retry logic. All errors
	// returned by this function are considered unrecoverable and will be
	// reported to a configured error Handler.
	ExportItems(ctx context.Context, items []*T) error

	// Shutdown notifies the exporter of a pending halt to operations. The
	// exporter is expected to perform any cleanup or synchronization it
	// requires while honoring all timeouts and cancellations contained in
	// the passed context.
	Shutdown(ctx context.Context) error
}
// Default values for BatchItemProcessorOptions.
const (
	// DefaultMaxQueueSize is the default maximum number of items buffered for processing.
	DefaultMaxQueueSize = 51200
	// DefaultScheduleDelay is the default batch construction timeout, in milliseconds.
	DefaultScheduleDelay = 5000
	// DefaultExportTimeout is the default per-batch export timeout, in milliseconds.
	DefaultExportTimeout = 30000
	// DefaultMaxExportBatchSize is the default maximum number of items per exported batch.
	DefaultMaxExportBatchSize = 512
	// DefaultShippingMethod is the default shipping method (asynchronous).
	DefaultShippingMethod = ShippingMethodAsync
	// DefaultNumWorkers is the default number of export workers.
	DefaultNumWorkers = 5
)
// ShippingMethod is the method of shipping items for export.
type ShippingMethod string

// Supported shipping methods.
const (
	// ShippingMethodUnknown indicates an unset or unrecognized shipping method.
	ShippingMethodUnknown ShippingMethod = "unknown"
	// ShippingMethodAsync enqueues items and returns immediately.
	ShippingMethodAsync ShippingMethod = "async"
	// ShippingMethodSync enqueues items and waits for them to be exported.
	ShippingMethodSync ShippingMethod = "sync"
)
// BatchItemProcessorOption is a functional option for the batch item processor.
type BatchItemProcessorOption func(o *BatchItemProcessorOptions)

// BatchItemProcessorOptions holds the configuration for a BatchItemProcessor.
// Use the With* option functions to override individual fields.
type BatchItemProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the
	// queue gets full it drops the items.
	// The default value of MaxQueueSize is 51200.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. Processor
	// forcefully sends available items when timeout is reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting items. If the timeout
	// is reached, the export will be cancelled.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration

	// MaxExportBatchSize is the maximum number of items to include in a batch.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int

	// ShippingMethod is the method of shipping items for export. The default value
	// of ShippingMethod is "async".
	ShippingMethod ShippingMethod

	// Workers is the number of workers to process batches.
	// The default value of Workers is 5.
	Workers int
}
// Validate checks that the options are internally consistent.
// It returns an error describing the first violation found, or nil.
func (o *BatchItemProcessorOptions) Validate() error {
	if o.MaxExportBatchSize > o.MaxQueueSize {
		return errors.New("max export batch size cannot be greater than max queue size")
	}

	// Reject negative counts as well as zero: either would leave the
	// processor with no goroutines pulling batches off the channel.
	if o.Workers <= 0 {
		return errors.New("workers must be greater than 0")
	}

	if o.MaxExportBatchSize < 1 {
		return errors.New("max export batch size must be greater than 0")
	}

	return nil
}
// BatchItemProcessor is a processor that batches items for export.
type BatchItemProcessor[T any] struct {
	// e is the exporter that completed batches are handed to.
	e ItemExporter[T]
	// o holds the processor configuration.
	o BatchItemProcessorOptions

	log logrus.FieldLogger

	// queue buffers individual items until the batch builder picks them up.
	queue chan traceableItem[T]
	// batchCh carries assembled batches from the builder to the workers.
	batchCh chan []traceableItem[T]
	// dropped counts items discarded because the queue was full (updated atomically).
	dropped uint32
	// name identifies this processor in logs and metrics.
	name string

	metrics *Metrics

	// timer forces a flush of a partial batch when BatchTimeout elapses.
	timer *time.Timer
	// stopWait tracks the worker goroutines so Shutdown can wait for them.
	stopWait sync.WaitGroup
	// stopOnce ensures the shutdown sequence runs at most once.
	stopOnce sync.Once
	// stopCh signals enqueueOrDrop that the processor is shutting down.
	stopCh chan struct{}
	// stopWorkersCh signals the batch builder and workers to exit.
	stopWorkersCh chan struct{}
}
// traceableItem wraps an item with optional per-item result channels.
// The channels are only populated when shipping synchronously, so that
// Write can wait for each item's export outcome.
type traceableItem[T any] struct {
	// item is the payload to export.
	item *T
	// errCh receives the export result (possibly nil); buffered with size 1.
	errCh chan error
	// completedCh is signalled once the item's batch has been exported; buffered with size 1.
	completedCh chan struct{}
}
// NewBatchItemProcessor creates a new batch item processor.
//
// It applies the provided functional options on top of the defaults,
// validates the resulting configuration, and starts the batch builder
// plus o.Workers export workers in the background. Call Shutdown to
// stop them.
func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log logrus.FieldLogger, options ...BatchItemProcessorOption) (*BatchItemProcessor[T], error) {
	o := BatchItemProcessorOptions{
		BatchTimeout:       time.Duration(DefaultScheduleDelay) * time.Millisecond,
		ExportTimeout:      time.Duration(DefaultExportTimeout) * time.Millisecond,
		MaxQueueSize:       DefaultMaxQueueSize,
		MaxExportBatchSize: DefaultMaxExportBatchSize,
		ShippingMethod:     DefaultShippingMethod,
		Workers:            DefaultNumWorkers,
	}
	for _, opt := range options {
		opt(&o)
	}

	// Validate rejects inconsistent combinations, including a batch size
	// larger than the queue size. (A previous pre-options clamp of the
	// defaults here was dead code — the defaults already satisfy the
	// constraint, and only Validate sees the user-supplied values.)
	if err := o.Validate(); err != nil {
		return nil, fmt.Errorf("invalid batch item processor options: %w: %s", err, name)
	}

	metrics := DefaultMetrics

	bvp := BatchItemProcessor[T]{
		e:             exporter,
		o:             o,
		log:           log,
		name:          name,
		metrics:       metrics,
		timer:         time.NewTimer(o.BatchTimeout),
		queue:         make(chan traceableItem[T], o.MaxQueueSize),
		batchCh:       make(chan []traceableItem[T], o.Workers),
		stopCh:        make(chan struct{}),
		stopWorkersCh: make(chan struct{}),
	}

	bvp.log.WithFields(
		logrus.Fields{
			"workers":               bvp.o.Workers,
			"batch_timeout":         bvp.o.BatchTimeout,
			"export_timeout":        bvp.o.ExportTimeout,
			"max_export_batch_size": bvp.o.MaxExportBatchSize,
			"max_queue_size":        bvp.o.MaxQueueSize,
			"shipping_method":       bvp.o.ShippingMethod,
		},
	).Info("Batch item processor initialized")

	// Start the export workers; stopWait lets Shutdown wait for them.
	bvp.stopWait.Add(o.Workers)

	for i := 0; i < o.Workers; i++ {
		go func(num int) {
			defer bvp.stopWait.Done()
			bvp.worker(context.Background(), num)
		}(i)
	}

	// The batch builder pulls items off the queue and feeds the workers.
	go func() {
		bvp.batchBuilder(context.Background())
		bvp.log.Info("Batch builder exited")
	}()

	return &bvp, nil
}
// Write writes items to the queue. If the Processor is configured to use
// the sync shipping method, the items will be written to the queue and this
// function will return when all items have been processed. If the Processor is
// configured to use the async shipping method, the items will be written to
// the queue and this function will return immediately.
func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
	_, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write")
	defer span.End()

	bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue)))

	if bvp.e == nil {
		return errors.New("exporter is nil")
	}

	// Break our items up in to chunks that can be processed at
	// one time by our workers. This is to prevent wasting
	// resources sending items if we've failed an earlier
	// batch.
	batchSize := bvp.o.Workers * bvp.o.MaxExportBatchSize

	for start := 0; start < len(s); start += batchSize {
		end := start + batchSize
		if end > len(s) {
			end = len(s)
		}

		// Wrap each item; sync mode additionally attaches buffered result
		// channels so we can wait for each item's outcome below.
		prepared := []traceableItem[T]{}

		for _, i := range s[start:end] {
			item := traceableItem[T]{
				item: i,
			}

			if bvp.o.ShippingMethod == ShippingMethodSync {
				item.errCh = make(chan error, 1)
				item.completedCh = make(chan struct{}, 1)
			}

			prepared = append(prepared, item)
		}

		// Enqueue the whole chunk; a shutdown or full queue aborts the write.
		for _, i := range prepared {
			if err := bvp.enqueueOrDrop(ctx, i); err != nil {
				return err
			}
		}

		// Sync shipping: block until every item in this chunk reports an
		// error or completion, or the caller's context expires.
		if bvp.o.ShippingMethod == ShippingMethodSync {
			for _, item := range prepared {
				select {
				case err := <-item.errCh:
					if err != nil {
						return err
					}
				case <-item.completedCh:
					continue
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
	}

	return nil
}
// exportWithTimeout exports items with a timeout.
//
// The export result is fanned out to any synchronous writers waiting on the
// items, and the export error (nil on success) is returned so the calling
// worker can log failures.
func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []traceableItem[T]) error {
	if bvp.o.ExportTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout)
		defer cancel()
	}

	items := make([]*T, len(itemsBatch))
	for i, item := range itemsBatch {
		items[i] = item.item
	}

	err := bvp.e.ExportItems(ctx, items)
	if err != nil {
		bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch)))
	} else {
		bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch)))
	}

	for _, item := range itemsBatch {
		if item.errCh != nil {
			item.errCh <- err
			close(item.errCh)
		}

		// Only mark the item completed on success. Signalling (or closing)
		// completedCh on failure would make both channels ready in Write's
		// select, which could nondeterministically swallow the error.
		if item.completedCh != nil && err == nil {
			item.completedCh <- struct{}{}
			close(item.completedCh)
		}
	}

	// Previously this always returned nil, which meant the worker's
	// "failed to export items" log line could never fire.
	return err
}
// Shutdown shuts down the batch item processor.
//
// Shutdown is idempotent (guarded by stopOnce). It stops new writes,
// drains the queue, stops the builder and workers, and finally shuts
// down the exporter. If ctx expires first, Shutdown returns ctx.Err()
// while the background teardown continues on its own.
func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error {
	var err error

	bvp.stopOnce.Do(func() {
		wait := make(chan struct{})
		go func() {
			bvp.log.Info("Stopping processor")

			// Reject new writes first, then let the builder consume
			// everything already queued before the workers are stopped.
			close(bvp.stopCh)

			bvp.timer.Stop()

			bvp.drainQueue()

			close(bvp.stopWorkersCh)

			// Wait for all worker goroutines to exit before shutting
			// down the exporter they depend on.
			bvp.stopWait.Wait()

			if bvp.e != nil {
				if err = bvp.e.Shutdown(ctx); err != nil {
					bvp.log.WithError(err).Error("failed to shutdown processor")
				}
			}

			close(wait)
		}()

		select {
		case <-wait:
		case <-ctx.Done():
			err = ctx.Err()
		}
	})

	return err
}
// WithMaxQueueSize returns an option that sets the maximum number of items
// buffered for processing before new items are dropped.
func WithMaxQueueSize(size int) BatchItemProcessorOption {
	return func(o *BatchItemProcessorOptions) {
		o.MaxQueueSize = size
	}
}
// WithMaxExportBatchSize returns an option that sets the maximum number of
// items included in a single exported batch.
func WithMaxExportBatchSize(size int) BatchItemProcessorOption {
	return func(o *BatchItemProcessorOptions) {
		o.MaxExportBatchSize = size
	}
}
// WithBatchTimeout returns an option that sets the maximum duration for
// constructing a batch before a partial batch is forcefully flushed.
func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption {
	return func(o *BatchItemProcessorOptions) {
		o.BatchTimeout = delay
	}
}
// WithExportTimeout returns an option that sets the maximum duration allowed
// for exporting a batch before its context is cancelled.
func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption {
	return func(o *BatchItemProcessorOptions) {
		o.ExportTimeout = timeout
	}
}
// WithShippingMethod returns an option that sets the shipping method
// (sync or async) used by Write.
func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption {
	return func(o *BatchItemProcessorOptions) {
		o.ShippingMethod = method
	}
}
// WithWorkers returns an option that sets the number of goroutines that
// export batches concurrently.
func WithWorkers(workers int) BatchItemProcessorOption {
	return func(o *BatchItemProcessorOptions) {
		o.Workers = workers
	}
}
// batchBuilder pulls items off the queue and assembles them into batches for
// the worker pool. A batch is flushed either when it reaches
// MaxExportBatchSize or when the batch timer fires.
func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) {
	log := bvp.log.WithField("module", "batch_builder")

	var batch []traceableItem[T]

	for {
		select {
		case <-bvp.stopWorkersCh:
			// NOTE(review): any partially accumulated batch is discarded
			// here without being exported — confirm this is acceptable
			// during shutdown (drainQueue only empties the queue, not
			// this in-progress batch).
			log.Info("Stopping batch builder")

			return
		case item := <-bvp.queue:
			batch = append(batch, item)

			if len(batch) >= bvp.o.MaxExportBatchSize {
				bvp.sendBatch(batch, "max_export_batch_size")
				batch = []traceableItem[T]{}
			}
		case <-bvp.timer.C:
			if len(batch) > 0 {
				bvp.sendBatch(batch, "timer")
				batch = []traceableItem[T]{}
			} else {
				// Nothing to flush; re-arm the timer here. When a batch IS
				// flushed, the receiving worker re-arms it instead.
				bvp.timer.Reset(bvp.o.BatchTimeout)
			}
		}
	}
}
// sendBatch hands a copy of the accumulated batch to the worker pool.
// The copy detaches the batch from the builder's reusable slice; the send
// blocks until a worker frees capacity on the buffered batch channel.
func (bvp *BatchItemProcessor[T]) sendBatch(batch []traceableItem[T], reason string) {
	log := bvp.log.WithField("reason", reason)
	log.Tracef("Creating a batch of %d items", len(batch))

	detached := append(make([]traceableItem[T], 0, len(batch)), batch...)

	log.Tracef("Batch items copied")

	bvp.batchCh <- detached

	log.Tracef("Batch sent to batch channel")
}
// worker consumes batches from batchCh and exports them until told to stop.
func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) {
	bvp.log.Infof("Starting worker %d", number)

	for {
		select {
		case <-bvp.stopWorkersCh:
			bvp.log.Infof("Stopping worker %d", number)

			return
		case batch := <-bvp.batchCh:
			// Re-arm the builder's flush timer now that a batch has been
			// picked up.
			// NOTE(review): multiple workers Reset a single shared timer
			// concurrently; per the time.Timer docs Reset should only be
			// called on stopped/drained timers — confirm this race is
			// acceptable.
			bvp.timer.Reset(bvp.o.BatchTimeout)

			if err := bvp.exportWithTimeout(ctx, batch); err != nil {
				bvp.log.WithError(err).Error("failed to export items")
			}
		}
	}
}
// drainQueue blocks until the batch builder has pulled every queued item,
// then closes the queue. Called from Shutdown after stopCh is closed (so no
// new writes are accepted) but before the workers are told to stop.
func (bvp *BatchItemProcessor[T]) drainQueue() {
	bvp.log.Info("Draining queue: waiting for the batch builder to pull all the items from the queue")

	// Poll until the builder has emptied the queue.
	for len(bvp.queue) > 0 {
		time.Sleep(10 * time.Millisecond)
	}

	bvp.log.Info("Draining queue: waiting for workers to finish processing batches")

	// NOTE(review): this loop is effectively dead — the loop above only
	// exits once len(bvp.queue) == 0, and stopCh is already closed so new
	// items should not arrive. At most it catches a racy straggler from
	// enqueueOrDrop.
	for len(bvp.queue) > 0 {
		<-bvp.queue
	}

	bvp.log.Info("Draining queue: all batches finished")

	// Closing the queue makes any late send in enqueueOrDrop panic; that
	// panic is absorbed by recoverSendOnClosedChan.
	close(bvp.queue)
}
// recoverSendOnClosedChan absorbs exactly one kind of panic — the runtime's
// "send on closed channel" — and re-raises anything else. It must be used
// via defer in the function whose channel send may race with a close.
func recoverSendOnClosedChan() {
	r := recover()
	if r == nil {
		// No panic in flight; nothing to do.
		return
	}

	if rtErr, ok := r.(runtime.Error); ok && rtErr.Error() == "send on closed channel" {
		// Expected shutdown race: swallow it.
		return
	}

	// Anything else is a genuine bug — propagate it.
	panic(r)
}
// enqueueOrDrop attempts a non-blocking enqueue of item. It returns an error
// if the processor is shutting down or if the queue is full (in which case
// the item is counted as dropped). The ctx parameter is currently unused.
func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item traceableItem[T]) error {
	// This ensures the bvp.queue<- below does not panic as the
	// processor shuts down.
	// NOTE(review): if that recovery fires, this function returns a nil
	// error (the deferred recover cannot set an unnamed return value), so
	// the caller believes the item was enqueued — confirm this is intended.
	defer recoverSendOnClosedChan()

	// Fast-path rejection once shutdown has begun.
	select {
	case <-bvp.stopCh:
		return errors.New("processor is shutting down")
	default:
	}

	// Non-blocking send; a full queue falls through to the drop accounting.
	select {
	case bvp.queue <- item:
		return nil
	default:
		atomic.AddUint32(&bvp.dropped, 1)
		bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1))
	}

	return errors.New("queue is full")
}