Skip to content

Commit

Permalink
Atomically update nHandlers
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Mar 1, 2024
1 parent 7672fca commit bfb28ad
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions pubsub/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"reflect"
"sync"
"sync/atomic"
)

// Split determines how to split n (representing n items) into batches based on
Expand Down Expand Up @@ -70,7 +71,7 @@ type Batcher struct {

mu sync.Mutex
pending []waiter // items waiting to be handled
nHandlers int // number of currently running handler goroutines
nHandlers int32 // number of currently running handler goroutines
shutdown bool
}

Expand Down Expand Up @@ -197,7 +198,7 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {

// Add the item to the pending list.
b.pending = append(b.pending, waiter{item, c})
if b.nHandlers < b.opts.MaxHandlers {
if atomic.LoadInt32(&b.nHandlers) < int32(b.opts.MaxHandlers) {
// If we can start a handler, do so with the item just added and any others that are pending.
batch := b.nextBatch()
b.handleBatch(batch)
Expand All @@ -217,7 +218,7 @@ func (b *Batcher) handleBatch(batch []waiter) {
b.callHandler(batch)
b.wg.Done()
}()
b.nHandlers++
atomic.AddInt32(&b.nHandlers, 1)
}

// nextBatch returns the batch to process, and updates b.pending.
Expand Down Expand Up @@ -284,7 +285,7 @@ func (b *Batcher) callHandler(batch []waiter) {
// always get to run.
batch = b.nextBatch()
if batch == nil {
b.nHandlers--
atomic.AddInt32(&b.nHandlers, -1)
}
b.mu.Unlock()
}
Expand All @@ -300,7 +301,7 @@ func (b *Batcher) Shutdown() {

// On shutdown, ensure that we attempt to flush any pending items
// if there's a minimum batch size.
if b.nHandlers < b.opts.MaxHandlers {
if atomic.LoadInt32(&b.nHandlers) < int32(b.opts.MaxHandlers) {
batch := b.nextBatch()
b.handleBatch(batch)
}
Expand Down

0 comments on commit bfb28ad

Please sign in to comment.