Skip to content

Commit

Permalink
add PoolCompressor option and pass it to elastictransport.Config (#840)
Browse files Browse the repository at this point in the history
  • Loading branch information
pakio committed Apr 5, 2024
1 parent 2edc522 commit c5c2b62
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
2 changes: 2 additions & 0 deletions elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type Config struct {

CompressRequestBody bool // Default: false.
CompressRequestBodyLevel int // Default: gzip.DefaultCompression.
PoolCompressor bool // If true, a sync.Pool based gzip writer is used. Default: false.

DiscoverNodesOnStart bool // Discover nodes when initializing the client. Default: false.
DiscoverNodesInterval time.Duration // Discover nodes periodically. Default: disabled.
Expand Down Expand Up @@ -294,6 +295,7 @@ func newTransport(cfg Config) (*elastictransport.Client, error) {

CompressRequestBody: cfg.CompressRequestBody,
CompressRequestBodyLevel: cfg.CompressRequestBodyLevel,
PoolCompressor: cfg.PoolCompressor,

EnableMetrics: cfg.EnableMetrics,
EnableDebugLogger: cfg.EnableDebugLogger,
Expand Down
10 changes: 10 additions & 0 deletions esutil/bulk_indexer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestBulkIndexerIntegration(t *testing.T) {
name string
CompressRequestBodyEnabled bool
CompressRequestBodyLevel int
PoolCompressor bool
}{
{
name: "Without body compression",
Expand All @@ -57,6 +58,11 @@ func TestBulkIndexerIntegration(t *testing.T) {
CompressRequestBodyEnabled: true,
CompressRequestBodyLevel: gzip.BestSpeed,
},
{
name: "With body compression (sync.Pool)",
CompressRequestBodyEnabled: true,
PoolCompressor: true,
},
}

for _, tt := range testCases {
Expand All @@ -68,6 +74,7 @@ func TestBulkIndexerIntegration(t *testing.T) {
es, _ := elasticsearch.NewClient(elasticsearch.Config{
CompressRequestBody: tt.CompressRequestBodyEnabled,
CompressRequestBodyLevel: tt.CompressRequestBodyLevel,
PoolCompressor: tt.PoolCompressor,
Logger: &elastictransport.ColorLogger{Output: os.Stdout},
})

Expand Down Expand Up @@ -134,6 +141,7 @@ func TestBulkIndexerIntegration(t *testing.T) {
es, _ := elasticsearch.NewClient(elasticsearch.Config{
CompressRequestBody: tt.CompressRequestBodyEnabled,
CompressRequestBodyLevel: tt.CompressRequestBodyLevel,
PoolCompressor: tt.PoolCompressor,
Logger: &elastictransport.ColorLogger{Output: os.Stdout},
})

Expand Down Expand Up @@ -203,6 +211,7 @@ func TestBulkIndexerIntegration(t *testing.T) {
es, _ := elasticsearch.NewClient(elasticsearch.Config{
CompressRequestBody: tt.CompressRequestBodyEnabled,
CompressRequestBodyLevel: tt.CompressRequestBodyLevel,
PoolCompressor: tt.PoolCompressor,
Logger: &elastictransport.ColorLogger{Output: os.Stdout},
})

Expand Down Expand Up @@ -265,6 +274,7 @@ func TestBulkIndexerIntegration(t *testing.T) {
es, _ := elasticsearch.NewClient(elasticsearch.Config{
CompressRequestBody: tt.CompressRequestBodyEnabled,
CompressRequestBodyLevel: tt.CompressRequestBodyLevel,
PoolCompressor: tt.PoolCompressor,
Logger: &elastictransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true},
})

Expand Down

0 comments on commit c5c2b62

Please sign in to comment.