Description
The interactions between the otelconsumer and the ES exporter likely lose data in cases where there is a document level failure in a batch request and the collector restarts. It is likely that most of these problems are actually bugs in the ES exporter, but we should add detailed failure mode testing here to ensure we preserve Filebeat's at least once delivery guarantees.
For reference, when you make a _bulk request to Elasticsearch the response will give you document level HTTP status codes. This lets you detect partial failure along with which specific documents failed. The Beats Elasticsearch output uses this to drop permanent failures (e.g. mapping conflict on a single document) and retry individual documents (e.g. document level 429s). This happens here:
beats/libbeat/outputs/elasticsearch/client.go
Lines 263 to 271 in d73c6c9
```go
eventsToRetry, stats := client.bulkCollectPublishFails(bulkResult)
stats.reportToObserver(client.observer)
if len(eventsToRetry) > 0 {
	span.Context.SetLabel("events_failed", len(eventsToRetry))
	batch.RetryEvents(eventsToRetry)
} else {
	batch.ACK()
}
```
The bulk indexer used in the ES exporter has similar logic here.
The otelconsumer only receives a single error covering the whole batch of logs it tried to ingest:
beats/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go
Lines 153 to 170 in d73c6c9
```go
err := out.logsConsumer.ConsumeLogs(ctx, pLogs)
if err != nil {
	// Permanent errors shouldn't be retried. This tipically means
	// the data cannot be serialized by the exporter that is attached
	// to the pipeline or when the destination refuses the data because
	// it cannot decode it. Retrying in this case is useless.
	//
	// See https://github.com/open-telemetry/opentelemetry-collector/blob/1c47d89/receiver/doc.go#L23-L40
	if consumererror.IsPermanent(err) {
		st.PermanentErrors(len(events))
		batch.Drop()
	} else {
		st.RetryableErrors(len(events))
		batch.Retry()
	}

	return fmt.Errorf("failed to send batch events to otel collector: %w", err)
}
```
It can detect permanent failures, but I think only for the entire batch, not for individual documents.
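For context, the collector's consumererror API attaches permanence to the single error value returned from ConsumeLogs, which is why the distinction cannot be expressed per document. A minimal sketch using that package (the wrapped error message is made up):

```go
package main

import (
	"errors"
	"fmt"

	"go.opentelemetry.io/collector/consumer/consumererror"
)

func main() {
	// The exporter can only hand back one error for the entire pLogs payload,
	// so "one document hit a mapping conflict, the rest were indexed" collapses
	// into a single permanent-or-not flag for the whole batch.
	err := consumererror.NewPermanent(errors.New("mapping conflict on one document"))

	// This is the same check the otelconsumer snippet above performs; it can
	// only decide Drop() or Retry() for everything at once.
	fmt.Println("permanent for the whole batch:", consumererror.IsPermanent(err))
}
```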
The bulk indexer can return nil on flush even when there were document level errors. Problem one is that our otelconsumer can’t see this and will ack the batch when it happens, even though the failed documents haven’t been persisted in Elasticsearch yet.
The bulk indexer logic linked earlier for retrying individual failed documents covers for this by re-adding failed documents to the indexer’s writer, but this also means those logs will get flushed on a different pushLogsData call (the ES exporter’s ConsumeLogs function) in the future, decoupling any error from the original call in our otelconsumer. This is problem two.
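A toy sketch of both problems (not the real bulk indexer; every name below is made up): flush returns nil even though a document failed, and the failed document is silently re-queued for a later, unrelated flush.

```go
package main

import "fmt"

// doc is a made-up per-document result: its ID and document-level HTTP status.
type doc struct {
	ID     string
	Status int
}

// toyIndexer mimics the shape of the problem: retryable failures are kept
// internally instead of being reported to the caller.
type toyIndexer struct {
	pending []doc // failed docs re-queued for some future flush (problem two)
}

// flush simulates handling a _bulk response. Retryable document-level
// failures are re-queued and the method still returns nil, so the caller
// sees "success" (problem one).
func (t *toyIndexer) flush(results []doc) error {
	for _, d := range results {
		if d.Status == 429 {
			t.pending = append(t.pending, d)
		}
	}
	return nil // no batch-level error even though some docs were not indexed
}

func main() {
	idx := &toyIndexer{}
	err := idx.flush([]doc{{"a", 201}, {"b", 429}})
	// The caller (think: the error our otelconsumer gets back from ConsumeLogs)
	// sees err == nil and acks the batch, but doc "b" is not in Elasticsearch
	// yet; if the collector restarts now, "b" is lost.
	fmt.Println("flush error:", err, "internally re-queued:", len(idx.pending))
}
```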
To observe this causing problems, you’d need to cause partial _bulk failures that exhaust any configured retries and/or restart the collector, so that the ack in otelconsumer has moved us past the uningested data in Filebeat’s registry file while ingesting a log file.
We need to do some targeted testing here where we force partial _bulk failures (e.g. a permanent mapping conflict on only one doc, and a long-lived but recoverable 429 response on only one doc).
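One way to force such partial failures in a test is a stub _bulk endpoint that always fails specific documents. A rough sketch (the handler, response body, and scenario are assumptions, not an existing test; the real test would point the ES exporter at this server, send a batch, restart the collector, and assert the failed docs are re-delivered):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/_bulk" {
			w.WriteHeader(http.StatusOK)
			return
		}
		// Partial failure: the first doc is indexed, the second gets a
		// document-level 429 (retryable), the third a 400 mapping conflict
		// (permanent). The batch-level HTTP status is still 200.
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"took":1,"errors":true,"items":[`+
			`{"index":{"_id":"1","status":201}},`+
			`{"index":{"_id":"2","status":429,"error":{"type":"circuit_breaking_exception","reason":"simulated"}}},`+
			`{"index":{"_id":"3","status":400,"error":{"type":"mapper_parsing_exception","reason":"simulated"}}}]}`)
	}))
	defer srv.Close()

	fmt.Println("stub Elasticsearch listening on", srv.URL)
	// Configure the collector/exporter under test to use srv.URL, send a batch,
	// then restart the collector and check whether docs 2 and 3 are re-sent.
}
```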
I think the fundamental problem is that otelconsumer can’t see that only some documents may have succeeded; we are relying on retries in the ES exporter that can be cut short by a collector restart. We could treat partial failures as total failures, which would avoid data loss but would cause us to re-index many more documents than we do today.
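If we went the "partial failure is a total failure" route, the decision in otelconsumer could look roughly like the sketch below, assuming we had some way to learn how many documents actually failed, which is exactly the information we don't get today. bulkOutcome and ackOrRetry are hypothetical names, not existing code.

```go
package main

import (
	"errors"
	"fmt"
)

// bulkOutcome is a hypothetical summary of one _bulk request.
type bulkOutcome struct {
	Failed    int
	Permanent int // failures that will never succeed (e.g. mapping conflicts)
}

// ackOrRetry returns nil only when every document was indexed; any retryable
// document-level failure fails the whole batch so Filebeat's registry does
// not advance past unacked data.
func ackOrRetry(o bulkOutcome) error {
	switch {
	case o.Failed == 0:
		return nil // safe to ACK the whole batch
	case o.Permanent == o.Failed:
		// only permanent failures: retrying is useless, dropping is arguably correct
		return fmt.Errorf("permanent: %d docs can never be indexed", o.Permanent)
	default:
		// at least one retryable failure: retry the entire batch, trading
		// duplicate indexing for at-least-once delivery
		return errors.New("retryable: partial _bulk failure, retry entire batch")
	}
}

func main() {
	fmt.Println(ackOrRetry(bulkOutcome{Failed: 0}))
	fmt.Println(ackOrRetry(bulkOutcome{Failed: 2, Permanent: 0}))
}
```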