From eacf9e58bbb2d1bfee48af1f0f258dcc8b825c22 Mon Sep 17 00:00:00 2001 From: Shunsuke Otani Date: Thu, 17 Apr 2025 21:41:22 +0900 Subject: [PATCH] esutil: Modify the `BulkIndexerConfig.Client` type to `esapi.Transport` (#957) --- esutil/bulk_indexer.go | 2 +- esutil/bulk_indexer_internal_test.go | 187 +++++++++++++++------------ 2 files changed, 106 insertions(+), 83 deletions(-) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index 8faaa6459d..1d758b1c9b 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -58,7 +58,7 @@ type BulkIndexerConfig struct { FlushBytes int // The flush threshold in bytes. Defaults to 5MB. FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec. - Client *elasticsearch.Client // The Elasticsearch client. + Client esapi.Transport // The Elasticsearch client. Decoder BulkResponseJSONDecoder // A custom JSON decoder. DebugLogger BulkIndexerDebugLogger // An optional logger for debugging. diff --git a/esutil/bulk_indexer_internal_test.go b/esutil/bulk_indexer_internal_test.go index f89e9ba02c..4db4fa38f1 100644 --- a/esutil/bulk_indexer_internal_test.go +++ b/esutil/bulk_indexer_internal_test.go @@ -62,104 +62,127 @@ func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { func TestBulkIndexer(t *testing.T) { t.Run("Basic", func(t *testing.T) { - var ( - wg sync.WaitGroup + tests := []struct { + name string + makeClient func(cfg elasticsearch.Config) (esapi.Transport, error) + }{ + { + name: "Client", + makeClient: func(cfg elasticsearch.Config) (esapi.Transport, error) { + return elasticsearch.NewClient(cfg) + }, + }, + { + name: "TypedClient", + makeClient: func(cfg elasticsearch.Config) (esapi.Transport, error) { + return elasticsearch.NewTypedClient(cfg) + }, + }, + } + for _, tt := range tests { + tt := tt - countReqs int - testfile string - numItems = 6 - ) + t.Run(tt.name, func(t *testing.T) { + var ( + wg sync.WaitGroup + + countReqs int + testfile string + numItems = 6 + ) + + es, _ := tt.makeClient(elasticsearch.Config{Transport: &mockTransport{ + RoundTripFunc: func(*http.Request) (*http.Response, error) { + countReqs++ + switch countReqs { + case 1: + testfile = "testdata/bulk_response_1a.json" + case 2: + testfile = "testdata/bulk_response_1b.json" + case 3: + testfile = "testdata/bulk_response_1c.json" + } + bodyContent, _ := ioutil.ReadFile(testfile) + return &http.Response{ + Body: ioutil.NopCloser(bytes.NewBuffer(bodyContent)), + Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, + }, nil + }, + }}) - es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ - RoundTripFunc: func(*http.Request) (*http.Response, error) { - countReqs++ - switch countReqs { - case 1: - testfile = "testdata/bulk_response_1a.json" - case 2: - testfile = "testdata/bulk_response_1b.json" - case 3: - testfile = "testdata/bulk_response_1c.json" + cfg := BulkIndexerConfig{ + NumWorkers: 1, + FlushBytes: 39 * 2, // 38 bytes header + body, times 2 to match 2 responses per file in testdata + FlushInterval: time.Hour, // Disable auto-flushing, because response doesn't match number of items + Client: es} + if os.Getenv("DEBUG") != "" { + cfg.DebugLogger = log.New(os.Stdout, "", 0) } - bodyContent, _ := ioutil.ReadFile(testfile) - return &http.Response{ - Body: ioutil.NopCloser(bytes.NewBuffer(bodyContent)), - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - }, nil - }, - }}) - cfg := BulkIndexerConfig{ - NumWorkers: 1, - FlushBytes: 39 * 2, // 38 bytes header + body, times 2 to match 2 responses per file in testdata - FlushInterval: time.Hour, // Disable auto-flushing, because response doesn't match number of items - Client: es} - if os.Getenv("DEBUG") != "" { - cfg.DebugLogger = log.New(os.Stdout, "", 0) - } - - bi, _ := NewBulkIndexer(cfg) + bi, _ := NewBulkIndexer(cfg) + + for i := 1; i <= numItems; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + err := bi.Add(context.Background(), BulkIndexerItem{ + Action: "foo", + DocumentID: strconv.Itoa(i), + Body: strings.NewReader(fmt.Sprintf(`{"title":"foo-%d"}`, i)), + }) + if err != nil { + t.Errorf("Unexpected error: %s", err) + return + } + }(i) + } + wg.Wait() - for i := 1; i <= numItems; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - err := bi.Add(context.Background(), BulkIndexerItem{ - Action: "foo", - DocumentID: strconv.Itoa(i), - Body: strings.NewReader(fmt.Sprintf(`{"title":"foo-%d"}`, i)), - }) - if err != nil { + if err := bi.Close(context.Background()); err != nil { t.Errorf("Unexpected error: %s", err) - return } - }(i) - } - wg.Wait() - if err := bi.Close(context.Background()); err != nil { - t.Errorf("Unexpected error: %s", err) - } + stats := bi.Stats() - stats := bi.Stats() - - // added = numitems - if stats.NumAdded != uint64(numItems) { - t.Errorf("Unexpected NumAdded: want=%d, got=%d", numItems, stats.NumAdded) - } + // added = numitems + if stats.NumAdded != uint64(numItems) { + t.Errorf("Unexpected NumAdded: want=%d, got=%d", numItems, stats.NumAdded) + } - // flushed = numitems - 1x conflict + 1x not_found - if stats.NumFlushed != uint64(numItems-2) { - t.Errorf("Unexpected NumFlushed: want=%d, got=%d", numItems-2, stats.NumFlushed) - } + // flushed = numitems - 1x conflict + 1x not_found + if stats.NumFlushed != uint64(numItems-2) { + t.Errorf("Unexpected NumFlushed: want=%d, got=%d", numItems-2, stats.NumFlushed) + } - // failed = 1x conflict + 1x not_found - if stats.NumFailed != 2 { - t.Errorf("Unexpected NumFailed: want=%d, got=%d", 2, stats.NumFailed) - } + // failed = 1x conflict + 1x not_found + if stats.NumFailed != 2 { + t.Errorf("Unexpected NumFailed: want=%d, got=%d", 2, stats.NumFailed) + } - // indexed = 1x - if stats.NumIndexed != 1 { - t.Errorf("Unexpected NumIndexed: want=%d, got=%d", 1, stats.NumIndexed) - } + // indexed = 1x + if stats.NumIndexed != 1 { + t.Errorf("Unexpected NumIndexed: want=%d, got=%d", 1, stats.NumIndexed) + } - // created = 1x - if stats.NumCreated != 1 { - t.Errorf("Unexpected NumCreated: want=%d, got=%d", 1, stats.NumCreated) - } + // created = 1x + if stats.NumCreated != 1 { + t.Errorf("Unexpected NumCreated: want=%d, got=%d", 1, stats.NumCreated) + } - // deleted = 1x - if stats.NumDeleted != 1 { - t.Errorf("Unexpected NumDeleted: want=%d, got=%d", 1, stats.NumDeleted) - } + // deleted = 1x + if stats.NumDeleted != 1 { + t.Errorf("Unexpected NumDeleted: want=%d, got=%d", 1, stats.NumDeleted) + } - if stats.NumUpdated != 1 { - t.Errorf("Unexpected NumUpdated: want=%d, got=%d", 1, stats.NumUpdated) - } + if stats.NumUpdated != 1 { + t.Errorf("Unexpected NumUpdated: want=%d, got=%d", 1, stats.NumUpdated) + } - // 3 items * 40 bytes, 2 workers, 1 request per worker - if stats.NumRequests != 3 { - t.Errorf("Unexpected NumRequests: want=%d, got=%d", 3, stats.NumRequests) + // 3 items * 40 bytes, 2 workers, 1 request per worker + if stats.NumRequests != 3 { + t.Errorf("Unexpected NumRequests: want=%d, got=%d", 3, stats.NumRequests) + } + }) } })