OAK-11765 - BulkProcessor unable to insert after a failure #2343
Conversation
- If a bulk request fails with a global error, log the error and propagate the exception. It is up to the caller to decide whether to retry or to fail.
- If a document in a bulk fails to index, log the error and use the value of the system property "oak.indexer.elastic.bulkProcessor.failOnError" to decide whether the error is propagated to the caller that is operating on that specific index or suppressed.
- Do not save failed documents on Oak. This feature was not being used.
…ment and OutOfBand/Editor-based indexers). These can be controlled by a new system property: oak.indexer.elastic.connectionRetrySeconds.
failure.printStackTrace(pw);
return ec.reason(failure.getMessage()).stackTrace(sw.toString());
}));
LOG.warn("ElasticIndex Update Bulk Failure : Bulk with id {} threw an error", executionId, failure);
shouldn't we still log as error here?
In this case, it is a connection failure to Elastic. This can happen under "normal" conditions: when Elastic is scaling up/down, when there is a failure on Elastic, or when there is some connection problem. All of these are normal conditions that we handle. The exception is saved internally in the ElasticBulkProcessorHandler and then propagated to client code, which may retry or decide to give up. Since at this point we are not sure there is a fatal error, I think we should just log as WARN and let the higher levels decide what to do: retry, or give up and log an ERROR. Once again, I don't have strong feelings about this, so I can change it to ERROR.
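For illustration, a minimal sketch of what that caller-side decision could look like (the `BulkOperation` interface and the retry count are hypothetical, not Oak's actual API):

```java
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: hypothetical caller-side policy for a failure propagated by the bulk handler.
final class CallerRetryExample {
    private static final Logger LOG = LoggerFactory.getLogger(CallerRetryExample.class);

    interface BulkOperation {
        void run() throws IOException; // e.g. add a document or flush/close the processor
    }

    static void runWithRetry(BulkOperation op, int maxAttempts) throws IOException {
        for (int attempt = 1; ; attempt++) {
            try {
                op.run();   // may rethrow the failure recorded by the bulk processor handler
                return;     // success: stop retrying
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    LOG.error("Giving up after {} attempts", attempt, e); // only now is it fatal
                    throw e;
                }
                LOG.warn("Bulk operation failed (attempt {}), retrying", attempt, e);
            }
        }
    }
}
```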
String logSilenceKey = indexInfo.indexName + ":" + type + ":" + reason;
if (!LOG_SILENCER.silence(logSilenceKey)) {
    // Log entry to be used to parse logs to get the failed doc id/path if needed
    LOG.warn("Failure Details: BulkItem ID: {}, Index: {}, Failure Cause: {} - {}",
error level here as well?
I'm not sure. This message is printed when a document fails to index, so it is an error caused by the user providing a wrong/invalid document or index rule. This can happen quite often and pollute the logs with ERROR level messages which are not that critical. As it is not a failure on our side, I think WARN level is more appropriate. But I don't have a strong opinion here, so I'm willing to change it to ERROR if you think it's better.
Recovering from Elasticsearch connection errors
Handling of errors on Elasticsearch bulk processor
The bulk processor sends requests to Elastic asynchronously using the `BulkIngester` class of the Elasticsearch Java driver. When the driver receives a response from a bulk request, the `BulkIngester` invokes a callback in our `ElasticBulkProcessorHandler`, indicating the result of the bulk. In case of errors, this sets internal state in `ElasticBulkProcessorHandler` with the indication of the error. The next time the client of the bulk processor executes an operation on the processor (adding a document or flushing/closing an index), this error is propagated.

Previously, the error status was not being cleared after being propagated to the callers. So once an error occurred, any further call to the bulk processor handler would fail with the same error. This is incorrect, because the bulk processor in the Elastic Java driver is capable of recovering from failed connections and other errors, and errors on the operations of a particular index should not affect the other non-faulty indexes being updated in the same bulk. This PR fixes this issue by clearing the error status after it is propagated to the caller.
A bulk can fail with two types of errors:

- A global error, for instance a connection failure to Elasticsearch: the error is logged and the exception is propagated; it is up to the caller to decide whether to retry or to fail.
- A failure to index an individual document: the error is logged and (depending on the system property `oak.indexer.elastic.bulkProcessor.failOnError`) may be propagated to the callers that are performing operations on the index of the operations that failed (a sketch of this decision is shown below). If the operations on other indexes in the same bulk succeeded, the callers for these indexes do not observe any error.

The incremental indexing (`AsyncIndexUpdate`) already retries failed operations. If a cycle fails, the checkpoint is not advanced and the next cycle will retry the operations from the original checkpoint. Therefore, if there are transient failures, once the underlying connection problem is resolved, the incremental indexer will resume. So this PR does not make any changes to this logic; it is enough to fix the issue that prevented the bulk processor from clearing up the internal faulty state.
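For illustration, a minimal sketch of how the per-document decision could be applied; the helper name, the logger and the default value shown for the property are assumptions, not the actual Oak code.

```java
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: hypothetical handling of a single failed bulk item.
final class ItemFailureExample {
    private static final Logger LOG = LoggerFactory.getLogger(ItemFailureExample.class);
    private static final boolean FAIL_ON_ERROR = Boolean.parseBoolean(
            System.getProperty("oak.indexer.elastic.bulkProcessor.failOnError", "true")); // assumed default

    static void onItemFailure(String index, String docId, String reason) throws IOException {
        // Always log, so the failed doc id/path can be recovered from the logs if needed.
        LOG.warn("Failed to index {} in {}: {}", docId, index, reason);
        if (FAIL_ON_ERROR) {
            // Only callers operating on this specific index observe the error.
            throw new IOException("Failed to index " + docId + " in " + index + ": " + reason);
        }
        // Otherwise the failure is suppressed and operations on other indexes are unaffected.
    }
}
```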
Retry policy
Added a retry policy to `ElasticIndexWriter`. This policy retries failed operations up to a maximum number of attempts or for a maximum amount of time, using exponential backoff with a cap on the maximum interval between retries.
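For illustration, a minimal sketch of such a policy (the class, method and parameter names are illustrative, not the actual `ElasticIndexWriter` retry code): retry until either the attempt limit or the time budget is exhausted, doubling the wait after each failure but never exceeding a maximum interval.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Sketch only: capped exponential backoff bounded by a number of attempts and a total time budget.
final class RetryPolicyExample {
    static <T> T callWithRetry(Callable<T> op, int maxAttempts, long maxTimeMillis) throws Exception {
        long deadline = System.currentTimeMillis() + maxTimeMillis;
        long delayMillis = 100;              // initial backoff
        final long maxDelayMillis = 5_000;   // cap on the interval between retries
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (IOException e) {
                if (attempt >= maxAttempts || System.currentTimeMillis() + delayMillis > deadline) {
                    throw e;                 // retry budget exhausted: give up
                }
                Thread.sleep(delayMillis);
                delayMillis = Math.min(delayMillis * 2, maxDelayMillis);
            }
        }
    }
}
```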
The policy can be configured in two ways, depending on whether the instance of `ElasticIndexWriter` is created for off-line reindexing or by the `ElasticIndexProviderService` for incremental indexing.

Incremental indexing
The following property is exposed in the `ElasticIndexProviderService`:

`maxRetryTime` - for how long to retry after a failure. Default: 0 seconds.

Disabled by default because the incremental indexer already retries failed indexing cycles. In the future, we can enable retry at the level of the `ElasticIndexWriter` as an optimization.
Off-line reindexing
Currently, the Elastic off-line reindexers will abort reindexing in the case of a connection error. This PR adds the following system property that is read by ElasticDocumentStoreIndexer and ElasticOutOfBandIndexer:
`oak.indexer.elastic.connectionRetrySeconds` - for how long to retry after a failure. Default: 30 seconds.

Additional changes: