From 7d91fcce4e57610b271e3d662f790e80704a16a0 Mon Sep 17 00:00:00 2001
From: Tony Holdstock-Brown
Date: Fri, 23 Feb 2024 09:44:15 -0800
Subject: [PATCH] pubsub: add batch timeouts

This PR adds a timeout option to batcher.  The timeout specifies how long
the batcher waits before sending batched messages, even if the number of
pending messages is under MinBatchSize.  This ensures delivery guarantees
within a given time period when there are low-throughput messages combined
with a higher minimum batch size.

To do:

- [x] Implement batch timeouts in options
- [x] Only allow one concurrent request to check for timeouts
- [x] Handle timeouts when grabbing next batch
- [ ] Tests
- [ ] Check to see if this will be accepted upstream
---
 pubsub/batcher/batcher.go | 57 ++++++++++++++++++++++++++++++++++-----
 1 file changed, 50 insertions(+), 7 deletions(-)

diff --git a/pubsub/batcher/batcher.go b/pubsub/batcher/batcher.go
index 917cef822a..de53b68f52 100644
--- a/pubsub/batcher/batcher.go
+++ b/pubsub/batcher/batcher.go
@@ -23,6 +23,8 @@ import (
 	"errors"
 	"reflect"
 	"sync"
+	"sync/atomic"
+	"time"
 )
 
 // Split determines how to split n (representing n items) into batches based on
@@ -72,6 +74,9 @@ type Batcher struct {
 	pending   []waiter // items waiting to be handled
 	nHandlers int      // number of currently running handler goroutines
 	shutdown  bool
+
+	batchSizeTimeout    time.Time // time at which len(pending) fell under opts.MinBatchSize, or the zero time
+	batchTimeoutRunning int32     // atomic flag recording whether a timeout watcher is running
 }
 
 // Message is larger than the maximum batch byte size
@@ -96,6 +101,9 @@ type Options struct {
 	MaxBatchSize int
 	// Maximum bytesize of a batch. 0 means no limit.
 	MaxBatchByteSize int
+	// BatchTimeout is the maximum time a batch can exist under MinBatchSize
+	// before being sent anyway.
+	BatchTimeout time.Duration
 }
 
 // newOptionsWithDefaults returns Options with defaults applied to opts.
@@ -201,12 +209,41 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {
 		// If we can start a handler, do so with the item just added and any others that are pending.
 		batch := b.nextBatch()
 		if batch != nil {
-			b.wg.Add(1)
-			go func() {
-				b.callHandler(batch)
-				b.wg.Done()
-			}()
-			b.nHandlers++
+			b.handleBatch(batch)
+		}
+
+		if batch == nil && len(b.pending) > 0 {
+			// If the batch size timeout is zero, this is one of the first items to
+			// be added to the batch under the minimum batch size.  Record when this
+			// happens so that .nextBatch() can grab the batch on timeout.
+			if b.batchSizeTimeout.IsZero() {
+				b.batchSizeTimeout = time.Now()
+			}
+
+			// Ensure that we send the batch after the given timeout.  Only one
+			// timeout watcher may run at a time.
+			if atomic.CompareAndSwapInt32(&b.batchTimeoutRunning, 0, 1) {
+				go func() {
+					<-time.After(b.opts.BatchTimeout)
+
+					// nextBatch and handleBatch both require b.mu to be held.
+					b.mu.Lock()
+					defer b.mu.Unlock()
+
+					// Allow the next under-sized batch to start its own
+					// watcher; without this reset the timeout flush could
+					// only ever fire once per Batcher.
+					atomic.StoreInt32(&b.batchTimeoutRunning, 0)
+					if b.shutdown {
+						return
+					}
+
+					// Shadow the captured batch variable: reassigning it from
+					// this goroutine would be a data race with AddNoWait.
+					if batch := b.nextBatch(); batch != nil {
+						b.handleBatch(batch)
+					}
+				}()
+			}
 		}
 	}
 	// If we can't start a handler, then one of the currently running handlers will
@@ -214,14 +251,41 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {
 	return c
 }
 
+// handleBatch starts a handler goroutine for the given batch.
+// b.mu must be held, since this updates the nHandlers bookkeeping.
+func (b *Batcher) handleBatch(batch []waiter) {
+	if len(batch) == 0 {
+		return
+	}
+
+	b.wg.Add(1)
+	go func() {
+		b.callHandler(batch)
+		b.wg.Done()
+	}()
+	b.nHandlers++
+}
+
 // nextBatch returns the batch to process, and updates b.pending.
 // It returns nil if there's no batch ready for processing.
 // b.mu must be held.
 func (b *Batcher) nextBatch() []waiter {
-	if len(b.pending) < b.opts.MinBatchSize {
+	// Under the minimum batch size, keep waiting unless a timeout is
+	// configured, its timer has started, and it has elapsed.  Without these
+	// guards (e.g. with the default BatchTimeout of 0), time.Since would
+	// always meet the timeout and MinBatchSize would never be honored.
+	if len(b.pending) < b.opts.MinBatchSize &&
+		(b.opts.BatchTimeout <= 0 ||
+			b.batchSizeTimeout.IsZero() ||
+			time.Since(b.batchSizeTimeout) < b.opts.BatchTimeout) {
 		return nil
 	}
 
+	if len(b.pending) < b.opts.MinBatchSize {
+		// The timeout elapsed; we're sending a short batch, so reset the timer.
+		b.batchSizeTimeout = time.Time{}
+	}
+
 	if b.opts.MaxBatchByteSize == 0 && (b.opts.MaxBatchSize == 0 || len(b.pending) <= b.opts.MaxBatchSize) {
 		// Send it all!
 		batch := b.pending