Bulk operation fail to replicate operations when a mapping update times out #30244
Conversation
Pinging @elastic/es-distributed
first round. pretty intense, thanks for getting your hands dirty here
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
} else {
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
switch (indexResult.getResultType()) {
what was wrong with a boolean here? What does this result type buy us?
yeah, that's a good question. I was unsure how to do this and ended up with the enum, but I'm not 100% happy with it either. I wanted IndexShard.applyIndexOperationOnPrimary to always return without the extra callbacks. That method can end in success, failure, or a required mapping change (I didn't want to make this a failure). The problem with booleans is that you need to coordinate them - i.e., make sure you never end up with both a failure and a required mapping update at the same time. I hope this clarifies things. I'm open to suggestions.
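To illustrate the tradeoff, here is a minimal sketch of the tri-state result being discussed; the constant names follow the Engine.Result.Type values quoted elsewhere in this review, but the standalone enum is hypothetical:

```java
// With two booleans (failed, mappingUpdateRequired) every caller must also rule
// out the illegal combination of both being true; a single enum makes the three
// outcomes of applying an operation on the primary mutually exclusive.
enum ResultType {
    SUCCESS,                 // the operation was applied to the engine
    FAILURE,                 // the operation failed at the document level
    MAPPING_UPDATE_REQUIRED  // the caller must update the mappings and retry
}
```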
I am ok with it. stuff like this is always tricky
default:
    throw new IllegalStateException("Unexpected request operation type on replica: "
        + docWriteRequest.opType().getLowercase());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
can't we have a second boolean instead of this context-sensitive type?
opsRecovered++;
recoveryState.getTranslog().incrementRecoveredOperations();
} catch (Exception e) {
    if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
        // mainly for MapperParsingException and Failure to detect xcontent
        logger.info("ignoring recovery of a corrupt translog entry", e);
    } else if (e instanceof RuntimeException) {
maybe use ExceptionsHelper#convertToRuntime
+1, I failed to find that utility. Thanks for pointing it out.
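A hedged sketch of how the quoted catch block could use the suggested utility; as far as I can tell, ExceptionsHelper.convertToRuntime rethrows a RuntimeException unchanged and wraps checked exceptions, which replaces the manual instanceof branch:

```java
} catch (Exception e) {
    if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
        // mainly for MapperParsingException and failure to detect xcontent
        logger.info("ignoring recovery of a corrupt translog entry", e);
    } else {
        // rethrows RuntimeExceptions as-is, wraps checked exceptions
        throw ExceptionsHelper.convertToRuntime(e);
    }
}
```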
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
case FAILURE:
    return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
I think you should have all options listed here and not use default. Be explicit here please.
sure. I'll add an explicit line + a default for future additions to the enum
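For illustration, a hedged sketch of the agreed-on pattern: every enum constant listed explicitly, plus a default that fails hard if a constant is added later. The MAPPING_UPDATE_REQUIRED branch shown here is illustrative, not the PR's actual code:

```java
switch (deleteResult.getResultType()) {
    case SUCCESS:
        return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
    case FAILURE:
        return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
    case MAPPING_UPDATE_REQUIRED:
        // handled before we get here; listed so the switch stays explicit
        throw new IllegalStateException("unexpected mapping update");
    default:
        // fail hard on future enum additions that nobody handled
        throw new AssertionError("unknown result type: " + deleteResult.getResultType());
}
```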
Engine.IndexResult result;
result = primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
    request.getAutoGeneratedTimestamp(), request.isRetry(), update -> mappingUpdater.verifyMappings(update, primary.shardId()));
this looks pretty much like the code in the delete part below. Can we maybe break it out into a shared routine and pass a closure to it to actually process the operation?
+1. I'll do this once the other discussion settles.
case MAPPING_UPDATE_REQUIRED:
    throw new IllegalArgumentException("unexpected mapping update: " + result.getRequiredMappingUpdate());
case SUCCESS:
    break;
please add a default case here to make sure we fail hard if we miss it.
postIndex(shardId, index, result.getFailure());
switch (result.getResultType()) {
    case SUCCESS:
        if (!index.origin().isRecovery()) {
use == false
🤣
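For readers unfamiliar with the convention: the Elasticsearch codebase prefers an explicit `== false` comparison over the `!` operator because the lone bang is easy to miss during review. A before/after sketch of the quoted line:

```java
// easy to overlook the negation when skimming:
if (!index.origin().isRecovery()) {
    // ...
}
// preferred style in this codebase; the comparison stands out:
if (index.origin().isRecovery() == false) {
    // ...
}
```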
postDelete(shardId, delete, result.getFailure());
switch (result.getResultType()) {
    case SUCCESS:
        if (!delete.origin().isRecovery()) {
use == false
@@ -449,6 +456,56 @@ public void testVerifyApiBlocksDuringPartition() throws Exception {
}

@TestLogging(
really?
would it help if I fold it into one line? :)
can't we remove it? why is it there?
Because any failure of this test is useless without these logs. I thought this is what we agreed on - i.e., we selectively enable detailed debug logging on these types of tests.
ok maybe leave a comment
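A hedged sketch of what the suggested comment could look like; the annotation value and the test name here are illustrative, not the PR's actual ones:

```java
// Detailed trace logging is enabled on purpose: a failure of this test is
// effectively undebuggable from the default logs, so we selectively turn on
// debug/trace logging for these disruption-style tests.
@TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.indices.recovery:TRACE")
public void testMappingTimeoutOnPartition() throws Exception {
    // ...
}
```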
request.getAutoGeneratedTimestamp(), request.isRetry());
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
    // double mapping update. We assume that the successful mapping update wasn't yet processed on the node
    // and retry the entire request again.
how do we expect this to happen? The request must have been processed on the node, otherwise it would not have been acked?
Do we have any tests that cover the behavior here?
That's a fair statement. I don't think this is possible given the current state of the code. I copied it here from the verify method of the mapping updater in order to preserve behavior. See here:

elasticsearch/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
Line 583 in 5e4d0b4:
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
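For context, a hedged condensation of that preserved behavior, pieced together from the diffs quoted in this thread rather than copied verbatim:

```java
// the operation is retried after the mapping update was acked cluster-wide;
// if the engine *still* demands a mapping update, this node's cluster state
// hasn't caught up yet, so the whole request is retried on the primary
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
    throw new ReplicationOperation.RetryOnPrimaryException(shardId,
        "Dynamic mappings are not available on the node that holds the primary yet");
}
```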
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
    "Dynamic mappings are not available on the node that holds the primary yet");
}
Objects.requireNonNull(update);
should this be an assertion?
sure.
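A hedged before/after sketch of the change being agreed to (the assertion message is illustrative):

```java
// before: a runtime null check that also runs in production builds
Objects.requireNonNull(update);
// after: an invariant of the caller, verified only when assertions are enabled
assert update != null : "mapping update is mandatory for this result type";
```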
private Translog.Location translogLocation;
private long took;

protected Result(Operation.TYPE operationType, Exception failure, long version, long seqNo) {
    Objects.requireNonNull(failure);
You can inline this directly with the assignment:
this.failure = Objects.requireNonNull(failure);
+1
LGTM
@ywelsch thanks. I addressed your comments. Care to take another look?
sample packaging tests
run sample packaging tests
We have encountered the same issue with Elasticsearch v6.2.4. Can you merge it into the 6.2 branch also?
Hi, this would be extremely helpful. Thanks!
Bulk operation fail to replicate operations when a mapping update times out (#30379)

Starting with the refactoring in #22778 (released in 5.3) we may fail to properly replicate operations when a mapping update on the master fails. If a bulk operation needs a mapping update halfway through, it will send a request to the master before continuing to index the operations. If that request times out or isn't acked (i.e., even one node in the cluster didn't process it within 30s), we end up throwing the exception and aborting the entire bulk. This is a problem because all operations that were processed so far are no longer replicated to the replicas. Although these operations were never "acked" to the user (we threw an error), this causes the local checkpoint on the replicas to lag (on 6.x) and the primary and replicas to diverge. This PR changes the logic to treat *any* mapping update failure as a document level failure, meaning only the relevant indexing operation will fail. Backport of #30244
@bleskes any plans to backport it to 6.2? Thanks for your amazing help.
We are actively working on releasing 6.3.0, which will contain this fix. Once 6.3.0 is out, we will no longer release a new patch release in the 6.2.x series. The reason is that 6.3 is a minor release and is fully compatible with previous 6.x releases. You can upgrade to it just as easily as to a 6.2.x release using the standard rolling upgrade mechanism.
Thanks @bleskes, is it safe to assume that 6.3.0 will be released in the next few days?
We can't say anything concrete. I don't think the next few days is realistic.
Got it. Thanks for responding.
@bleskes, do you have any idea about the 6.3 launch? I mean, is the plan to launch in a few days, a few months, or several months?
@candreoliveira I can't say for sure, but several months is extremely unlikely.
@bleskes any more updates on when 6.3 will be released? This specific issue is causing us a lot of problems.
@jt6211 still working on stabilizing it... soon is all I can say at this point.
Synced-flush consists of three steps: (1) force-flush on every active copy; (2) check for ongoing indexing operations; (3) seal copies if there's no change since step 1. If some indexing operations are completed on the primary but not on the replicas, then the Lucene commits from step 1 on the replicas won't be the same as the primary's, yet step 2 would still pass if it's executed once all pending operations are done. Once step 2 passes, we will incorrectly emit the "out of sync" warning message although nothing is wrong here. Relates #28464 Relates #30244
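A hedged pseudocode outline of the three steps described in that commit message; every identifier below is illustrative and none of these names are the actual SyncedFlushService API:

```java
// Hypothetical outline of the synced-flush protocol described above.
void syncedFlush(String shardId) {
    // step 1: force-flush every active copy and record each copy's commit id
    Map<String, String> commits = forceFlushAllCopies(shardId);
    // step 2: check for ongoing indexing operations on the shard
    if (ongoingIndexingOperations(shardId) > 0) {
        return; // cannot seal while writes are still in flight
    }
    // step 3: seal each copy only if its commit is unchanged since step 1
    sealCopiesIfUnchanged(shardId, commits);
}
```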
Starting with the refactoring in #22778 (released in 5.3) we may fail to properly replicate operations when a mapping update on the master fails. If a bulk operation needs a mapping update halfway through, it will send a request to the master before continuing to index the operations. If that request times out or isn't acked (i.e., even one node in the cluster didn't process it within 30s), we end up throwing the exception and aborting the entire bulk. This is a problem because all operations that were processed so far are no longer replicated to the replicas. Although these operations were never "acked" to the user (we threw an error), this causes the local checkpoint on the replicas to lag (on 6.x) and the primary and replicas to diverge.
This PR does a couple of things:

- It treats any mapping update failure as a document level failure, meaning only the relevant indexing operation fails (see the sketch after this list).
- It refactors IndexShard.applyIndexOperationOnPrimary and similar methods for simpler execution; we no longer use exceptions when a mapping update was successful.

I think we need to do more work here (the fact that a single slow node can prevent those mapping updates from being acked and thus fail operations is bad), but I want to keep this as small as I can (it is already too big).
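To make the first point concrete, a hedged sketch of the resulting primary-side flow. The applyIndexOperationOnPrimary call mirrors the diffs quoted above, while mappingUpdater.updateMappings and exceptionToResult are illustrative stand-ins, not the PR's actual helpers:

```java
Engine.IndexResult result = primary.applyIndexOperationOnPrimary(request.version(), request.versionType(),
    sourceToParse, request.getAutoGeneratedTimestamp(), request.isRetry(),
    update -> mappingUpdater.verifyMappings(update, primary.shardId()));
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
    try {
        // this cluster-level update can time out or fail to be acked within 30s
        mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), request.type());
    } catch (Exception e) {
        // the crux of the fix: record a document level failure for this item only,
        // instead of throwing and aborting (and un-replicating) the whole bulk
        return new BulkItemResultHolder(null, exceptionToResult(e), bulkItemRequest);
    }
    // mappings are in place now; apply the operation again
    result = primary.applyIndexOperationOnPrimary(request.version(), request.versionType(),
        sourceToParse, request.getAutoGeneratedTimestamp(), request.isRetry(),
        update -> mappingUpdater.verifyMappings(update, primary.shardId()));
}
```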
Note that this needs to go to 5.x but I'm not sure how cleanly it will backport. I'll evaluate once this has been reviewed and put into 7.0 & 6.x.