diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index e5b7e706db8..d6b4212b428 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
 - Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
 - Fix panics when a processor is closed twice {pull}34647[34647]
 - Update elastic-agent-system-metrics to v0.4.6 to allow builds on mips platforms. {pull}34674[34674]
+- The Elasticsearch output now splits large requests instead of dropping them when it receives a StatusRequestEntityTooLarge error. {pull}34911[34911]
 - Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964]
 
 *Auditbeat*
diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go
index 405b2e3228c..ce4b714215f 100644
--- a/libbeat/outputs/elasticsearch/client.go
+++ b/libbeat/outputs/elasticsearch/client.go
@@ -189,8 +189,23 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error
 	rest, err := client.publishEvents(ctx, events)
 	switch {
-	case err == errPayloadTooLarge:
-		batch.Drop()
+	case errors.Is(err, errPayloadTooLarge):
+		if batch.SplitRetry() {
+			// Report that we split a batch
+			client.observer.Split()
+		} else {
+			// If the batch could not be split, there is no option left but
+			// to drop it and log the error state.
+			batch.Drop()
+			client.observer.Dropped(len(events))
+			err := apm.CaptureError(ctx, fmt.Errorf("failed to perform bulk index operation: %w", err))
+			err.Send()
+			client.log.Error(err)
+		}
+		// Returning an error from Publish forces a client close / reconnect,
+		// so don't pass this error through since it doesn't indicate anything
+		// wrong with the connection.
+		return nil
 	case len(rest) == 0:
 		batch.ACK()
 	default:
@@ -234,7 +249,9 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
 	if sendErr != nil {
 		if status == http.StatusRequestEntityTooLarge {
-			sendErr = errPayloadTooLarge
+			// This error must be handled by splitting the batch; propagate it
+			// back to Publish instead of reporting it directly.
+			return data, errPayloadTooLarge
 		}
 		err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", sendErr))
 		err.Send()
@@ -246,7 +263,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
 	client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.",
 		pubCount,
-		time.Now().Sub(begin))
+		time.Since(begin))
 
 	// check response for transient errors
 	var failedEvents []publisher.Event
@@ -312,13 +329,13 @@ func (client *Client) createEventBulkMeta(version version.V, event *beat.Event)
 	pipeline, err := client.getPipeline(event)
 	if err != nil {
-		err := fmt.Errorf("failed to select pipeline: %v", err)
+		err := fmt.Errorf("failed to select pipeline: %w", err)
 		return nil, err
 	}
 
 	index, err := client.index.Select(event)
 	if err != nil {
-		err := fmt.Errorf("failed to select event index: %v", err)
+		err := fmt.Errorf("failed to select event index: %w", err)
 		return nil, err
 	}
 
@@ -351,7 +368,7 @@ func (client *Client) createEventBulkMeta(version version.V, event *beat.Event)
 func (client *Client) getPipeline(event *beat.Event) (string, error) {
 	if event.Meta != nil {
 		pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
-		if err == mapstr.ErrKeyNotFound {
+		if errors.Is(err, mapstr.ErrKeyNotFound) {
 			return "", nil
 		}
 		if err != nil {
@@ -417,7 +434,7 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
 				dead_letter_marker_field: true,
 			}
 		} else {
-			data[i].Content.Meta.Put(dead_letter_marker_field, true)
+			data[i].Content.Meta[dead_letter_marker_field] = true
 		}
 		data[i].Content.Fields = mapstr.M{
 			"message": data[i].Content.Fields.String(),
diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go
index 4f25813c513..63f0d59986c 100644
--- a/libbeat/outputs/elasticsearch/client_test.go
+++ b/libbeat/outputs/elasticsearch/client_test.go
@@ -23,6 +23,7 @@ package elasticsearch
 import (
 	"context"
 	"fmt"
+	"io"
 	"net/http"
 	"net/http/httptest"
 	"strings"
@@ -37,9 +38,11 @@ import (
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
 	"github.com/elastic/beats/v7/libbeat/idxmgmt"
+	"github.com/elastic/beats/v7/libbeat/outputs"
 	"github.com/elastic/beats/v7/libbeat/outputs/outest"
 	"github.com/elastic/beats/v7/libbeat/outputs/outil"
 	"github.com/elastic/beats/v7/libbeat/publisher"
+	"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
 	"github.com/elastic/beats/v7/libbeat/version"
 	c "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/logp"
@@ -54,13 +57,11 @@ func (testIndexSelector) Select(event *beat.Event) (string, error) {
 }
 
 type batchMock struct {
-	// we embed the interface so we are able to implement the interface partially,
-	// only functions needed for tests are implemented
-	// if you use a function that is not implemented in the mock it will panic
-	publisher.Batch
 	events      []publisher.Event
 	ack         bool
 	drop        bool
+	canSplit    bool
+	didSplit    bool
 	retryEvents []publisher.Event
 }
 
@@ -73,46 +74,68 @@ func (bm *batchMock) ACK() {
 	bm.ack = true
 }
 
 func (bm *batchMock) Drop() {
 	bm.drop = true
 }
+func (bm *batchMock) Retry()       { panic("unimplemented") }
+func (bm *batchMock) Cancelled()   { panic("unimplemented") }
+func (bm *batchMock) FreeEntries() {}
+func (bm *batchMock) SplitRetry() bool {
+	if bm.canSplit {
+		bm.didSplit = true
+	}
+	return bm.canSplit
+}
 
 func (bm *batchMock) RetryEvents(events []publisher.Event) {
 	bm.retryEvents = events
 }
 
-func TestPublishStatusCode(t *testing.T) {
+func TestPublish(t *testing.T) {
+	makePublishTestClient := func(t *testing.T, url string) *Client {
+		client, err := NewClient(
+			ClientSettings{
+				Observer:           outputs.NewNilObserver(),
+				ConnectionSettings: eslegclient.ConnectionSettings{URL: url},
+				Index:              testIndexSelector{},
+			},
+			nil,
+		)
+		require.NoError(t, err)
+		return client
+	}
+
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
 
-	event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}}
-	events := []publisher.Event{event}
+	event1 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}}
+	event2 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}}
+	event3 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 3}}}
 
-	t.Run("returns pre-defined error and drops batch when 413", func(t *testing.T) {
+	t.Run("splits large batches on status code 413", func(t *testing.T) {
 		esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 			w.WriteHeader(http.StatusRequestEntityTooLarge)
-			w.Write([]byte("Request failed to get to the server (status code: 413)")) // actual response from ES
+			_, _ = w.Write([]byte("Request failed to get to the server (status code: 413)")) // actual response from ES
 		}))
 		defer esMock.Close()
+		client := makePublishTestClient(t, esMock.URL)
 
-		client, err := NewClient(
-			ClientSettings{
-				ConnectionSettings: eslegclient.ConnectionSettings{
-					URL: esMock.URL,
-				},
-				Index: testIndexSelector{},
-			},
-			nil,
-		)
-		assert.NoError(t, err)
-
-		event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}}
-		events := []publisher.Event{event}
+		// Try publishing a batch that can be split
 		batch := &batchMock{
-			events: events,
+			events:   []publisher.Event{event1},
+			canSplit: true,
 		}
+		err := client.Publish(ctx, batch)
+		assert.NoError(t, err, "Publish should split the batch without error")
+		assert.True(t, batch.didSplit, "batch should be split")
+
+		// Try publishing a batch that cannot be split
+		batch = &batchMock{
+			events:   []publisher.Event{event1},
+			canSplit: false,
+		}
 		err = client.Publish(ctx, batch)
-		assert.Error(t, err)
-		assert.Equal(t, errPayloadTooLarge, err, "should be a pre-defined error")
-		assert.True(t, batch.drop, "should must be dropped")
+		assert.NoError(t, err, "Publish should drop the batch without error")
+		assert.False(t, batch.didSplit, "batch should not be split")
+		assert.True(t, batch.drop, "unsplittable batch should be dropped")
 	})
 
 	t.Run("retries the batch if bad HTTP status", func(t *testing.T) {
@@ -120,33 +143,113 @@ func TestPublishStatusCode(t *testing.T) {
 			w.WriteHeader(http.StatusInternalServerError)
 		}))
 		defer esMock.Close()
-
-		client, err := NewClient(
-			ClientSettings{
-				ConnectionSettings: eslegclient.ConnectionSettings{
-					URL: esMock.URL,
-				},
-				Index: testIndexSelector{},
-			},
-			nil,
-		)
-		assert.NoError(t, err)
+		client := makePublishTestClient(t, esMock.URL)
 
 		batch := &batchMock{
-			events: events,
+			events: []publisher.Event{event1, event2},
 		}
 
-		err = client.Publish(ctx, batch)
+		err := client.Publish(ctx, batch)
 		assert.Error(t, err)
 		assert.False(t, batch.ack, "should not be acknowledged")
-		assert.Len(t, batch.retryEvents, len(events), "all events should be in retry")
+		assert.Len(t, batch.retryEvents, 2, "all events should be retried")
+	})
+
+	t.Run("live batches, still too big after split", func(t *testing.T) {
+		// Test a live (non-mocked) batch where all events by themselves are
+		// rejected by the server as too large, even after the initial split.
+		esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			w.WriteHeader(http.StatusRequestEntityTooLarge)
+			_, _ = w.Write([]byte("Request failed to get to the server (status code: 413)")) // actual response from ES
+		}))
+		defer esMock.Close()
+		client := makePublishTestClient(t, esMock.URL)
+
+		// Because our tests don't use a live eventConsumer routine,
+		// everything will happen synchronously and it's safe to track
+		// test results directly without atomics/mutexes.
+		done := false
+		retryCount := 0
+		batch := pipeline.NewBatchForTesting(
+			[]publisher.Event{event1, event2, event3},
+			func(b publisher.Batch) {
+				// The retry function sends the batch back through Publish.
+				// In a live pipeline it would instead be sent to eventConsumer
+				// first and then back to Publish when an output worker was
+				// available.
+				retryCount++
+				err := client.Publish(ctx, b)
+				assert.NoError(t, err, "Publish should return without error")
+			},
+			func() { done = true },
+		)
+		err := client.Publish(ctx, batch)
+		assert.NoError(t, err, "Publish should return without error")
+
+		// For three events there should be four retries in total:
+		// {[event1], [event2, event3]}, then {[event2], [event3]}.
+		// "done" should be true because after splitting into individual
+		// events, all 3 will fail and be dropped.
+		assert.Equal(t, 4, retryCount, "3-event batch should produce 4 total retries")
+		assert.True(t, done, "batch should be marked as done")
+	})
+
+	t.Run("live batches, one event too big after split", func(t *testing.T) {
+		// Test a live (non-mocked) batch where a single event is too large
+		// for the server to ingest but the others are ok.
+		esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			b, _ := io.ReadAll(r.Body)
+			body := string(b)
+			// Reject the batch as too large only if it contains event1
+			if strings.Contains(body, "\"field\":1") {
+				// Report batch too large
+				w.WriteHeader(http.StatusRequestEntityTooLarge)
+				_, _ = w.Write([]byte("Request failed to get to the server (status code: 413)")) // actual response from ES
+			} else {
+				// Report success with no events dropped
+				w.WriteHeader(200)
+				_, _ = io.WriteString(w, "{\"items\": []}")
+			}
+		}))
+		defer esMock.Close()
+		client := makePublishTestClient(t, esMock.URL)
+
+		// Because our tests don't use a live eventConsumer routine,
+		// everything will happen synchronously and it's safe to track
+		// test results directly without atomics/mutexes.
+		done := false
+		retryCount := 0
+		batch := pipeline.NewBatchForTesting(
+			[]publisher.Event{event1, event2, event3},
+			func(b publisher.Batch) {
+				// The retry function sends the batch back through Publish.
+				// In a live pipeline it would instead be sent to eventConsumer
+				// first and then back to Publish when an output worker was
+				// available.
+				retryCount++
+				err := client.Publish(ctx, b)
+				assert.NoError(t, err, "Publish should return without error")
+			},
+			func() { done = true },
+		)
+		err := client.Publish(ctx, batch)
+		assert.NoError(t, err, "Publish should return without error")
+
+		// There should be two retries: {[event1], [event2, event3]}.
+		// The first split batch should fail and be dropped since it contains
+		// event1, the other one should succeed.
+		// "done" should be true because both split batches are completed
+		// (one with failure, one with success).
+		assert.Equal(t, 2, retryCount, "splitting with one large event should produce two retries")
+		assert.True(t, done, "batch should be marked as done")
 	})
 }
 
 func TestCollectPublishFailsNone(t *testing.T) {
 	client, err := NewClient(
 		ClientSettings{
+			Observer:           outputs.NewNilObserver(),
 			NonIndexableAction: "drop",
 		},
 		nil,
@@ -170,6 +273,7 @@ func TestCollectPublishFailsNone(t *testing.T) {
 func TestCollectPublishFailMiddle(t *testing.T) {
 	client, err := NewClient(
 		ClientSettings{
+			Observer:           outputs.NewNilObserver(),
 			NonIndexableAction: "drop",
 		},
 		nil,
@@ -199,6 +303,7 @@ func TestCollectPublishFailMiddle(t *testing.T) {
 func TestCollectPublishFailDeadLetterQueue(t *testing.T) {
 	client, err := NewClient(
 		ClientSettings{
+			Observer:           outputs.NewNilObserver(),
 			NonIndexableAction: "dead_letter_index",
 		},
 		nil,
@@ -257,6 +362,7 @@ func TestCollectPublishFailDeadLetterQueue(t *testing.T) {
 func TestCollectPublishFailDrop(t *testing.T) {
 	client, err := NewClient(
 		ClientSettings{
+			Observer:           outputs.NewNilObserver(),
 			NonIndexableAction: "drop",
 		},
 		nil,
@@ -300,6 +406,7 @@ func TestCollectPublishFailDrop(t *testing.T) {
 func TestCollectPublishFailAll(t *testing.T) {
 	client, err := NewClient(
 		ClientSettings{
+			Observer:           outputs.NewNilObserver(),
 			NonIndexableAction: "drop",
 		},
 		nil,
@@ -324,10 +431,11 @@
 }
 
 func TestCollectPipelinePublishFail(t *testing.T) {
-	logp.TestingSetup(logp.WithSelectors("elasticsearch"))
+	_ = logp.TestingSetup(logp.WithSelectors("elasticsearch"))
 
 	client, err := NewClient(
 		ClientSettings{
+			Observer:           outputs.NewNilObserver(),
 			NonIndexableAction: "drop",
 		},
 		nil,
@@ -374,6 +482,7 @@ func TestCollectPipelinePublishFail(t *testing.T) {
 func BenchmarkCollectPublishFailsNone(b *testing.B) {
 	client, err := NewClient(
 		ClientSettings{
+			Observer:           outputs.NewNilObserver(),
 			NonIndexableAction: "drop",
 		},
 		nil,
@@ -402,6 +511,7 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) {
 func BenchmarkCollectPublishFailMiddle(b *testing.B) {
 	client, err := NewClient(
 		ClientSettings{
+			Observer:           outputs.NewNilObserver(),
 			NonIndexableAction: "drop",
 		},
 		nil,
@@ -431,6 +541,7 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) {
 func BenchmarkCollectPublishFailAll(b *testing.B) {
 	client, err := NewClient(
 		ClientSettings{
+			Observer:           outputs.NewNilObserver(),
 			NonIndexableAction: "drop",
 		},
 		nil,
@@ -479,6 +590,7 @@ func TestClientWithHeaders(t *testing.T) {
 	defer ts.Close()
 
 	client, err := NewClient(ClientSettings{
+		Observer: outputs.NewNilObserver(),
 		ConnectionSettings: eslegclient.ConnectionSettings{
 			URL: ts.URL,
 			Headers: map[string]string{
@@ -491,7 +603,8 @@ func TestClientWithHeaders(t *testing.T) {
 	assert.NoError(t, err)
 
 	// simple ping
-	client.Connect()
+	err = client.Connect()
+	assert.NoError(t, err)
 	assert.Equal(t, 1, requestCount)
 
 	// bulk request
@@ -555,6 +668,7 @@ func TestBulkEncodeEvents(t *testing.T) {
 
 	client, err := NewClient(
 		ClientSettings{
+			Observer: outputs.NewNilObserver(),
 			Index:    index,
 			Pipeline: pipeline,
 		},
@@ -628,8 +742,9 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) {
 		}
 	}
 
-	client, err := NewClient(
+	client, _ := NewClient(
 		ClientSettings{
+			Observer: outputs.NewNilObserver(),
 			Index:    index,
 			Pipeline: pipeline,
 		},
@@ -645,8 +760,8 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) {
 		if bulkEventIndex == -1 {
 			continue
 		}
 
-		caseOpType, _ := cases[i]["op_type"]
-		caseMessage, _ := cases[i]["message"].(string)
+		caseOpType := cases[i]["op_type"]
+		caseMessage := cases[i]["message"].(string)
 
 		switch bulkItems[bulkEventIndex].(type) {
 		case eslegclient.BulkCreateAction:
 			validOpTypes := []interface{}{e.OpTypeCreate, nil}
@@ -672,6 +787,7 @@ func TestClientWithAPIKey(t *testing.T) {
 	defer ts.Close()
 
 	client, err := NewClient(ClientSettings{
+		Observer: outputs.NewNilObserver(),
 		ConnectionSettings: eslegclient.ConnectionSettings{
 			URL:    ts.URL,
 			APIKey: "hyokHG4BfWk5viKZ172X:o45JUkyuS--yiSAuuxl8Uw",
 		},
 	}, nil)
 	assert.NoError(t, err)
 
+	// This connection will fail since the server doesn't return a valid
+	// response. This is fine since we're just testing the headers in the
+	// original client request.
+	//nolint:errcheck // connection doesn't need to succeed
 	client.Connect()
 
 	assert.Equal(t, "ApiKey aHlva0hHNEJmV2s1dmlLWjE3Mlg6bzQ1SlVreXVTLS15aVNBdXV4bDhVdw==", headers.Get("Authorization"))
 }
diff --git a/libbeat/outputs/metrics.go b/libbeat/outputs/metrics.go
index cdcc8464242..77374df3e61 100644
--- a/libbeat/outputs/metrics.go
+++ b/libbeat/outputs/metrics.go
@@ -35,6 +35,9 @@ type Stats struct {
 	dropped *monitoring.Uint // total number of invalid events dropped by the output
 	tooMany *monitoring.Uint // total number of too many requests replies from output
 
+	// Output batch stats
+	split *monitoring.Uint // total number of batches split for being too large
+
 	//
 	// Output network connection stats
 	//
@@ -59,6 +62,8 @@ func NewStats(reg *monitoring.Registry) *Stats {
 		active:  monitoring.NewUint(reg, "events.active"),
 		tooMany: monitoring.NewUint(reg, "events.toomany"),
 
+		split: monitoring.NewUint(reg, "batches.split"),
+
 		writeBytes:  monitoring.NewUint(reg, "write.bytes"),
 		writeErrors: monitoring.NewUint(reg, "write.errors"),
@@ -119,6 +124,12 @@ func (s *Stats) Cancelled(n int) {
 	}
 }
 
+func (s *Stats) Split() {
+	if s != nil {
+		s.split.Inc()
+	}
+}
+
 // ErrTooMany updates the number of Too Many Requests responses reported by the output.
 func (s *Stats) ErrTooMany(n int) {
 	if s != nil {
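The new counter is registered as `batches.split` next to the existing `events.*` and `write.*` metrics. A minimal sketch of how it gets exercised, assuming a standalone registry created just for illustration (in a real Beat the registry comes from the output's monitoring setup):

package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/outputs"
	"github.com/elastic/elastic-agent-libs/monitoring"
)

func main() {
	// Throwaway registry for illustration; a real Beat wires the output's
	// own monitoring registry through NewStats.
	reg := monitoring.NewRegistry()
	stats := outputs.NewStats(reg)

	// An output worker calls Split() once per batch it splits; the value
	// accumulates under the "batches.split" key registered above.
	stats.Split()
	stats.Split()

	fmt.Println("reported two splits under batches.split")
}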
diff --git a/libbeat/outputs/observer.go b/libbeat/outputs/observer.go
index ed14920182d..9d7a3aec4a0 100644
--- a/libbeat/outputs/observer.go
+++ b/libbeat/outputs/observer.go
@@ -26,6 +26,7 @@ type Observer interface {
 	Dropped(int)      // report number of dropped events
 	Duplicate(int)    // report number of events detected as duplicates (e.g. on resends)
 	Cancelled(int)    // report number of cancelled events
+	Split()           // report a batch was split for being too large to ingest
 	WriteError(error) // report an I/O error on write
 	WriteBytes(int)   // report number of bytes being written
 	ReadError(error)  // report an I/O error on read
@@ -48,6 +49,7 @@ func (*emptyObserver) Duplicate(int) {}
 func (*emptyObserver) Failed(int)       {}
 func (*emptyObserver) Dropped(int)      {}
 func (*emptyObserver) Cancelled(int)    {}
+func (*emptyObserver) Split()           {}
 func (*emptyObserver) WriteError(error) {}
 func (*emptyObserver) WriteBytes(int)   {}
 func (*emptyObserver) ReadError(error)  {}
diff --git a/libbeat/outputs/outest/batch.go b/libbeat/outputs/outest/batch.go
index a43c6da1e51..f0e6838e8fd 100644
--- a/libbeat/outputs/outest/batch.go
+++ b/libbeat/outputs/outest/batch.go
@@ -39,6 +39,7 @@ const (
 	BatchACK BatchSignalTag = iota
 	BatchDrop
 	BatchRetry
+	BatchSplitRetry
 	BatchRetryEvents
 	BatchCancelled
 )
@@ -71,6 +72,11 @@ func (b *Batch) RetryEvents(events []publisher.Event) {
 	b.doSignal(BatchSignal{Tag: BatchRetryEvents, Events: events})
 }
 
+func (b *Batch) SplitRetry() bool {
+	b.doSignal(BatchSignal{Tag: BatchSplitRetry})
+	return len(b.events) > 1
+}
+
 func (b *Batch) FreeEntries() {}
 
 func (b *Batch) Cancelled() {
diff --git a/libbeat/outputs/shipper/api/shipper_mock.go b/libbeat/outputs/shipper/api/shipper_mock.go
index 6b26f100e67..52e49a72e32 100644
--- a/libbeat/outputs/shipper/api/shipper_mock.go
+++ b/libbeat/outputs/shipper/api/shipper_mock.go
@@ -42,12 +42,14 @@ type ProducerMock struct {
 	uuid           string
 	AcceptedCount  uint32
 	persistedIndex uint64
-	Error          error
+	ErrorCallback  func(events []*messages.Event) error
 }
 
 func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishRequest) (*messages.PublishReply, error) {
-	if p.Error != nil {
-		return nil, p.Error
+	if p.ErrorCallback != nil {
+		if err := p.ErrorCallback(r.Events); err != nil {
+			return nil, err
+		}
 	}
 
 	if r.Uuid != p.uuid {
diff --git a/libbeat/outputs/shipper/shipper.go b/libbeat/outputs/shipper/shipper.go
index 7164ca42826..2cfd42ee737 100644
--- a/libbeat/outputs/shipper/shipper.go
+++ b/libbeat/outputs/shipper/shipper.go
@@ -203,13 +203,16 @@ func (s *shipper) publish(ctx context.Context, batch publisher.Batch) error {
 		if status.Code(err) == codes.ResourceExhausted {
 			// This error can only come from the gRPC connection, and
 			// most likely indicates this request exceeds the shipper's
-			// RPC size limit. The correct thing to do here is split
-			// the batch as in https://github.com/elastic/beats/issues/29778.
-			// Since this isn't supported yet, we drop this batch to avoid
-			// permanently blocking the pipeline.
-			s.log.Errorf("dropping %d events because of RPC failure: %v", len(events), err)
-			batch.Drop()
-			s.observer.Dropped(len(events))
+			// RPC size limit. Split the batch if possible, otherwise we
+			// need to drop it.
+			if batch.SplitRetry() {
+				// Report that we split a batch
+				s.observer.Split()
+			} else {
+				batch.Drop()
+				s.observer.Dropped(len(events))
+				s.log.Errorf("dropping %d events because of RPC failure: %v", len(events), err)
+			}
 			return nil
 		}
 		// All other known errors are, in theory, retryable once the
diff --git a/libbeat/outputs/shipper/shipper_test.go b/libbeat/outputs/shipper/shipper_test.go
index cd77ec1fa1b..845815e6f71 100644
--- a/libbeat/outputs/shipper/shipper_test.go
+++ b/libbeat/outputs/shipper/shipper_test.go
@@ -40,6 +40,7 @@ import (
 	"github.com/elastic/beats/v7/libbeat/outputs/outest"
 	"github.com/elastic/beats/v7/libbeat/outputs/shipper/api"
 	"github.com/elastic/beats/v7/libbeat/publisher"
+	"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
 	"github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/mapstr"
 	"github.com/elastic/elastic-agent-shipper-client/pkg/helpers"
@@ -143,22 +144,18 @@ func TestToShipperEvent(t *testing.T) {
 }
 
 func TestPublish(t *testing.T) {
-	//logp.DevelopmentSetup()
 	events := []beat.Event{
 		{
-			Timestamp: time.Now(),
-			Meta:      mapstr.M{"event": "first"},
-			Fields:    mapstr.M{"a": "b"},
+			Meta:   mapstr.M{"event": "first"},
+			Fields: mapstr.M{"a": "b"},
 		},
 		{
-			Timestamp: time.Now(),
-			Meta:      nil, // see failMarshal()
-			Fields:    mapstr.M{"a": "b"},
+			Meta:   nil, // see failMarshal()
+			Fields: mapstr.M{"a": "b"},
 		},
 		{
-			Timestamp: time.Now(),
-			Meta:      mapstr.M{"event": "third"},
-			Fields:    mapstr.M{"e": "f"},
+			Meta:   mapstr.M{"event": "third"},
+			Fields: mapstr.M{"e": "f"},
 		},
 	}
@@ -179,9 +176,7 @@ func TestPublish(t *testing.T) {
 			events:        events,
 			marshalMethod: toShipperEvent,
 			expSignals: []outest.BatchSignal{
-				{
-					Tag: outest.BatchACK,
-				},
+				{Tag: outest.BatchACK},
 			},
 			qSize:            3,
 			observerExpected: &TestObserver{batch: 3, acked: 3},
@@ -190,9 +185,7 @@
 			name:   "retries not accepted events",
 			events: events,
 			expSignals: []outest.BatchSignal{
-				{
-					Tag: outest.BatchACK,
-				},
+				{Tag: outest.BatchACK},
 			},
 			marshalMethod: failMarshal, // emulate a dropped event
 			qSize:         3,
@@ -202,9 +195,7 @@
 			name:   "cancels the batch if server error",
 			events: events,
 			expSignals: []outest.BatchSignal{
-				{
-					Tag: outest.BatchCancelled,
-				},
+				{Tag: outest.BatchCancelled},
 			},
 			marshalMethod: toShipperEvent,
 			qSize:         3,
@@ -213,16 +204,26 @@
 			expError: "failed to publish the batch to the shipper, none of the 3 events were accepted",
 		},
 		{
-			name:   "drops the batch on resource exceeded error",
+			name:   "splits the batch on resource exceeded error",
 			events: events,
 			expSignals: []outest.BatchSignal{
-				{
-					Tag: outest.BatchDrop,
-				},
+				{Tag: outest.BatchSplitRetry},
 			},
 			marshalMethod:    toShipperEvent,
 			qSize:            3,
-			observerExpected: &TestObserver{batch: 3, dropped: 3},
+			observerExpected: &TestObserver{batch: 3, split: 1},
+			serverError:      status.Error(codes.ResourceExhausted, "rpc size limit exceeded"),
+		},
+		{
+			name:   "drops an unsplittable batch on resource exceeded error",
+			events: events[:1], // only 1 event so SplitRetry returns false
+			expSignals: []outest.BatchSignal{
+				{Tag: outest.BatchSplitRetry},
+				{Tag: outest.BatchDrop},
+			},
+			marshalMethod:    toShipperEvent,
+			qSize:            1,
+			observerExpected: &TestObserver{batch: 1, dropped: 1},
 			serverError:      status.Error(codes.ResourceExhausted, "rpc size limit exceeded"),
 		},
 	}
@@ -238,7 +239,8 @@
 			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 			defer cancel()
-			addr, producer, stop := runServer(t, tc.qSize, tc.serverError, "localhost:0")
+			addr, producer, stop := runServer(
+				t, tc.qSize, constErrorCallback(tc.serverError), "localhost:0")
 			defer stop()
 
 			cfg, err := config.NewConfigFrom(map[string]interface{}{
@@ -372,6 +374,119 @@ func TestPublish(t *testing.T) {
 		require.Equal(t, expSignals, batch3.Signals, "batch3")
 		require.Equal(t, expectedObserver, observer)
 	})
+
+	t.Run("live batches where all events are too large to ingest", func(t *testing.T) {
+		// This tests recursive retry using live `ttlBatch` structs instead of mocks
+		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+		defer cancel()
+
+		errCallback := constErrorCallback(status.Error(codes.ResourceExhausted, "rpc size limit exceeded"))
+		addr, _, stop := runServer(t, 9, errCallback, "localhost:0")
+		defer stop()
+		cfg, err := config.NewConfigFrom(map[string]interface{}{
+			"server": addr,
+		})
+		require.NoError(t, err)
+		observer := &TestObserver{}
+
+		client := createShipperClient(t, cfg, observer)
+
+		// Since we retry directly instead of going through a live pipeline,
+		// the Publish call is synchronous and we can track state by modifying
+		// local variables directly.
+		retryCount := 0
+		done := false
+		batch := pipeline.NewBatchForTesting(
+			[]publisher.Event{
+				{Content: events[0]}, {Content: events[1]}, {Content: events[2]},
+			},
+			func(b publisher.Batch) {
+				// Retry by sending directly back to Publish. In a live
+				// pipeline, this would be sent through eventConsumer first
+				// before calling Publish on the next free output worker.
+				retryCount++
+				err := client.Publish(ctx, b)
+				assert.NoError(t, err, "Publish shouldn't return an error")
+			},
+			func() { done = true },
+		)
+		err = client.Publish(ctx, batch)
+		assert.NoError(t, err, "Publish shouldn't return an error")
+
+		// For three events there should be four retries in total:
+		// {[event1], [event2, event3]}, then {[event2], [event3]}.
+		// "done" should be true because after splitting into individual
+		// events, all 3 will fail and be dropped.
+		assert.Equal(t, 4, retryCount, "three-event batch should produce four total retries")
+		assert.True(t, done, "batch should be done after Publish")
+
+		// "batch" adds up all events passed into publish, including repeats,
+		// so it should be 3 + 2 + 1 + 1 + 1 = 8
+		expectedObserver := &TestObserver{split: 2, dropped: 3, batch: 8}
+		require.Equal(t, expectedObserver, observer)
+	})
+
+	t.Run("live batches where only one event is too large to ingest", func(t *testing.T) {
+		// This tests retry using live `ttlBatch` structs instead of mocks,
+		// where one event is too large to ingest but the others are ok.
+		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+		defer cancel()
+
+		errCallback := func(batchEvents []*messages.Event) error {
+			// Treat only the first event (which contains the metadata
+			// string "first") as too large to ingest, and accept otherwise.
+			for _, e := range batchEvents {
+				if strings.Contains(e.String(), "\"first\"") {
+					return status.Error(codes.ResourceExhausted, "rpc size limit exceeded")
+				}
+			}
+			return nil
+		}
+		addr, _, stop := runServer(t, 9, errCallback, "localhost:0")
+		defer stop()
+		cfg, err := config.NewConfigFrom(map[string]interface{}{
+			"server": addr,
+		})
+		require.NoError(t, err)
+		observer := &TestObserver{}
+
+		client := createShipperClient(t, cfg, observer)
+
+		// Since we retry directly instead of going through a live pipeline,
+		// the Publish call is synchronous and we can track state by modifying
+		// local variables directly.
+		retryCount := 0
+		done := false
+		batch := pipeline.NewBatchForTesting(
+			[]publisher.Event{
+				{Content: events[0]}, {Content: events[1]}, {Content: events[2]},
+			},
+			func(b publisher.Batch) {
+				// Retry by sending directly back to Publish. In a live
+				// pipeline, this would be sent through eventConsumer first
+				// before calling Publish on the next free output worker.
+				retryCount++
+				err := client.Publish(ctx, b)
+				assert.NoError(t, err, "Publish shouldn't return an error")
+			},
+			func() { done = true },
+		)
+		err = client.Publish(ctx, batch)
+		assert.NoError(t, err, "Publish shouldn't return an error")
+
+		// Only the first event is too large -- it will be retried by
+		// itself and the other batch will succeed, so retryCount should
+		// be 2.
+		// "done" should be false because the shipper output doesn't call done
+		// until upstream ingestion is confirmed via PersistedIndex.
+		assert.Equal(t, 2, retryCount, "splitting with one oversized event should produce two retries")
+		assert.False(t, done, "batch shouldn't be done until ingestion is confirmed via PersistedIndex")
+
+		// "batch" adds up all events passed into publish, including repeats,
+		// so it should be 3 + 1 + 2 = 6
+		expectedObserver := &TestObserver{split: 1, dropped: 1, batch: 6}
+		require.Equal(t, expectedObserver, observer)
+	})
 }
 
 // BenchmarkToShipperEvent is used to detect performance regression when the conversion function is changed.
@@ -429,9 +544,14 @@ func BenchmarkToShipperEvent(b *testing.B) {
 // `err` is a preset error that the server will serve to the client
 // `listenAddr` is the address for the server to listen
 // returns `actualAddr` where the listener actually is and the `stop` function to stop the server
-func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAddr string, mock *api.ProducerMock, stop func()) {
+func runServer(
+	t *testing.T,
+	qSize int,
+	errCallback func([]*messages.Event) error,
+	listenAddr string,
+) (actualAddr string, mock *api.ProducerMock, stop func()) {
 	producer := api.NewProducerMock(qSize)
-	producer.Error = err
+	producer.ErrorCallback = errCallback
 	grpcServer := grpc.NewServer()
 	pb.RegisterProducerServer(grpcServer, producer)
 
@@ -450,6 +570,12 @@ func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAdd
 	return actualAddr, producer, stop
 }
 
+func constErrorCallback(err error) func([]*messages.Event) error {
+	return func(_ []*messages.Event) error {
+		return err
+	}
+}
+
 func createShipperClient(t *testing.T, cfg *config.C, observer outputs.Observer) outputs.NetworkClient {
 	group, err := makeShipper(
 		nil,
@@ -499,6 +625,7 @@ type TestObserver struct {
 	batch     int
 	duplicate int
 	failed    int
+	split     int
 
 	writeError error
 	readError  error
@@ -515,6 +642,7 @@ func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate }
 func (to *TestObserver) Failed(failed int)       { to.failed += failed }
 func (to *TestObserver) Dropped(dropped int)     { to.dropped += dropped }
 func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled }
+func (to *TestObserver) Split()                  { to.split++ }
 func (to *TestObserver) WriteError(we error)     { to.writeError = we }
 func (to *TestObserver) WriteBytes(wb int)       { to.writeBytes += wb }
 func (to *TestObserver) ReadError(re error)      { to.readError = re }
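Both outputs now follow the same shape when a request is rejected for size: try SplitRetry first and fall back to dropping only when the batch cannot shrink any further. A condensed sketch of that pattern, ahead of the interface change below; the `sketchOutput` type and its fields are illustrative stand-ins, not real Beats types:

package sketch

import (
	"github.com/elastic/beats/v7/libbeat/outputs"
	"github.com/elastic/beats/v7/libbeat/publisher"
	"github.com/elastic/elastic-agent-libs/logp"
)

// sketchOutput stands in for an output worker (Elasticsearch, shipper, ...)
// that owns an observer and a logger.
type sketchOutput struct {
	observer outputs.Observer
	log      *logp.Logger
}

// handleTooLarge shows the shared split-or-drop handling from this PR.
func (o *sketchOutput) handleTooLarge(batch publisher.Batch, events []publisher.Event) error {
	if batch.SplitRetry() {
		// Both halves were handed back to the retryer; just count the split.
		o.observer.Split()
		return nil
	}
	// A single event is already over the limit: drop it so the pipeline
	// doesn't block forever, and account for the loss.
	batch.Drop()
	o.observer.Dropped(len(events))
	o.log.Errorf("dropping %d events: request too large for the server", len(events))
	// Return nil: the connection itself is healthy, so no reconnect is needed.
	return nil
}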
diff --git a/libbeat/publisher/event.go b/libbeat/publisher/event.go
index 502635851d1..83dbb22f777 100644
--- a/libbeat/publisher/event.go
+++ b/libbeat/publisher/event.go
@@ -41,6 +41,12 @@ type Batch interface {
 	// Try sending the events in this list again; all others are acknowledged.
 	RetryEvents(events []Event)
 
+	// Split this batch's events into two smaller batches and retry them both.
+	// If SplitRetry returns false, the batch could not be split, and the
+	// caller is responsible for reporting the error (including calling
+	// batch.Drop() if necessary).
+	SplitRetry() bool
+
 	// Release the internal pointer to this batch's events but do not yet
 	// acknowledge this batch. This exists specifically for the shipper output,
 	// where there is potentially a long gap between when events are handed off
diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go
index 1935454c88d..3d2cb52535d 100644
--- a/libbeat/publisher/pipeline/testing.go
+++ b/libbeat/publisher/pipeline/testing.go
@@ -71,11 +71,12 @@ func (b *mockBatch) Events() []publisher.Event { return b.events }
 
-func (b *mockBatch) ACK()         { signalFn(b.onACK) }
-func (b *mockBatch) Drop()        { signalFn(b.onDrop) }
-func (b *mockBatch) Retry()       { signalFn(b.onRetry) }
-func (b *mockBatch) Cancelled()   { signalFn(b.onCancelled) }
-func (b *mockBatch) FreeEntries() {}
+func (b *mockBatch) ACK()             { signalFn(b.onACK) }
+func (b *mockBatch) Drop()            { signalFn(b.onDrop) }
+func (b *mockBatch) Retry()           { signalFn(b.onRetry) }
+func (b *mockBatch) SplitRetry() bool { return false }
+func (b *mockBatch) Cancelled()       { signalFn(b.onCancelled) }
+func (b *mockBatch) FreeEntries()     {}
 
 func (b *mockBatch) RetryEvents(events []publisher.Event) {
 	b.updateEvents(events)
diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go
index 69423ddc439..c374ac88d72 100644
--- a/libbeat/publisher/pipeline/ttl_batch.go
+++ b/libbeat/publisher/pipeline/ttl_batch.go
@@ -18,6 +18,8 @@
 package pipeline
 
 import (
+	"sync/atomic"
+
 	"github.com/elastic/beats/v7/libbeat/publisher"
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 )
@@ -41,6 +43,22 @@ type ttlBatch struct {
 	// The cached events returned from original.Events(). If some but not
 	// all of the events are ACKed, those ones are removed from the list.
 	events []publisher.Event
+
+	// If split is non-nil, this batch was created by splitting another
+	// batch when the output determined it was too large. In this case,
+	// all split batches descending from the same original batch will
+	// point to the same metadata.
+	split *batchSplitData
+}
+
+type batchSplitData struct {
+	// The original done callback, to be invoked when all split batches
+	// descending from it have been completed.
+	originalDone func()
+
+	// The number of events awaiting acknowledgment from the original
+	// batch. When this reaches zero, originalDone should be invoked.
+	outstandingEvents atomic.Int64
 }
 
 func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
@@ -82,6 +100,55 @@ func (b *ttlBatch) Drop() {
 	b.done()
 }
 
+// SplitRetry is called by the output to report that the batch is
+// too large to ingest. It splits the events into two separate batches
+// and sends both of them back to the retryer. Returns false if the
+// batch could not be split.
+func (b *ttlBatch) SplitRetry() bool {
+	if len(b.events) < 2 {
+		// This batch is already as small as it can get
+		return false
+	}
+	splitData := b.split
+	if splitData == nil {
+		// Splitting a previously unsplit batch, create the metadata
+		splitData = &batchSplitData{
+			originalDone: b.done,
+		}
+		// Initialize to the number of events in the original batch
+		splitData.outstandingEvents.Add(int64(len(b.events)))
+	}
+	splitIndex := len(b.events) / 2
+	events1 := b.events[:splitIndex]
+	events2 := b.events[splitIndex:]
+	b.retryer.retry(&ttlBatch{
+		events:  events1,
+		done:    splitData.doneCallback(len(events1)),
+		retryer: b.retryer,
+		ttl:     b.ttl,
+		split:   splitData,
+	}, false)
+	b.retryer.retry(&ttlBatch{
+		events:  events2,
+		done:    splitData.doneCallback(len(events2)),
+		retryer: b.retryer,
+		ttl:     b.ttl,
+		split:   splitData,
+	}, false)
+	return true
+}
+
+// returns a callback to acknowledge the given number of events from
+// a batch fragment.
+func (splitData *batchSplitData) doneCallback(eventCount int) func() {
+	return func() {
+		remaining := splitData.outstandingEvents.Add(-int64(eventCount))
+		if remaining == 0 {
+			splitData.originalDone()
+		}
+	}
+}
+
 func (b *ttlBatch) Retry() {
 	b.retryer.retry(b, true)
 }
@@ -111,7 +178,7 @@ func (b *ttlBatch) reduceTTL() bool {
 		return true
 	}
 
-	// filter for evens with guaranteed send flags
+	// filter for events with guaranteed send flags
 	events := b.events[:0]
 	for _, event := range b.events {
 		if event.Guaranteed() {
@@ -128,3 +195,41 @@ func (b *ttlBatch) reduceTTL() bool {
 	// all events have been dropped:
 	return false
 }
+
+///////////////////////////////////////////////////////////////////////
+// Testing support helpers
+
+// NewBatchForTesting creates a ttlBatch (exposed through its publisher
+// interface). This is exposed publicly to support testing of ttlBatch
+// with other pipeline components; it should never be used to create
+// a batch in live pipeline code.
+//
+// - events: the publisher events contained in the batch
+// - ttl: the number of retries left until the batch is dropped. -1 means it
+//   can't be dropped.
+// - retryCallback: the callback invoked when a batch needs to be retried.
+//   In a live pipeline, this points to the retry method on eventConsumer,
+//   the helper object that distributes pending batches to output workers.
+// - done: the callback invoked on receiving batch.Done
+func NewBatchForTesting(
+	events []publisher.Event,
+	retryCallback func(batch publisher.Batch),
+	done func(),
+) publisher.Batch {
+	return &ttlBatch{
+		events:  events,
+		done:    done,
+		retryer: testingRetryer{retryCallback},
+	}
+}
+
+// testingRetryer is a simple wrapper of the retryer interface that is
+// used by NewBatchForTesting, to allow tests in other packages to interoperate
+// with the internal type ttlBatch.
+type testingRetryer struct {
+	retryCallback func(batch publisher.Batch)
+}
+
+func (tr testingRetryer) retry(batch *ttlBatch, _ bool) {
+	tr.retryCallback(batch)
+}
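The accounting above can be restated in isolation: every fragment's done callback subtracts its event count from a shared counter, and the original callback fires exactly once when that counter reaches zero, no matter how many times fragments are re-split. A standalone sketch of the same idea (illustrative names, not pipeline code):

package main

import (
	"fmt"
	"sync/atomic"
)

// splitData mirrors the bookkeeping of batchSplitData: one shared counter
// per original batch, decremented as fragments finish.
type splitData struct {
	originalDone      func()
	outstandingEvents atomic.Int64
}

func (s *splitData) doneCallback(eventCount int) func() {
	return func() {
		if s.outstandingEvents.Add(-int64(eventCount)) == 0 {
			s.originalDone()
		}
	}
}

func main() {
	s := &splitData{originalDone: func() { fmt.Println("original batch done") }}
	s.outstandingEvents.Add(4) // the original batch held 4 events

	// First split: two fragments of 2 events each.
	first, second := s.doneCallback(2), s.doneCallback(2)
	// The first fragment is split again; its own callback is simply discarded
	// and the two half-fragments get callbacks for 1 event each.
	_ = first
	firstA, firstB := s.doneCallback(1), s.doneCallback(1)

	second() // 2 events remain outstanding
	firstA() // 1 event remains
	firstB() // 0 remain -> "original batch done" prints exactly once
}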
diff --git a/libbeat/publisher/pipeline/ttl_batch_test.go b/libbeat/publisher/pipeline/ttl_batch_test.go
new file mode 100644
index 00000000000..a56f4b0fca1
--- /dev/null
+++ b/libbeat/publisher/pipeline/ttl_batch_test.go
@@ -0,0 +1,100 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pipeline
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/beats/v7/libbeat/publisher"
+)
+
+func TestBatchSplitRetry(t *testing.T) {
+	// SplitRetry should:
+	// - send two batches to the retryer, each with half the events
+	// - give each a callback that will fall through to the original one only
+	//   after all descendant batches are acknowledged
+
+	retryer := &mockRetryer{}
+	events := make([]publisher.Event, 2)
+	doneWasCalled := false
+
+	rootBatch := ttlBatch{
+		events:  events,
+		retryer: retryer,
+		done:    func() { doneWasCalled = true },
+	}
+
+	rootBatch.SplitRetry()
+
+	require.Equal(t, 2, len(retryer.batches), "SplitRetry should retry 2 batches")
+	require.Equal(t, 1, len(retryer.batches[0].events), "Retried batches should have one event each")
+	require.Equal(t, 1, len(retryer.batches[1].events), "Retried batches should have one event each")
+	assert.Same(t, &events[0], &retryer.batches[0].events[0], "Retried batch events should match original")
+	assert.Same(t, &events[1], &retryer.batches[1].events[0], "Retried batch events should match original")
+
+	assert.False(t, doneWasCalled, "No batch callbacks should be received yet")
+	retryer.batches[0].done()
+	assert.False(t, doneWasCalled, "Original callback shouldn't be invoked until both children are")
+	retryer.batches[1].done()
+	assert.True(t, doneWasCalled, "Original callback should be invoked when all children are")
+}
+
+func TestNestedBatchSplit(t *testing.T) {
+	// Test splitting the same original batch multiple times
+
+	retryer := &mockRetryer{}
+	events := make([]publisher.Event, 4)
+	doneWasCalled := false
+
+	rootBatch := ttlBatch{
+		events:  events,
+		retryer: retryer,
+		done:    func() { doneWasCalled = true },
+	}
+
+	rootBatch.SplitRetry()
+	require.Equal(t, 2, len(retryer.batches), "SplitRetry should retry 2 batches")
+	// Clear out the first-level batches from the retry buffer and retry both of them
+	batches := retryer.batches
+	retryer.batches = []*ttlBatch{}
+	batches[0].SplitRetry()
+	batches[1].SplitRetry()
+
+	require.Equal(t, 4, len(retryer.batches), "two SplitRetry calls should generate four retries")
+
+	for i := 0; i < 4; i++ {
+		assert.False(t, doneWasCalled, "Original callback shouldn't be invoked until all children are")
+		require.Equal(t, 1, len(retryer.batches[i].events), "Retried batches should have one event each")
+
+		// We expect the indices in the retried batches to match because we retried them in order
+		assert.Same(t, &events[i], &retryer.batches[i].events[0], "Retried batch events should match original")
+		retryer.batches[i].done()
+	}
+	assert.True(t, doneWasCalled, "Original callback should be invoked when all children are")
+}
+
+type mockRetryer struct {
+	batches []*ttlBatch
+}
+
+func (r *mockRetryer) retry(batch *ttlBatch, decreaseTTL bool) {
+	r.batches = append(r.batches, batch)
+}
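As a usage note, the exported helper also lets tests outside the pipeline package drive SplitRetry against a real ttlBatch. A minimal sketch with placeholder events and no output client (the test and package names are illustrative):

package myoutput_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/elastic/beats/v7/libbeat/publisher"
	"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
)

func TestSplitRetryForwardsFragments(t *testing.T) {
	var fragments []publisher.Batch
	done := false

	batch := pipeline.NewBatchForTesting(
		make([]publisher.Event, 3), // three placeholder events
		func(b publisher.Batch) { fragments = append(fragments, b) }, // retry callback
		func() { done = true },
	)

	// Simulate an output deciding the batch is too large to ingest.
	require.True(t, batch.SplitRetry(), "a three-event batch can be split")
	require.Len(t, fragments, 2, "both halves go back through the retry callback")

	// Completing every fragment completes the original batch exactly once.
	fragments[0].Drop()
	assert.False(t, done, "done should wait for the second fragment")
	fragments[1].Drop()
	assert.True(t, done, "done fires once all fragments are resolved")
}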