Skip to content

Commit

Permalink
Util: Reinstate item.Body after it is consumed in BulkIndexer
Browse files Browse the repository at this point in the history
To allow accessing the item body in success/error callbacks,
reinstate it when the callbacks are defined.

Closes #161

(cherry picked from commit 55d65cdfc77a15b545e12339d613f9a7270fef2f)
  • Loading branch information
Lindsey Redd authored and karmi committed Jun 30, 2020
1 parent d51b862 commit 09b44b1
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
22 changes: 20 additions & 2 deletions esutil/bulk_indexer.go
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"runtime"
"strconv"
Expand Down Expand Up @@ -354,7 +355,7 @@ func (w *worker) run() {
continue
}

if err := w.writeBody(item); err != nil {
if err := w.writeBody(&item); err != nil {
if item.OnFailure != nil {
item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
}
Expand Down Expand Up @@ -410,15 +411,32 @@ func (w *worker) writeMeta(item BulkIndexerItem) error {

// writeBody writes the item body to the buffer; it must be called under a lock.
//
func (w *worker) writeBody(item BulkIndexerItem) error {
func (w *worker) writeBody(item *BulkIndexerItem) error {
if item.Body != nil {

var getBody func() io.Reader

if item.OnSuccess != nil || item.OnFailure != nil {
var buf bytes.Buffer
buf.ReadFrom(item.Body)
getBody = func() io.Reader {
r := buf
return ioutil.NopCloser(&r)
}
item.Body = getBody()
}

if _, err := w.buf.ReadFrom(item.Body); err != nil {
if w.bi.config.OnError != nil {
w.bi.config.OnError(context.Background(), err)
}
return err
}
w.buf.WriteRune('\n')

if getBody != nil && (item.OnSuccess != nil || item.OnFailure != nil) {
item.Body = getBody()
}
}
return nil
}
Expand Down
29 changes: 26 additions & 3 deletions esutil/bulk_indexer_internal_test.go
Expand Up @@ -232,9 +232,11 @@ func TestBulkIndexer(t *testing.T) {

t.Run("Item Callbacks", func(t *testing.T) {
var (
countSuccessful uint64
countFailed uint64
failedIDs []string
countSuccessful uint64
countFailed uint64
failedIDs []string
successfulItemBodies []string
failedItemBodies []string

numItems = 4
numFailed = 2
Expand All @@ -256,10 +258,22 @@ func TestBulkIndexer(t *testing.T) {

successFunc := func(ctx context.Context, item BulkIndexerItem, res BulkIndexerResponseItem) {
atomic.AddUint64(&countSuccessful, 1)

buf, err := ioutil.ReadAll(item.Body)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
successfulItemBodies = append(successfulItemBodies, string(buf))
}
failureFunc := func(ctx context.Context, item BulkIndexerItem, res BulkIndexerResponseItem, err error) {
atomic.AddUint64(&countFailed, 1)
failedIDs = append(failedIDs, item.DocumentID)

buf, err := ioutil.ReadAll(item.Body)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
failedItemBodies = append(failedItemBodies, string(buf))
}

if err := bi.Add(context.Background(), BulkIndexerItem{
Expand All @@ -285,6 +299,7 @@ func TestBulkIndexer(t *testing.T) {
if err := bi.Add(context.Background(), BulkIndexerItem{
Action: "delete",
DocumentID: "2",
Body: strings.NewReader(`{"title":"baz"}`),
OnSuccess: successFunc,
OnFailure: failureFunc,
}); err != nil {
Expand Down Expand Up @@ -343,6 +358,14 @@ func TestBulkIndexer(t *testing.T) {
if !reflect.DeepEqual(failedIDs, []string{"1", "2"}) {
t.Errorf("Unexpected failedIDs: %#v", failedIDs)
}

if !reflect.DeepEqual(successfulItemBodies, []string{`{"title":"foo"}`, `{"doc":{"title":"qux"}}`}) {
t.Errorf("Unexpected successfulItemBodies: %#v", successfulItemBodies)
}

if !reflect.DeepEqual(failedItemBodies, []string{`{"title":"bar"}`, `{"title":"baz"}`}) {
t.Errorf("Unexpected failedItemBodies: %#v", failedItemBodies)
}
})

t.Run("OnFlush callbacks", func(t *testing.T) {
Expand Down

0 comments on commit 09b44b1

Please sign in to comment.