Skip to content

Commit

Permalink
Core: simultaneous create/delete against same id can cause silently i…
Browse files Browse the repository at this point in the history
…nconsistent replica

If simultaneous create & delete operations arrive against the same id,
it's possible that primary and replica see those operations in
different orders, which may result in replica throwing
DocumentAlreadyExistsException when the primary didn't which would
lead to replica being inconsistent (missing a document that primary
had indexed).

This push fixes the issue, by never throwing DAEE from the replica on
create.

Closes #7146 #7142
  • Loading branch information
mikemccand committed Aug 4, 2014
1 parent 8989d06 commit a58d9a1
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
Expand Up @@ -46,7 +46,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
Expand Down Expand Up @@ -168,14 +167,10 @@ boolean ignoreReplicaException(Throwable e) {
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
// on version conflict or document missing, it means
// that a news change has crept into the replica, and its fine
// that a new change has crept into the replica, and it's fine
if (cause instanceof VersionConflictEngineException) {
return true;
}
// same here
if (cause instanceof DocumentAlreadyExistsException) {
return true;
}
return false;
}

Expand Down
Expand Up @@ -445,30 +445,36 @@ private void innerCreateNoLock(Create create, IndexWriter writer, long currentVe
}
updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);

// if the doc does not exist or it exists but is not deleted
if (versionValue != null) {
if (!versionValue.delete()) {
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new DocumentAlreadyExistsException(shardId, create.type(), create.id());
}
}
} else if (currentVersion != Versions.NOT_FOUND) {
// its not deleted, its already there
// if the doc exists
boolean doUpdate = false;
if ((versionValue != null && versionValue.delete() == false) || (versionValue == null && currentVersion != Versions.NOT_FOUND)) {
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else if (create.origin() == Operation.Origin.REPLICA) {
// #7142: the primary already determined it's OK to index this document, and we confirmed above that the version doesn't
// conflict, so we must also update here on the replica to remain consistent:
doUpdate = true;
} else {
// On primary, we throw DAEE if the _uid is already in the index with an older version:
assert create.origin() == Operation.Origin.PRIMARY;
throw new DocumentAlreadyExistsException(shardId, create.type(), create.id());
}
}

create.updateVersion(updatedVersion);

if (create.docs().size() > 1) {
writer.addDocuments(create.docs(), create.analyzer());
if (doUpdate) {
if (create.docs().size() > 1) {
writer.updateDocuments(create.uid(), create.docs(), create.analyzer());
} else {
writer.updateDocument(create.uid(), create.docs().get(0), create.analyzer());
}
} else {
writer.addDocument(create.docs().get(0), create.analyzer());
if (create.docs().size() > 1) {
writer.addDocuments(create.docs(), create.analyzer());
} else {
writer.addDocument(create.docs().get(0), create.analyzer());
}
}
Translog.Location translogLocation = translog.add(new Translog.Create(create));

Expand Down

0 comments on commit a58d9a1

Please sign in to comment.