Skip to content

Commit

Permalink
Always use primary term of operation in InternalEngine (#45083)
Browse files Browse the repository at this point in the history
We keep adding the current primary term to operations for which we do not assign a sequence
number. This does not make sense anymore as all operations which we care about have
sequence numbers now. The goal of this commit is to clean things up in InternalEngine and
reduce the complexity.
  • Loading branch information
ywelsch committed Aug 1, 2019
1 parent dec76d9 commit 48d3119
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
final Engine.Result result =
new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO);
new Engine.IndexResult(failure, updateRequest.version());
context.setRequestToExecute(updateRequest);
context.markOperationAsExecuted(result);
context.markAsCompleted(context.getExecutionResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ public IndexResult(long version, long term, long seqNo, boolean created) {
/**
* use in case of the index operation failed before getting to internal engine
**/
public IndexResult(Exception failure, long version, long term) {
this(failure, version, term, UNASSIGNED_SEQ_NO);
public IndexResult(Exception failure, long version) {
this(failure, version, UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO);
}

public IndexResult(Exception failure, long version, long term, long seqNo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -828,10 +828,6 @@ long doGenerateSeqNoForOperation(final Operation operation) {
return localCheckpointTracker.generateSeqNo();
}

private long getPrimaryTerm() {
return engineConfig.getPrimaryTermSupplier().getAsLong();
}

@Override
public IndexResult index(Index index) throws IOException {
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
Expand Down Expand Up @@ -895,7 +891,7 @@ public IndexResult index(Index index) throws IOException {
indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(
plan.versionForIndexing, getPrimaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
}
}
if (index.origin().isFromTranslog() == false) {
Expand Down Expand Up @@ -1013,19 +1009,20 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
}
if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.id(),
index.getIfSeqNo(), index.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion, getPrimaryTerm());
index.getIfSeqNo(), index.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion);
} else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm()
)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.id(),
index.getIfSeqNo(), index.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else if (index.versionType().isVersionConflictForWrites(
currentVersion, index.version(), currentNotFoundOrDeleted)) {
final VersionConflictEngineException e =
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
index.versionType().updateVersion(currentVersion, index.version())
Expand Down Expand Up @@ -1167,8 +1164,8 @@ static IndexingStrategy optimizedAppendOnly(long versionForIndexing) {
}

public static IndexingStrategy skipDueToVersionConflict(
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion, long term) {
final IndexResult result = new IndexResult(e, currentVersion, term);
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
final IndexResult result = new IndexResult(e, currentVersion);
return new IndexingStrategy(
currentNotFoundOrDeleted, false, false, false,
Versions.NOT_FOUND, result);
Expand Down Expand Up @@ -1269,7 +1266,7 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult = deleteInLucene(delete, plan);
} else {
deleteResult = new DeleteResult(
plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
}
}
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
Expand Down Expand Up @@ -1350,17 +1347,17 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException
final DeletionStrategy plan;
if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.id(),
delete.getIfSeqNo(), delete.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), true);
delete.getIfSeqNo(), delete.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, true);
} else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != delete.getIfSeqNo() || versionValue.term != delete.getIfPrimaryTerm()
)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.id(),
delete.getIfSeqNo(), delete.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
plan = DeletionStrategy.processNormally(currentlyDeleted, delete.versionType().updateVersion(currentVersion, delete.version()));
}
Expand Down Expand Up @@ -1396,7 +1393,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws
engineConfig.getThreadPool().relativeTimeInMillis()));
}
return new DeleteResult(
plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
} catch (Exception ex) {
if (ex instanceof AlreadyClosedException == false && indexWriter.getTragicException() == null) {
throw new AssertionError("delete operation should never fail at document level", ex);
Expand Down Expand Up @@ -1428,9 +1425,9 @@ private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, b
}

public static DeletionStrategy skipDueToVersionConflict(
VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) {
final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false);
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
SequenceNumbers.UNASSIGNED_SEQ_NO, currentlyDeleted == false);
return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, deleteResult);
}

Expand Down Expand Up @@ -1483,7 +1480,8 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
final NoOpResult noOpResult;
final Optional<Exception> preFlightError = preFlightCheckForNoOp(noOp);
if (preFlightError.isPresent()) {
noOpResult = new NoOpResult(getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, preFlightError.get());
noOpResult = new NoOpResult(SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
SequenceNumbers.UNASSIGNED_SEQ_NO, preFlightError.get());
} else {
markSeqNoAsSeen(noOp.seqNo());
if (softDeleteEnabled && hasBeenProcessedBefore(noOp) == false) {
Expand All @@ -1507,7 +1505,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
throw ex;
}
}
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo());
noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) throws IOExcepti
}

public Engine.IndexResult getFailedIndexResult(Exception e, long version) {
return new Engine.IndexResult(e, version, getOperationPrimaryTerm());
return new Engine.IndexResult(e, version);
}

public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testAbortedSkipped() {
visitedRequests.add(context.getCurrent());
context.setRequestToExecute(context.getCurrent());
// using failures prevents caring about types
context.markOperationAsExecuted(new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1));
context.markOperationAsExecuted(new Engine.IndexResult(new ElasticsearchException("bla"), 1));
context.markAsCompleted(context.getExecutionResult());
}

Expand Down Expand Up @@ -122,7 +122,7 @@ public void testTranslogLocation() {
case CREATE:
context.setRequestToExecute(current);
if (failure) {
result = new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1);
result = new Engine.IndexResult(new ElasticsearchException("bla"), 1);
} else {
result = new FakeIndexResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ public void testRetries() throws Exception {

Exception err = new VersionConflictEngineException(shardId, "id",
"I'm conflicted <(;_;)>");
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0, 0);
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0);
Engine.IndexResult mappingUpdate =
new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap()));
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind
*/
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo()));
return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm());
return IndexingStrategy.skipDueToVersionConflict(error, false, index.version());
} else {
return planIndexingAsNonPrimary(index);
}
Expand All @@ -90,7 +90,7 @@ protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Del
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
shardId, delete.seqNo(), lookupPrimaryTerm(delete.seqNo()));
return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), delete.primaryTerm(), false);
return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), false);
} else {
return planDeletionAsNonPrimary(delete);
}
Expand Down

0 comments on commit 48d3119

Please sign in to comment.