Skip to content

Commit

Permalink
pubsub: add batch timeouts
Browse files Browse the repository at this point in the history
This PR adds a timeout option to batcher.  The timeout specifies how
long the batcher waits before sending batched messages, even if the
number of pending messages is under MinBatchSize.

This ensures delivery guarantees within a given time period if there
are low-throughput messages with a higher min batch size.

To do:

- [x] Implement batch timeouts in options
- [x] Only allow one concurrent request to check for timeouts
- [x] Handle timeouts when grabbing next batch
- [ ] Tests
  • Loading branch information
tonyhb committed Feb 27, 2024
1 parent 7672fca commit 7a77e0b
Showing 1 changed file with 45 additions and 2 deletions.
47 changes: 45 additions & 2 deletions pubsub/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"errors"
"reflect"
"sync"
"sync/atomic"
"time"
)

// Split determines how to split n (representing n items) into batches based on
Expand Down Expand Up @@ -72,6 +74,9 @@ type Batcher struct {
pending []waiter // items waiting to be handled
nHandlers int // number of currently running handler goroutines
shutdown bool

batchSizeTimeout time.Time // the time that len(pending) < opts.MinBatchSize, or zero time
batchTimeoutRunning int32 // atomic counter checking whether timeout wait is running
}

// Message is larger than the maximum batch byte size
Expand All @@ -96,6 +101,9 @@ type Options struct {
MaxBatchSize int
// Maximum bytesize of a batch. 0 means no limit.
MaxBatchByteSize int
// BatchTimeout is the maximum time a batch can exist under MinBatchSize
// before being sent anyway.
BatchTimeout time.Duration
}

// newOptionsWithDefaults returns Options with defaults applied to opts.
Expand Down Expand Up @@ -200,7 +208,30 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {
if b.nHandlers < b.opts.MaxHandlers {
// If we can start a handler, do so with the item just added and any others that are pending.
batch := b.nextBatch()
b.handleBatch(batch)
if batch != nil {
b.handleBatch(batch)
}

if batch == nil && len(b.pending) > 0 {
// If the batch size timeout is zero, this is one of the first items to
// be added to the batch under the minimum batch size. Record when this
// happens so that .nextBatch() can grab the batch on timeout.
if b.batchSizeTimeout.IsZero() {
b.batchSizeTimeout = time.Now()
}
// Ensure that we send the batch after the given timeout. Only one
// concurrent process can run this goroutine, ensuring that we don't
// duplicate work.
if atomic.CompareAndSwapInt32(&b.batchTimeoutRunning, 0, 1) {
go func() {
<-time.After(b.opts.BatchTimeout)
batch = b.nextBatch()
if batch != nil {
b.handleBatch(batch)
}
}()
}
}
}
// If we can't start a handler, then one of the currently running handlers will
// take our item.
Expand All @@ -227,13 +258,25 @@ func (b *Batcher) nextBatch() []waiter {
if len(b.pending) < b.opts.MinBatchSize {
// We handle minimum batch sizes depending on specific
// situations.
// XXX: If we allow max batch lifetimes, handle that here.
if time.Since(b.batchSizeTimeout) < b.opts.BatchTimeout {
// If we're within the max batch lifetime, respect minimum batch
// sizes and return nil.
return nil
}
if b.shutdown == false {
// If we're not shutting down, respect minimums. If we're
// shutting down, though, we ignore minimums to flush the
// entire batch.
return nil
}
// At this point, either we're shutting down or we've waited
// too long for the minimum size to be met. We're going to proceed
// with flushing the batch.
}

if len(b.pending) < b.opts.MinBatchSize {
// reset the timeout counter to zero time
b.batchSizeTimeout = time.Time{}
}

if b.opts.MaxBatchByteSize == 0 && (b.opts.MaxBatchSize == 0 || len(b.pending) <= b.opts.MaxBatchSize) {
Expand Down

0 comments on commit 7a77e0b

Please sign in to comment.