From 70026ab03405ec4a88050ff7b9ce5324ec247a58 Mon Sep 17 00:00:00 2001 From: Stephanie Wei Date: Wed, 15 Oct 2025 14:51:01 -0400 Subject: [PATCH 1/2] Add queue size multiplier config to BulkIndexer --- esutil/bulk_indexer.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index 1d758b1c9b..c1e9663064 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -54,9 +54,10 @@ type BulkIndexer interface { // BulkIndexerConfig represents configuration of the indexer. type BulkIndexerConfig struct { - NumWorkers int // The number of workers. Defaults to runtime.NumCPU(). - FlushBytes int // The flush threshold in bytes. Defaults to 5MB. - FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec. + NumWorkers int // The number of workers. Defaults to runtime.NumCPU(). + FlushBytes int // The flush threshold in bytes. Defaults to 5MB. + FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec. + QueueSizeMultiplier int // The multiplier on the size of the worker queue. Defaults to 1. Client esapi.Transport // The Elasticsearch client. Decoder BulkResponseJSONDecoder // A custom JSON decoder. @@ -301,6 +302,10 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) { cfg.FlushInterval = 30 * time.Second } + if cfg.QueueSizeMultiplier == 0 { + cfg.QueueSizeMultiplier = 1 + } + bi := bulkIndexer{ config: cfg, stats: &bulkIndexerStats{}, @@ -371,7 +376,7 @@ func (bi *bulkIndexer) Stats() BulkIndexerStats { // init initializes the bulk indexer. func (bi *bulkIndexer) init() { - bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers) + bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers*bi.config.QueueSizeMultiplier) for i := 1; i <= bi.config.NumWorkers; i++ { bi.wg.Add(1) From 2d60cb2be6788bcf65893dc29881b5c18f181eaa Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Thu, 20 Nov 2025 09:15:16 +0000 Subject: [PATCH 2/2] test: add unit tests for QueueSizeMultiplier BulkIndexerConfig value --- esutil/bulk_indexer.go | 8 +-- esutil/bulk_indexer_internal_test.go | 97 ++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index c1e9663064..95639f7a2b 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -290,19 +290,19 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) { cfg.Decoder = defaultJSONDecoder{} } - if cfg.NumWorkers == 0 { + if cfg.NumWorkers <= 0 { cfg.NumWorkers = runtime.NumCPU() } - if cfg.FlushBytes == 0 { + if cfg.FlushBytes <= 0 { cfg.FlushBytes = 5e+6 } - if cfg.FlushInterval == 0 { + if cfg.FlushInterval <= 0 { cfg.FlushInterval = 30 * time.Second } - if cfg.QueueSizeMultiplier == 0 { + if cfg.QueueSizeMultiplier <= 0 { cfg.QueueSizeMultiplier = 1 } diff --git a/esutil/bulk_indexer_internal_test.go b/esutil/bulk_indexer_internal_test.go index 4db4fa38f1..96e7da72df 100644 --- a/esutil/bulk_indexer_internal_test.go +++ b/esutil/bulk_indexer_internal_test.go @@ -186,6 +186,103 @@ func TestBulkIndexer(t *testing.T) { } }) + t.Run("BulkIndexerConfig.QueueSizeMultiplier", func(t *testing.T) { + tests := []struct { + name string + numWorkers int + queueSizeMultiplier int + expectedQueueCap int + }{ + { + name: "Default QueueSizeMultiplier with 1 worker", + numWorkers: 1, + queueSizeMultiplier: 0, + expectedQueueCap: 1, + }, + { + name: "Default QueueSizeMultiplier with 2 workers", + numWorkers: 2, + queueSizeMultiplier: 0, + expectedQueueCap: 2, + }, + { + name: "QueueSizeMultiplier=2 with 1 worker", + numWorkers: 1, + queueSizeMultiplier: 2, + expectedQueueCap: 2, + }, + { + name: "QueueSizeMultiplier=5 with 1 worker", + numWorkers: 1, + queueSizeMultiplier: 5, + expectedQueueCap: 5, + }, + { + name: "QueueSizeMultiplier=10 with 1 worker", + numWorkers: 1, + queueSizeMultiplier: 10, + expectedQueueCap: 10, + }, + { + name: "QueueSizeMultiplier=1 with 4 workers", + numWorkers: 4, + queueSizeMultiplier: 1, + expectedQueueCap: 4, + }, + { + name: "QueueSizeMultiplier=2 with 4 workers", + numWorkers: 4, + queueSizeMultiplier: 2, + expectedQueueCap: 8, + }, + { + name: "QueueSizeMultiplier=5 with 3 workers", + numWorkers: 3, + queueSizeMultiplier: 5, + expectedQueueCap: 15, + }, + { + name: "QueueSizeMultiplier=-1 with 1 worker", + numWorkers: 1, + queueSizeMultiplier: -1, + expectedQueueCap: 1, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + es, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}}) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + cfg := BulkIndexerConfig{ + NumWorkers: tt.numWorkers, + QueueSizeMultiplier: tt.queueSizeMultiplier, + Client: es, + } + + bi, err := NewBulkIndexer(cfg) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + bir, ok := bi.(*bulkIndexer) + if !ok { + t.Fatalf("Unexpected type: %T", bi) + } + + if queueCap := cap(bir.queue); queueCap != tt.expectedQueueCap { + t.Errorf("Unexpected queue capacity: want=%d, got=%d", tt.expectedQueueCap, queueCap) + } + + // Clean up + _ = bi.Close(context.Background()) + }) + } + }) + t.Run("Add() Timeout", func(t *testing.T) { es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}}) bi, _ := NewBulkIndexer(BulkIndexerConfig{NumWorkers: 1, Client: es})