From 7a77e0b454838e4f8b2d15a701ef2dd316015fa3 Mon Sep 17 00:00:00 2001 From: Tony Holdstock-Brown Date: Fri, 23 Feb 2024 09:44:15 -0800 Subject: [PATCH] pubsub: add batch timeouts This PR adds a timeout option to batcher. The timeout specifies how long the batcher waits before sending batched messages, even if the message length is under MinBatchSize. This ensures delivery guarantees within a given time period if there are low-throughput messages with a higher min batch size. To do: - [x] Implement batch timeouts in options - [x] Only allow one concurrent request to check for timeouts - [x] Handle timeouts when grabbing next batch - [ ] Tests --- pubsub/batcher/batcher.go | 47 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/pubsub/batcher/batcher.go b/pubsub/batcher/batcher.go index ad1667e0b..8fd760a97 100644 --- a/pubsub/batcher/batcher.go +++ b/pubsub/batcher/batcher.go @@ -23,6 +23,8 @@ import ( "errors" "reflect" "sync" + "sync/atomic" + "time" ) // Split determines how to split n (representing n items) into batches based on @@ -72,6 +74,9 @@ type Batcher struct { pending []waiter // items waiting to be handled nHandlers int // number of currently running handler goroutines shutdown bool + + batchSizeTimeout time.Time // the time that len(pending) < opts.MinBatchSize, or zero time + batchTimeoutRunning int32 // atomic counter checking whether timeout wait is running } // Message is larger than the maximum batch byte size @@ -96,6 +101,9 @@ type Options struct { MaxBatchSize int // Maximum bytesize of a batch. 0 means no limit. MaxBatchByteSize int + // BatchTimeout the maximum time a batch can exist under MinBatchSize + // before being sent anyway. + BatchTimeout time.Duration } // newOptionsWithDefaults returns Options with defaults applied to opts. @@ -200,7 +208,30 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error { if b.nHandlers < b.opts.MaxHandlers { // If we can start a handler, do so with the item just added and any others that are pending. batch := b.nextBatch() - b.handleBatch(batch) + if batch != nil { + b.handleBatch(batch) + } + + if batch == nil && len(b.pending) > 0 { + // If the batch size timeout is zero, this is one of the first items to + // be added to the batch under the minimum batch size. Record when this + // happens so that .nextBatch() can grab the batch on timeout. + if b.batchSizeTimeout.IsZero() { + b.batchSizeTimeout = time.Now() + } + // Ensure that we send the batch after the given timeout. Only one + // concurrent process can run this goroutine, ensuring that we don't + // duplicate work. + if atomic.CompareAndSwapInt32(&b.batchTimeoutRunning, 0, 1) { + go func() { + <-time.After(b.opts.BatchTimeout) + batch = b.nextBatch() + if batch != nil { + b.handleBatch(batch) + } + }() + } + } } // If we can't start a handler, then one of the currently running handlers will // take our item. @@ -227,13 +258,25 @@ func (b *Batcher) nextBatch() []waiter { if len(b.pending) < b.opts.MinBatchSize { // We handle minimum batch sizes depending on specific // situations. - // XXX: If we allow max batch lifetimes, handle that here. + if time.Since(b.batchSizeTimeout) < b.opts.BatchTimeout { + // If we're within the max batch lifetime, respect minimum batch + // sizes and return nil. + return nil + } if b.shutdown == false { // If we're not shutting down, respect minimums. If we're // shutting down, though, we ignore minimums to flush the // entire batch. return nil } + // At this point, either we're shutting down or we've we've waited + // too long for the minimum size to be met. We're going to proceed + // with flushing the batch. + } + + if len(b.pending) < b.opts.MinBatchSize { + // reset the timeout counter to zero time + b.batchSizeTimeout = time.Time{} } if b.opts.MaxBatchByteSize == 0 && (b.opts.MaxBatchSize == 0 || len(b.pending) <= b.opts.MaxBatchSize) {