Skip to content

Commit

Permalink
pubsub: add batch timeouts
Browse files Browse the repository at this point in the history
This PR adds a timeout option to batcher. The timeout specifies how
long the batcher waits before sending batched messages, even if the
number of pending messages is under MinBatchSize.

This ensures delivery guarantees within a given time period if there
are low-throughput messages with a higher min batch size.

To do:

- [x] Implement batch timeouts in options
- [x] Only allow one concurrent request to check for timeouts
- [x] Handle timeouts when grabbing next batch
- [ ] Tests
- [ ] Check to see if this will be accepted upstream
  • Loading branch information
tonyhb committed Feb 23, 2024
1 parent 5ec203b commit 7d91fcc
Showing 1 changed file with 50 additions and 7 deletions.
57 changes: 50 additions & 7 deletions pubsub/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"errors"
"reflect"
"sync"
"sync/atomic"
"time"
)

// Split determines how to split n (representing n items) into batches based on
Expand Down Expand Up @@ -72,6 +74,9 @@ type Batcher struct {
pending []waiter // items waiting to be handled
nHandlers int // number of currently running handler goroutines
shutdown bool

batchSizeTimeout time.Time // the time at which len(pending) first fell below opts.MinBatchSize, or the zero time
batchTimeoutRunning int32 // atomic counter checking whether timeout wait is running
}

// Message is larger than the maximum batch byte size
Expand All @@ -96,6 +101,9 @@ type Options struct {
MaxBatchSize int
// Maximum bytesize of a batch. 0 means no limit.
MaxBatchByteSize int
// BatchTimeout is the maximum time a batch can exist under MinBatchSize
// before being sent anyway.
BatchTimeout time.Duration
}

// newOptionsWithDefaults returns Options with defaults applied to opts.
Expand Down Expand Up @@ -201,27 +209,62 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {
// If we can start a handler, do so with the item just added and any others that are pending.
batch := b.nextBatch()
if batch != nil {
b.wg.Add(1)
go func() {
b.callHandler(batch)
b.wg.Done()
}()
b.nHandlers++
b.handleBatch(batch)
}

if batch == nil && len(b.pending) > 0 {
// If the batch size timeout is zero, this is one of the first items to
// be added to the batch under the minimum batch size. Record when this
// happens so that .nextBatch() can grab the batch on timeout.
if b.batchSizeTimeout.IsZero() {
b.batchSizeTimeout = time.Now()
}

// Ensure that we send the batch after the given timeout. Only one
// concurrent timeout-waiting goroutine may run at a time.
if atomic.CompareAndSwapInt32(&b.batchTimeoutRunning, 0, 1) {
go func() {
<-time.After(b.opts.BatchTimeout)
batch = b.nextBatch()
if batch != nil {
b.handleBatch(batch)
}
}()
}
}
}
// If we can't start a handler, then one of the currently running handlers will
// take our item.
return c
}

// handleBatch dispatches a goroutine to process the given batch and
// increments the running-handler count. It is a no-op for an empty or
// nil batch. The caller is expected to hold b.mu (consistent with
// nextBatch), since nHandlers is shared state — TODO confirm at call sites.
func (b *Batcher) handleBatch(batch []waiter) {
	// len(nil) == 0, so a separate nil check is redundant (staticcheck S1009).
	if len(batch) == 0 {
		return
	}

	b.wg.Add(1)
	go func() {
		b.callHandler(batch)
		b.wg.Done()
	}()
	b.nHandlers++
}

// nextBatch returns the batch to process, and updates b.pending.
// It returns nil if there's no batch ready for processing.
// b.mu must be held.
func (b *Batcher) nextBatch() []waiter {
if len(b.pending) < b.opts.MinBatchSize {
// if there's a min batch size, only skip if we haven't yet waited for the batch timeout
if len(b.pending) < b.opts.MinBatchSize && time.Since(b.batchSizeTimeout) < b.opts.BatchTimeout {
return nil
}

if len(b.pending) < b.opts.MinBatchSize {
// reset the timeout counter to zero time
b.batchSizeTimeout = time.Time{}
}

if b.opts.MaxBatchByteSize == 0 && (b.opts.MaxBatchSize == 0 || len(b.pending) <= b.opts.MaxBatchSize) {
// Send it all!
batch := b.pending
Expand Down

0 comments on commit 7d91fcc

Please sign in to comment.