4 changes: 4 additions & 0 deletions .gitignore
@@ -42,3 +42,7 @@ testdata

# Node modules
node_modules/
internal_docs/

# Coverage
coverage.html
25 changes: 12 additions & 13 deletions README.md
@@ -127,7 +127,7 @@ func main() {

### Why You'll Love This Batcher

* **⚡ Blazing Performance** – Process millions of items with minimal overhead ([benchmarks](#benchmark-results): 145 ns/op)
* **⚡ Blazing Performance** – Process millions of items with minimal overhead ([benchmarks](#benchmark-results): 135 ns/op)
* **🧠 Smart Batching** – Auto-groups by size or time interval, whichever comes first
* **🔁 Built-in Deduplication** – Optional dedup ensures each item is processed only once
* **🛡️ Thread-Safe by Design** – Concurrent Put() from multiple goroutines without worry
@@ -379,18 +379,17 @@ make bench

| Benchmark | Description | ns/op | B/op | allocs/op |
|--------------------------------------------------------------------------------------|----------------------------------|--------:|------:|----------:|
| [BenchmarkBatcherPut](batcher_comprehensive_benchmark_test.go) | Basic Put operation | 145.2 | 11 | 0 |
| [BenchmarkBatcherPutParallel](batcher_comprehensive_benchmark_test.go) | Concurrent Put operations | 308.1 | 11 | 0 |
| [BenchmarkBatcherTrigger](batcher_comprehensive_benchmark_test.go) | Manual batch trigger | 466.2 | 248 | 3 |
| [BenchmarkBatcherWithBackground/Foreground](batcher_comprehensive_benchmark_test.go) | Foreground processing | 146.0 | 61 | 0 |
| [BenchmarkTimePartitionedMapSet](batcher_comprehensive_benchmark_test.go) | Map Set operation | 248.0 | 301 | 2 |
| [BenchmarkTimePartitionedMapGet](batcher_comprehensive_benchmark_test.go) | Map Get operation | 169.2 | 236 | 2 |
| [BenchmarkTimePartitionedMapDelete](batcher_comprehensive_benchmark_test.go) | Map Delete operation | 623.2 | 343 | 3 |
| [BenchmarkTimePartitionedMapCount](batcher_comprehensive_benchmark_test.go) | Map Count operation | 0.54 | 0 | 0 |
| [BenchmarkTimePartitionedMapConcurrent](batcher_comprehensive_benchmark_test.go) | Concurrent map operations | 345.1 | 258 | 2 |
| [BenchmarkBatcherWithDedupPut](batcher_comprehensive_benchmark_test.go) | Put with deduplication | 425.3 | 402 | 4 |
| [BenchmarkBatcher](batcher_benchmark_test.go) | Full batch processing (1M items) | 1,193ms | 895MB | 3.6M |
| [BenchmarkBatcherWithDeduplication](batcher_benchmark_test.go) | Deduplication processing | 803ms | 530MB | 5.9M |
| [BenchmarkBatcherPut](batcher_comprehensive_benchmark_test.go) | Basic Put operation | 135.1 | 8 | 0 |
| [BenchmarkBatcherPutParallel](batcher_comprehensive_benchmark_test.go) | Concurrent Put operations | 310.0 | 9 | 0 |
| [BenchmarkPutComparison/Put](benchmark_comparison_test.go) | Put operation (non-blocking) | 300.7 | 9 | 0 |
| [BenchmarkPutComparison/PutWithPool](benchmark_comparison_test.go) | Put with slice pooling | 309.9 | 1 | 0 |
| [BenchmarkWithPoolComparison/Batcher](benchmark_comparison_test.go) | Standard batcher | 171.2 | 18 | 1 |
| [BenchmarkWithPoolComparison/WithPool](benchmark_comparison_test.go) | Pooled batcher | 184.0 | 9 | 1 |
| [BenchmarkTimePartitionedMapSet](batcher_comprehensive_benchmark_test.go) | Map Set operation (bloom filter) | 366.7 | 147 | 6 |
| [BenchmarkTimePartitionedMapGet](batcher_comprehensive_benchmark_test.go) | Map Get operation (bloom filter) | 80.5 | 39 | 2 |
| [BenchmarkBatcherWithDedupPut](batcher_comprehensive_benchmark_test.go) | Put with deduplication | 740.1 | 166 | 7 |
| [BenchmarkBatcher](batcher_benchmark_test.go) | Full batch processing (1M items) | 1,081ms | 710MB | 1.9M |
| [BenchmarkBatcherWithDeduplication](batcher_benchmark_test.go) | Deduplication processing | 90.7 | 13 | 0 |

> Benchmarks cover the core functions in this library and were executed on an Apple M1 Max (ARM64).
> Basic operations complete in hundreds of nanoseconds with zero or near-zero heap allocations.
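
For orientation, here is a minimal usage sketch of the API this PR touches (`New`, `Put`, `Trigger`). The import path is a placeholder, and the final sleep stands in for a shutdown hook, since the batcher exposes no Close or Wait:

```go
package main

import (
	"fmt"
	"time"

	"github.com/yourorg/batcher" // placeholder import path
)

func main() {
	// Flush at 100 items or after 250ms, whichever comes first.
	b := batcher.New(100, 250*time.Millisecond, func(batch []*string) {
		fmt.Printf("processed %d items\n", len(batch))
	}, false) // background=false: the worker calls the handler inline

	for i := 0; i < 1000; i++ {
		s := fmt.Sprintf("item-%d", i)
		b.Put(&s) // safe from any number of goroutines
	}

	b.Trigger()                        // flush the partial batch immediately
	time.Sleep(500 * time.Millisecond) // crude drain; the library has no Close/Wait
}
```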
114 changes: 98 additions & 16 deletions batcher.go
@@ -34,6 +34,7 @@
package batcher

import (
"sync"
"time"
)

@@ -55,6 +56,8 @@ import (
// - ch: Buffered channel for receiving items to batch
// - triggerCh: Channel for manual batch processing triggers
// - background: If true, batch processing happens in a separate goroutine
// - usePool: If true, uses sync.Pool for slice reuse to reduce allocations
// - pool: Optional sync.Pool for reusing batch slices
//
// Notes:
// - The Batcher is thread-safe and can be used concurrently
@@ -68,6 +71,8 @@ type Batcher[T any] struct {
ch chan *T
triggerCh chan struct{}
background bool
usePool bool
pool *sync.Pool
}

// New creates a new Batcher instance with the specified configuration.
@@ -104,18 +109,56 @@ func New[T any](size int, timeout time.Duration, fn func(batch []*T), background
ch: make(chan *T, size*64),
triggerCh: make(chan struct{}),
background: background,
usePool: false,
}

go b.worker()

return b
}

// Put adds an item to the batch for processing.
// NewWithPool creates a new Batcher instance with slice pooling enabled.
//
// This constructor is similar to New() but initializes a sync.Pool for batch slices
// and uses worker logic that retrieves and returns slices from the pool.
// This can significantly reduce memory allocations and GC pressure in high-throughput scenarios.
//
// Parameters:
// - size: Maximum number of items per batch
// - timeout: Maximum duration to wait before processing an incomplete batch
// - fn: Callback function that processes each batch
// - background: If true, batch processing happens asynchronously
//
// Returns:
// - *Batcher[T]: A configured and running Batcher instance with pooling enabled
func NewWithPool[T any](size int, timeout time.Duration, fn func(batch []*T), background bool) *Batcher[T] {
b := &Batcher[T]{
fn: fn,
size: size,
timeout: timeout,
batch: make([]*T, 0, size),
ch: make(chan *T, size*64),
triggerCh: make(chan struct{}),
background: background,
usePool: true,
pool: &sync.Pool{
New: func() interface{} {
slice := make([]*T, 0, size)
return &slice
},
},
}

go b.worker()

return b
}
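
// Illustrative sketch (not part of this change): NewWithPool is a drop-in
// replacement for New. Event and flush below are hypothetical; because the
// batch slice is returned to the pool once the callback finishes, the
// callback must not retain it:
//
//	b := NewWithPool(500, 100*time.Millisecond, func(batch []*Event) {
//		flush(batch)
//	}, true)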

// Put adds an item to the batch for processing, using a non-blocking channel send when possible.
//
// This method sends the item to the internal batching channel where it will be collected
// by the worker goroutine. The item will be included in the next batch that is processed,
// which occurs when the batch size is reached, the timeout expires, or Trigger() is called.
// by the worker goroutine. It attempts a non-blocking send first, falling back to blocking
// only when the channel is full. This reduces goroutine blocking in high-throughput scenarios.
//
// Parameters:
// - item: Pointer to the item to be batched. Must not be nil
@@ -129,11 +172,18 @@ func New[T any](size int, timeout time.Duration, fn func(batch []*T), background
// - May trigger batch processing if this item completes a full batch
//
// Notes:
// - This method will block if the internal channel buffer is full (64x batch size)
// - Uses fast-path non-blocking send when possible
// - Falls back to blocking send only when channel is full
// - Items are processed in the order they are received
// - The variadic parameter exists for interface compatibility but is not used
func (b *Batcher[T]) Put(item *T, _ ...int) { // Payload size is not used in this implementation
b.ch <- item
select {
case b.ch <- item:
// Fast path - non-blocking send succeeded
default:
// Channel is full, fallback to blocking send
b.ch <- item
}
}
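
// Illustrative sketch (not part of this change): Put is safe for concurrent
// use, so any number of producers can share one Batcher. The non-blocking
// fast path only falls back to a blocking send when the 64x-size buffer
// is full:
//
//	var wg sync.WaitGroup
//	for w := 0; w < 8; w++ {
//		wg.Add(1)
//		go func() {
//			defer wg.Done()
//			for i := 0; i < 1000; i++ {
//				v := i
//				b.Put(&v)
//			}
//		}()
//	}
//	wg.Wait()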

// Trigger forces immediate processing of the current batch.
@@ -165,17 +215,17 @@ func (b *Batcher[T]) Trigger() {
//
// This function runs as a background goroutine and continuously monitors three conditions
// for batch processing: size limit reached, timeout expired, or manual trigger received.
// It uses a nested loop structure with goto statements for efficient batch processing.
// It uses timer reuse and slice pooling when enabled.
//
// This function performs the following steps:
// - Creates a new timeout timer for each batch cycle
// - Creates a reusable timeout timer (optimization over time.After)
// - Monitors three channels simultaneously using select:
// - Item channel: Receives new items to add to the current batch
// - Timeout channel: Fires when the timeout duration expires
// - Trigger channel: Receives manual trigger signals
//
// - Processes the batch when any trigger condition is met
// - Resets the batch and starts a new cycle
// - Resets the batch and starts a new cycle with efficient slice management
//
// Parameters:
// - None (operates on Batcher receiver fields)
@@ -187,16 +237,28 @@ func (b *Batcher[T]) Trigger() {
// - Consumes items from the internal channels
// - Invokes the batch processing function (fn) with completed batches
// - May spawn new goroutines if background processing is enabled
// - Uses slice pooling if enabled to reduce allocations
//
// Notes:
// - This function runs indefinitely and cannot be stopped
// - Uses goto for performance optimization to avoid deep nesting
// - Empty batches are not processed (checked before invoking fn)
// - The batch slice is reallocated after each processing to avoid memory issues
// - When background=true, batch processing is non-blocking and concurrent
func (b *Batcher[T]) worker() { //nolint:gocognit // Worker function handles multiple channels and conditions
// - Reuses timers to reduce allocations and GC pressure
// - When usePool=true, manages slice lifecycle through sync.Pool for memory efficiency
func (b *Batcher[T]) worker() { //nolint:gocognit,gocyclo // Worker function handles multiple channels and conditions
// Create a single reusable timer instead of allocating one per cycle via time.After
timer := time.NewTimer(b.timeout)
defer timer.Stop()

for {
expire := time.After(b.timeout)
// Reset the timer for this batch cycle
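// If Stop reports false, the timer already fired; drain timer.C so a stale
// tick from the previous cycle cannot trigger an early batch flush.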
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(b.timeout)

for {
select {
@@ -207,7 +269,7 @@ func (b *Batcher[T]) worker() { //nolint:gocognit // Worker function handles multiple channels and conditions
goto saveBatch
}

case <-expire:
case <-timer.C:
goto saveBatch

case <-b.triggerCh:
@@ -216,16 +278,36 @@ func (b *Batcher[T]) worker() { //nolint:gocognit // Worker function handles multiple channels and conditions
}

saveBatch:
if len(b.batch) > 0 {
if len(b.batch) > 0 { //nolint:nestif // Necessary complexity for handling pooling and background modes
batch := b.batch

if b.background {
go b.fn(batch)
if b.usePool {
go func(batch []*T) {
b.fn(batch)
// Return the slice to the pool after processing
slice := batch[:0]
b.pool.Put(&slice)
}(batch)
} else {
go b.fn(batch)
}
} else {
b.fn(batch)
if b.usePool {
// Return the slice to the pool after processing
slice := batch[:0]
b.pool.Put(&slice)
}
}

b.batch = make([]*T, 0, b.size)
// Get a new slice (from pool if enabled, or allocate new)
if b.usePool {
newBatchPtr := b.pool.Get().(*[]*T)
b.batch = *newBatchPtr
} else {
b.batch = make([]*T, 0, b.size)
}
}
}
}