-
Notifications
You must be signed in to change notification settings - Fork 635
Open
Open
Copy link
Labels
Area: Helpers (Improvements or additions to the help) · Category: Bug (Something isn't working)
Description
In worker.run, when adding a new item would make the buffer full, a flush is forced first. If that flush fails (returns false), the current item, which was just popped from the itemQueue, is lost: no callback is called for it, and it is neither requeued nor reprocessed.
Here is a test reproducing the issue. If the status code is changed to http.StatusOK it passes.
package esutil
import (
"bytes"
"context"
"io"
"log"
"net/http"
"strconv"
"strings"
"testing"
"github.com/elastic/go-elasticsearch/v9"
)
func TestItemLoss(t *testing.T) {
esConfig := elasticsearch.Config{
Transport: &mockTransport{
RoundTripFunc: func(request *http.Request) (*http.Response, error) {
return &http.Response{
// StatusCode: http.StatusOK,
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(strings.NewReader(`{}`)),
Header: http.Header{
"X-Elastic-Product": []string{"Elasticsearch"},
},
}, nil
},
},
}
client, err := elasticsearch.NewClient(esConfig)
if err != nil {
log.Fatal(err)
}
logbuf := bytes.Buffer{}
logger := log.New(&logbuf, "", 0)
cfg := BulkIndexerConfig{
NumWorkers: 1,
Client: client,
FlushBytes: 100,
DebugLogger: logger,
}
bi, err := NewBulkIndexer(cfg)
if err != nil {
log.Fatal(err)
}
itemSuccessCallback := func(ctx context.Context, item BulkIndexerItem, response BulkIndexerResponseItem) {
logger.Printf("doc %s callback: success\n", response.DocumentID)
}
itemFailureCallback := func(ctx context.Context, item BulkIndexerItem, response BulkIndexerResponseItem, err error) {
logger.Printf("doc %s callback: fail\n", response.DocumentID)
}
bi.Add(context.Background(), BulkIndexerItem{
Action: "index",
DocumentID: strconv.Itoa(1),
Body: strings.NewReader(`{"title":"foo1"}`),
OnSuccess: itemSuccessCallback,
OnFailure: itemFailureCallback,
})
bi.Add(context.Background(), BulkIndexerItem{
Action: "index",
DocumentID: strconv.Itoa(2),
Body: strings.NewReader(`{"title":"foo2"}`),
OnSuccess: itemSuccessCallback,
OnFailure: itemFailureCallback,
})
bi.Add(context.Background(), BulkIndexerItem{
Action: "index",
DocumentID: strconv.Itoa(3),
Body: strings.NewReader(`{"title":"foo3"}`),
OnSuccess: itemSuccessCallback,
OnFailure: itemFailureCallback,
})
bi.Add(context.Background(), BulkIndexerItem{
Action: "index",
DocumentID: strconv.Itoa(4),
Body: strings.NewReader(`{"title":"foo4"}`),
OnSuccess: itemSuccessCallback,
OnFailure: itemFailureCallback,
})
bi.Close(context.Background())
logdata := logbuf.Bytes()
if !bytes.Contains(logdata, []byte(`{"title":"foo1"}`)) {
t.Fatalf("Expected doc 1 to be flushed, got: \n%s", logbuf.String())
}
if !bytes.Contains(logdata, []byte(`{"title":"foo2"}`)) {
t.Fatalf("Expected doc 2 to be flushed, got: \n%s", logbuf.String())
}
if !bytes.Contains(logdata, []byte(`{"title":"foo3"}`)) {
t.Fatalf("Expected doc 3 to be flushed, got: \n%s", logbuf.String())
}
if !bytes.Contains(logdata, []byte(`{"title":"foo4"}`)) {
t.Fatalf("Expected doc 4 to be flushed, got: \n%s", logbuf.String())
}
}Metadata
Metadata
Assignees
Labels
Area: Helpers (Improvements or additions to the help) · Category: Bug (Something isn't working)