diff --git a/pubsub/batcher/batcher.go b/pubsub/batcher/batcher.go index 917cef822a..de53b68f52 100644 --- a/pubsub/batcher/batcher.go +++ b/pubsub/batcher/batcher.go @@ -23,6 +23,8 @@ import ( "errors" "reflect" "sync" + "sync/atomic" + "time" ) // Split determines how to split n (representing n items) into batches based on @@ -72,6 +74,9 @@ type Batcher struct { pending []waiter // items waiting to be handled nHandlers int // number of currently running handler goroutines shutdown bool + + batchSizeTimeout time.Time // the time that len(pending) < opts.MinBatchSize, or zero time + batchTimeoutRunning int32 // atomic counter checking whether timeout wait is running } // Message is larger than the maximum batch byte size @@ -96,6 +101,9 @@ type Options struct { MaxBatchSize int // Maximum bytesize of a batch. 0 means no limit. MaxBatchByteSize int + // BatchTimeout the maximum time a batch can exist under MinBatchSize + // before being sent anyway. + BatchTimeout time.Duration } // newOptionsWithDefaults returns Options with defaults applied to opts. @@ -201,12 +209,28 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error { // If we can start a handler, do so with the item just added and any others that are pending. batch := b.nextBatch() if batch != nil { - b.wg.Add(1) - go func() { - b.callHandler(batch) - b.wg.Done() - }() - b.nHandlers++ + b.handleBatch(batch) + } + + if batch == nil && len(b.pending) > 0 { + // If the batch size timeout is zero, this is one of the first items to + // be added to the batch under the minimum batch size. Record when this + // happens so that .nextBatch() can grab the batch on timeout. + if b.batchSizeTimeout.IsZero() { + b.batchSizeTimeout = time.Now() + } + + // Ensure that we send the batch after the given timeout. 
Only one // concurrent timeout goroutine is started per batch window.