Split large batches on error instead of dropping them (#34911)
faec committed Mar 31, 2023
1 parent 62a2700 commit df59745
Showing 13 changed files with 598 additions and 96 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
 - Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
 - Fix panics when a processor is closed twice {pull}34647[34647]
 - Update elastic-agent-system-metrics to v0.4.6 to allow builds on mips platforms. {pull}34674[34674]
+- The Elasticsearch output now splits large requests instead of dropping them when it receives a StatusRequestEntityTooLarge error. {pull}34911[34911]
 - Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964]
 
 *Auditbeat*
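
The changelog entry describes the new behavior at a high level: when Elasticsearch rejects a bulk request with HTTP 413 (StatusRequestEntityTooLarge), the output retries progressively smaller sub-batches instead of discarding the whole batch. The standalone Go sketch below illustrates that halving strategy; Event, send, and sendSplitting are hypothetical stand-ins, not the Beats pipeline API (in the real code the split happens via the batch's SplitRetry method, shown in the diff that follows).

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins; the real types live in the Beats pipeline.
type Event struct{ Msg string }

var errPayloadTooLarge = errors.New("payload too large")

// sendSplitting retries a rejected request on recursively halved
// sub-batches. A single event that is still too large cannot be split
// further, so its error is returned and the caller must drop it.
func sendSplitting(send func([]Event) error, events []Event) error {
	err := send(events)
	if !errors.Is(err, errPayloadTooLarge) {
		return err // success, or an unrelated failure
	}
	if len(events) < 2 {
		return err // nothing left to split
	}
	mid := len(events) / 2
	if err := sendSplitting(send, events[:mid]); err != nil {
		return err
	}
	return sendSplitting(send, events[mid:])
}

func main() {
	// Fake sender that rejects any request with more than two events.
	send := func(batch []Event) error {
		if len(batch) > 2 {
			return errPayloadTooLarge
		}
		fmt.Printf("sent %d events\n", len(batch))
		return nil
	}
	fmt.Println(sendSplitting(send, make([]Event, 7))) // <nil>
}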
33 changes: 25 additions & 8 deletions libbeat/outputs/elasticsearch/client.go
@@ -189,8 +189,23 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error
 	rest, err := client.publishEvents(ctx, events)
 
 	switch {
-	case err == errPayloadTooLarge:
-		batch.Drop()
+	case errors.Is(err, errPayloadTooLarge):
+		if batch.SplitRetry() {
+			// Report that we split a batch
+			client.observer.Split()
+		} else {
+			// If the batch could not be split, there is no option left but
+			// to drop it and log the error state.
+			batch.Drop()
+			client.observer.Dropped(len(events))
+			err := apm.CaptureError(ctx, fmt.Errorf("failed to perform bulk index operation: %w", err))
+			err.Send()
+			client.log.Error(err)
+		}
+		// Returning an error from Publish forces a client close / reconnect,
+		// so don't pass this error through since it doesn't indicate anything
+		// wrong with the connection.
+		return nil
 	case len(rest) == 0:
 		batch.ACK()
 	default:
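
The block comment in the new case calls out a design constraint worth underlining: Publish's return value doubles as a connection-health signal, so a payload-size failure must be swallowed once it has been handled. Below is a condensed sketch of that control flow, using illustrative stand-in types rather than the real publisher.Batch interface.

package main

import "errors"

var errPayloadTooLarge = errors.New("payload too large")

// Illustrative stand-in for the pipeline's batch; not the Beats API.
type batch interface {
	ACK()
	Drop()
	SplitRetry() bool // true if the batch was split and requeued
}

// publish mirrors the shape of the new error handling: payload-size
// failures are resolved locally (split, or drop as a last resort) and
// nil is returned so the caller does not close the connection. Any
// other error propagates and triggers a reconnect.
func publish(send func() error, b batch) error {
	err := send()
	switch {
	case err == nil:
		b.ACK()
		return nil
	case errors.Is(err, errPayloadTooLarge):
		if !b.SplitRetry() {
			b.Drop() // a single event exceeded the limit; nothing to split
		}
		return nil // the connection itself is healthy
	default:
		return err // caller closes and reconnects
	}
}

func main() {}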
@@ -234,7 +249,9 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
 
 	if sendErr != nil {
 		if status == http.StatusRequestEntityTooLarge {
-			sendErr = errPayloadTooLarge
+			// This error must be handled by splitting the batch, so propagate
+			// it back to Publish instead of reporting it directly.
+			return data, errPayloadTooLarge
 		}
 		err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", sendErr))
 		err.Send()
@@ -246,7 +263,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
 
 	client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.",
 		pubCount,
-		time.Now().Sub(begin))
+		time.Since(begin))
 
 	// check response for transient errors
 	var failedEvents []publisher.Event
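
The Debugf hunk is a pure cleanup: time.Since(begin) is the idiomatic equivalent of time.Now().Sub(begin) and returns the same Duration. For example:

package main

import (
	"fmt"
	"time"
)

func main() {
	begin := time.Now()
	time.Sleep(10 * time.Millisecond) // stand-in for the publishing work
	fmt.Println(time.Since(begin))    // same value as time.Now().Sub(begin)
}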
@@ -312,13 +329,13 @@ func (client *Client) createEventBulkMeta(version version.V, event *beat.Event)
 
 	pipeline, err := client.getPipeline(event)
 	if err != nil {
-		err := fmt.Errorf("failed to select pipeline: %v", err)
+		err := fmt.Errorf("failed to select pipeline: %w", err)
 		return nil, err
 	}
 
 	index, err := client.index.Select(event)
 	if err != nil {
-		err := fmt.Errorf("failed to select event index: %v", err)
+		err := fmt.Errorf("failed to select event index: %w", err)
 		return nil, err
 	}
 
@@ -351,7 +368,7 @@ func (client *Client) createEventBulkMeta(version version.V, event *beat.Event)
 func (client *Client) getPipeline(event *beat.Event) (string, error) {
 	if event.Meta != nil {
 		pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
-		if err == mapstr.ErrKeyNotFound {
+		if errors.Is(err, mapstr.ErrKeyNotFound) {
 			return "", nil
 		}
 		if err != nil {
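
The two hunks above move error handling in the same direction: fmt.Errorf with %w (rather than %v) keeps the cause in the error chain, and errors.Is (rather than ==) finds a sentinel anywhere in that chain, including behind wrapping. A self-contained demonstration, using a local stand-in for mapstr.ErrKeyNotFound:

package main

import (
	"errors"
	"fmt"
)

var errKeyNotFound = errors.New("key not found") // stand-in for mapstr.ErrKeyNotFound

func main() {
	wrapped := fmt.Errorf("failed to select pipeline: %w", errKeyNotFound)
	flattened := fmt.Errorf("failed to select pipeline: %v", errKeyNotFound)

	fmt.Println(errors.Is(wrapped, errKeyNotFound))   // true: chain preserved by %w
	fmt.Println(errors.Is(flattened, errKeyNotFound)) // false: %v keeps only the text
	fmt.Println(wrapped == errKeyNotFound)            // false: == misses wrapped sentinels
}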
@@ -417,7 +434,7 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, data
 				dead_letter_marker_field: true,
 			}
 		} else {
-			data[i].Content.Meta.Put(dead_letter_marker_field, true)
+			data[i].Content.Meta[dead_letter_marker_field] = true
 		}
 		data[i].Content.Fields = mapstr.M{
 			"message": data[i].Content.Fields.String(),
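
The final hunk swaps mapstr.M's Put method for a plain map write. mapstr.M is a map type, and Put interprets dotted keys as nested paths; for a fixed top-level marker key, the direct assignment is simpler. A minimal illustration with a local stand-in type and a placeholder key (the real code uses the dead_letter_marker_field constant, whose value is not shown in this diff):

package main

import "fmt"

// Stand-in for mapstr.M (a map[string]interface{} in elastic-agent-libs).
type M map[string]interface{}

func main() {
	meta := M{}
	meta["dead_letter"] = true // placeholder key; direct top-level write
	fmt.Println(meta)          // map[dead_letter:true]
}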
