OAK-11765 - BulkProcessor unable to insert after a failure #2343


Merged · 10 commits · Jun 24, 2025

Conversation

@nfsantos (Contributor) commented Jun 19, 2025

Recovering from Elasticsearch connection errors

Handling of errors on Elasticsearch bulk processor

The bulk processor sends requests to Elastic asynchronously using the BulkIngester class of the Elasticsearch Java driver. When the driver receives a response to a bulk request, the BulkIngester invokes a callback in our ElasticBulkProcessorHandler with the result of the bulk. On error, the callback records the failure in internal state of ElasticBulkProcessorHandler. The next time the client of the bulk processor executes an operation on the processor (adding a document, or flushing/closing an index), this error is propagated.

Previously, the error status was not cleared after being propagated to the callers, so once an error occurred, every subsequent call to the bulk processor handler failed with the same error. This is incorrect: the bulk processor in the Elastic Java driver can recover from failed connections and other errors, and errors on the operations of a particular index should not affect the other, non-faulty indexes being updated in the same bulk. This PR fixes the issue by clearing the error status after it is propagated to the caller.
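The "report once, then clear" pattern described above can be sketched with an atomic swap. This is an illustrative standalone sketch, not Oak's actual ElasticBulkProcessorHandler code; the names `recordFailure` and `checkFailures` are hypothetical:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch of propagating a bulk failure exactly once.
public class ErrorState {
    private final AtomicReference<Throwable> suppressedError = new AtomicReference<>();

    // Called from the bulk-response callback when a request fails.
    public void recordFailure(Throwable t) {
        suppressedError.set(t);
    }

    // Called on the next add/flush/close. getAndSet(null) clears the state,
    // so a transient failure is reported exactly once and later operations
    // can succeed again after the connection recovers.
    public void checkFailures() throws IOException {
        Throwable t = suppressedError.getAndSet(null);
        if (t != null) {
            throw new IOException("Bulk request failed", t);
        }
    }
}
```

Using `getAndSet(null)` makes clearing and reading the error a single atomic step, which matters because the driver's callback and the indexing thread run concurrently.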

A bulk can fail with two types of errors:

  • A connection or server error - The server cannot be reached or cannot process the bulk. This means that no operation in the bulk was processed. In this case, we log the error and propagate the exception. It is up to the caller to decide to retry or to fail.
  • Individual documents in the bulk failed to process - This is a partial failure: other operations in the bulk may have succeeded. These errors affect only the operations on a particular index (e.g., trying to add documents to an index that does not exist). We log the error and, depending on oak.indexer.elastic.bulkProcessor.failOnError, propagate it to the callers performing operations on the affected index. Callers for other indexes in the same bulk whose operations succeeded do not observe any error.
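The second failure mode requires routing per-item errors to the right index. A minimal sketch of that grouping step, with a hypothetical `Item` record standing in for the driver's bulk response items (not the actual ElasticBulkProcessorHandler API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: group per-item failures by index, so only callers
// writing to a faulty index observe an error. A global (connection) failure
// would instead be reported to all callers.
public class BulkFailures {
    // error == null means the item succeeded.
    public record Item(String index, String id, String error) {}

    public static Map<String, List<Item>> failuresByIndex(List<Item> items) {
        Map<String, List<Item>> failed = new HashMap<>();
        for (Item it : items) {
            if (it.error() != null) {
                failed.computeIfAbsent(it.index(), k -> new ArrayList<>()).add(it);
            }
        }
        return failed;
    }
}
```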

The incremental indexer (AsyncIndexUpdate) already retries failed operations. If a cycle fails, the checkpoint is not advanced and the next cycle retries the operations from the original checkpoint. Therefore, once a transient connection problem is resolved, the incremental indexer resumes. This PR does not change that logic; it is enough to fix the issue that prevented the bulk processor from clearing its internal faulty state.

Retry policy

Added a retry policy to ElasticIndexWriter. The policy retries operations up to a maximum number of attempts or for a maximum total time, using exponential backoff with a cap on the interval between retries.
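The backoff schedule described above can be sketched as a simple delay function. This is an illustrative sketch; the parameter names and defaults are assumptions, not the actual ElasticIndexWriter policy:

```java
// Illustrative sketch of capped exponential backoff:
// delay before attempt n (0-based) is base * 2^n, capped at maxDelayMs.
public class Backoff {
    public static long delayMs(int attempt, long baseMs, long maxDelayMs) {
        // Cap the shift to avoid long overflow for large attempt counts.
        long d = baseMs << Math.min(attempt, 30);
        return Math.min(d, maxDelayMs);
    }
}
```

A retry loop would then sleep for `delayMs(attempt, ...)` between attempts and stop when either the attempt limit or the total-time limit is reached.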

The policy can be configured in two ways, depending on whether the instance of ElasticIndexWriter is created for off-line reindexing or by the ElasticIndexProviderService for incremental indexing.

Incremental indexing

The following property is exposed in the ElasticIndexProviderService:

  • maxRetryTime - for how long to retry after a failure. Default: 0 seconds.

Disabled by default because the incremental indexer already retries failed indexing cycles. In the future, we can enable retry at the level of the ElasticIndexWriter as an optimization.

Off-line reindexing

Currently, the Elastic off-line reindexers will abort reindexing in the case of a connection error. This PR adds the following system property that is read by ElasticDocumentStoreIndexer and ElasticOutOfBandIndexer:

  • oak.indexer.elastic.connectionRetrySeconds - for how long to retry after a failure. Default: 30 seconds.
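Reading such a system property with a default is a one-liner in Java. The property name comes from this PR; the wrapper class and method are illustrative, not the actual indexer code:

```java
// Illustrative sketch: read the retry window from a JVM system property,
// falling back to the 30-second default mentioned above.
public class RetryConfig {
    public static long connectionRetrySeconds() {
        return Long.getLong("oak.indexer.elastic.connectionRetrySeconds", 30L);
    }
}
```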

Additional changes:

  • Do not save the paths of failed documents in index definition in the node store. This feature is not being used and adds significant complexity.
  • Upgrade ES Java driver from 8.18.1 to 8.18.2.
  • Throttle logs of failed operations on bulks.
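Log throttling of the kind mentioned in the last bullet can be sketched as a per-key suppression window. Oak has its own LogSilencer utility; this standalone version is only illustrative of the idea:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: log the first occurrence of each key, suppress
// repeats until the window elapses, so a flood of identical bulk failures
// does not swamp the logs.
public class LogThrottle {
    private final long windowMillis;
    private final Map<String, Long> lastLogged = new ConcurrentHashMap<>();

    public LogThrottle(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Returns true if logging for this key should be suppressed.
    public boolean silence(String key) {
        long now = System.currentTimeMillis();
        Long prev = lastLogged.putIfAbsent(key, now);
        if (prev == null) {
            return false;                 // first occurrence: log it
        }
        if (now - prev >= windowMillis) {
            lastLogged.put(key, now);     // window elapsed: log again
            return false;
        }
        return true;                      // within window: suppress
    }
}
```

A caller would build the key from index name, operation type, and failure reason, as in the diff-context snippet quoted in the review below.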

nfsantos added 8 commits June 18, 2025 12:17
- If a bulk request fails with a global error, log the error and propagate the exception. It is up to the caller to decide to retry or to fail.
- If a document in a bulk fails to index, log the error and use the value of the system property "oak.indexer.elastic.bulkProcessor.failOnError" to decide if the error is propagated to the caller that is operating on that specific index or if it is suppressed.
- Do not save failed documents on Oak. This is not being used.
…ment and OutOfBand/Editor-based indexers). These can be controlled by a new system property: oak.indexer.elastic.connectionRetrySeconds.
```java
failure.printStackTrace(pw);
return ec.reason(failure.getMessage()).stackTrace(sw.toString());
}));
LOG.warn("ElasticIndex Update Bulk Failure : Bulk with id {} threw an error", executionId, failure);
```
Reviewer (Contributor):

shouldn't we still log as error here?

@nfsantos (Author):

In this case, it is a connection failure to Elastic. This can happen under "normal" conditions: when Elastic is scaling up/down, when there is a failure on Elastic, or when there is some connection problem. These are all conditions that we handle. The exception is saved internally in the ElasticBulkProcessorHandler and then propagated to client code, which may retry or decide to give up. Since at this point we are not sure there is a fatal error, I think we should log at WARN and let the higher levels decide what to do: retry, or give up and log an ERROR. That said, I don't have strong feelings about this, so I can change it to ERROR.

```java
String logSilenceKey = indexInfo.indexName + ":" + type + ":" + reason;
if (!LOG_SILENCER.silence(logSilenceKey)) {
// Log entry to be used to parse logs to get the failed doc id/path if needed
LOG.warn("Failure Details: BulkItem ID: {}, Index: {}, Failure Cause: {} - {}",
```
Reviewer (Contributor):

error level here as well?

@nfsantos (Author):

I'm not sure. This message is printed when a document fails to index, so the error is caused by the user providing a wrong/invalid document or index rule. This can happen quite often and would pollute the logs with ERROR-level messages that are not that critical. Since it is not a failure on our side, I think WARN is more appropriate. But I don't have a strong opinion here, so I'm willing to change it to ERROR if you think that's better.

@nfsantos nfsantos merged commit 1c85b3a into apache:trunk Jun 24, 2025
1 of 3 checks passed
@nfsantos nfsantos deleted the OAK-11765 branch June 24, 2025 06:28