From c5c2b624d01d3dd33de7d44c8838fa9aa6a47332 Mon Sep 17 00:00:00 2001 From: Kazuma Arimura Date: Sat, 6 Apr 2024 00:20:59 +0900 Subject: [PATCH] add PoolCompressor option and pass it to elastictransport.Config (#840) --- elasticsearch.go | 2 ++ esutil/bulk_indexer_integration_test.go | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/elasticsearch.go b/elasticsearch.go index dcfc98cd02..c736ded991 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -91,6 +91,7 @@ type Config struct { CompressRequestBody bool // Default: false. CompressRequestBodyLevel int // Default: gzip.DefaultCompression. + PoolCompressor bool // If true, a sync.Pool based gzip writer is used. Default: false. DiscoverNodesOnStart bool // Discover nodes when initializing the client. Default: false. DiscoverNodesInterval time.Duration // Discover nodes periodically. Default: disabled. @@ -294,6 +295,7 @@ func newTransport(cfg Config) (*elastictransport.Client, error) { CompressRequestBody: cfg.CompressRequestBody, CompressRequestBodyLevel: cfg.CompressRequestBodyLevel, + PoolCompressor: cfg.PoolCompressor, EnableMetrics: cfg.EnableMetrics, EnableDebugLogger: cfg.EnableDebugLogger, diff --git a/esutil/bulk_indexer_integration_test.go b/esutil/bulk_indexer_integration_test.go index 4c38cc0a9a..aec38c8f88 100644 --- a/esutil/bulk_indexer_integration_test.go +++ b/esutil/bulk_indexer_integration_test.go @@ -43,6 +43,7 @@ func TestBulkIndexerIntegration(t *testing.T) { name string CompressRequestBodyEnabled bool CompressRequestBodyLevel int + PoolCompressor bool }{ { name: "Without body compression", @@ -57,6 +58,11 @@ func TestBulkIndexerIntegration(t *testing.T) { CompressRequestBodyEnabled: true, CompressRequestBodyLevel: gzip.BestSpeed, }, + { + name: "With body compression (sync.Pool)", + CompressRequestBodyEnabled: true, + PoolCompressor: true, + }, } for _, tt := range testCases { @@ -68,6 +74,7 @@ func TestBulkIndexerIntegration(t *testing.T) { es, _ := elasticsearch.NewClient(elasticsearch.Config{ CompressRequestBody: tt.CompressRequestBodyEnabled, CompressRequestBodyLevel: tt.CompressRequestBodyLevel, + PoolCompressor: tt.PoolCompressor, Logger: &elastictransport.ColorLogger{Output: os.Stdout}, }) @@ -134,6 +141,7 @@ func TestBulkIndexerIntegration(t *testing.T) { es, _ := elasticsearch.NewClient(elasticsearch.Config{ CompressRequestBody: tt.CompressRequestBodyEnabled, CompressRequestBodyLevel: tt.CompressRequestBodyLevel, + PoolCompressor: tt.PoolCompressor, Logger: &elastictransport.ColorLogger{Output: os.Stdout}, }) @@ -203,6 +211,7 @@ func TestBulkIndexerIntegration(t *testing.T) { es, _ := elasticsearch.NewClient(elasticsearch.Config{ CompressRequestBody: tt.CompressRequestBodyEnabled, CompressRequestBodyLevel: tt.CompressRequestBodyLevel, + PoolCompressor: tt.PoolCompressor, Logger: &elastictransport.ColorLogger{Output: os.Stdout}, }) @@ -265,6 +274,7 @@ func TestBulkIndexerIntegration(t *testing.T) { es, _ := elasticsearch.NewClient(elasticsearch.Config{ CompressRequestBody: tt.CompressRequestBodyEnabled, CompressRequestBodyLevel: tt.CompressRequestBodyLevel, + PoolCompressor: tt.PoolCompressor, Logger: &elastictransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}, })