Skip to content

Commit

Permalink
Rename seq# powered OCC parameters to ifSeqNo/ifPrimaryTerm
Browse files Browse the repository at this point in the history
  • Loading branch information
seut authored and mergify[bot] committed Oct 25, 2019
1 parent d081048 commit 4810705
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 68 deletions.
Expand Up @@ -1114,23 +1114,23 @@ public static class Index extends Operation {
private final ParsedDocument doc;
private final long autoGeneratedIdTimestamp;
private final boolean isRetry;
private final long ifSeqNoMatch;
private final long ifPrimaryTermMatch;
private final long ifSeqNo;
private final long ifPrimaryTerm;

public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
long startTime, long autoGeneratedIdTimestamp, boolean isRetry, long ifSeqNoMatch, long ifPrimaryTermMatch) {
long startTime, long autoGeneratedIdTimestamp, boolean isRetry, long ifSeqNo, long ifPrimaryTerm) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
assert ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNo >= 0 :
"ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTerm == 0) :
"cas operations are only allowed if origin is primary. get [" + origin + "]";
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
this.ifSeqNoMatch = ifSeqNoMatch;
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
this.ifSeqNo = ifSeqNo;
this.ifPrimaryTerm = ifPrimaryTerm;
}

public ParsedDocument parsedDoc() {
Expand Down Expand Up @@ -1186,35 +1186,35 @@ public boolean isRetry() {
return isRetry;
}

public long getIfSeqNoMatch() {
return ifSeqNoMatch;
public long getIfSeqNo() {
return ifSeqNo;
}

public long getIfPrimaryTermMatch() {
return ifPrimaryTermMatch;
public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}
}

public static class Delete extends Operation {

private final String type;
private final String id;
private final long ifSeqNoMatch;
private final long ifPrimaryTermMatch;
private final long ifSeqNo;
private final long ifPrimaryTerm;

public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
Origin origin, long startTime, long ifSeqNoMatch, long ifPrimaryTermMatch) {
Origin origin, long startTime, long ifSeqNo, long ifPrimaryTerm) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
assert ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNo >= 0 :
"ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTerm == 0) :
"cas operations are only allowed if origin is primary. get [" + origin + "]";
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
this.ifSeqNoMatch = ifSeqNoMatch;
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
this.ifSeqNo = ifSeqNo;
this.ifPrimaryTerm = ifPrimaryTerm;
}

@Override
Expand All @@ -1237,12 +1237,12 @@ public int estimatedSizeInBytes() {
return (uid().field().length() + uid().text().length()) * 2 + 20;
}

public long getIfSeqNoMatch() {
return ifSeqNoMatch;
public long getIfSeqNo() {
return ifSeqNo;
}

public long getIfPrimaryTermMatch() {
return ifPrimaryTermMatch;
public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}
}

Expand Down
Expand Up @@ -982,7 +982,7 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
versionMap.enforceSafeAccess();
// resolves incoming version
final VersionValue versionValue =
resolveDocVersion(index, index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO);
resolveDocVersion(index, index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
final long currentVersion;
final boolean currentNotFoundOrDeleted;
if (versionValue == null) {
Expand All @@ -992,24 +992,24 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
currentVersion = versionValue.version;
currentNotFoundOrDeleted = versionValue.isDelete();
}
if (index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
index.id(),
index.getIfSeqNoMatch(),
index.getIfPrimaryTermMatch(),
index.getIfSeqNo(),
index.getIfPrimaryTerm(),
SequenceNumbers.UNASSIGNED_SEQ_NO,
0
);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
} else if (versionValue != null && index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != index.getIfSeqNoMatch() || versionValue.term != index.getIfPrimaryTermMatch()
} else if (versionValue != null && index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm()
)) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
index.id(),
index.getIfSeqNoMatch(),
index.getIfPrimaryTermMatch(),
index.getIfSeqNo(),
index.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
Expand Down Expand Up @@ -1328,7 +1328,7 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO);
final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
assert incrementVersionLookup();
final long currentVersion;
final boolean currentlyDeleted;
Expand All @@ -1340,24 +1340,24 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE
currentlyDeleted = versionValue.isDelete();
}
final DeletionStrategy plan;
if (delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
delete.id(),
delete.getIfSeqNoMatch(),
delete.getIfPrimaryTermMatch(),
delete.getIfSeqNo(),
delete.getIfPrimaryTerm(),
SequenceNumbers.UNASSIGNED_SEQ_NO,
0
);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
} else if (versionValue != null && delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != delete.getIfSeqNoMatch() || versionValue.term != delete.getIfPrimaryTermMatch()
} else if (versionValue != null && delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != delete.getIfSeqNo() || versionValue.term != delete.getIfPrimaryTerm()
)) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
delete.id(),
delete.getIfSeqNoMatch(),
delete.getIfPrimaryTermMatch(),
delete.getIfSeqNo(),
delete.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
Expand Down
Expand Up @@ -598,13 +598,13 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
public Engine.IndexResult applyIndexOperationOnPrimary(long version,
VersionType versionType,
SourceToParse sourceToParse,
long ifSeqNoMatch,
long ifPrimaryTermMatch,
long ifSeqNo,
long ifPrimaryTerm,
long autoGeneratedTimestamp,
boolean isRetry)
throws IOException {
return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, ifSeqNoMatch,
ifPrimaryTermMatch, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, ifSeqNo,
ifPrimaryTerm, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
}

public Engine.IndexResult applyIndexOperationOnReplica(long seqNo,
Expand All @@ -622,8 +622,8 @@ private Engine.IndexResult applyIndexOperation(Engine engine,
long opPrimaryTerm,
long version,
@Nullable VersionType versionType,
long ifSeqNoMatch,
long ifPrimaryTermMatch,
long ifSeqNo,
long ifPrimaryTerm,
long autoGeneratedTimeStamp,
boolean isRetry,
Engine.Operation.Origin origin,
Expand All @@ -643,8 +643,8 @@ private Engine.IndexResult applyIndexOperation(Engine engine,
origin,
autoGeneratedTimeStamp,
isRetry,
ifSeqNoMatch,
ifPrimaryTermMatch
ifSeqNo,
ifPrimaryTerm
);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
Expand All @@ -671,13 +671,13 @@ public static Engine.Index prepareIndex(DocumentMapper docMapper,
Engine.Operation.Origin origin,
long autoGeneratedIdTimestamp,
boolean isRetry,
long ifSeqNoMatch,
long ifPrimaryTermMatch) {
long ifSeqNo,
long ifPrimaryTerm) {
long startTime = System.nanoTime();
ParsedDocument doc = docMapper.parse(source);
Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(doc.id()));
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry,
ifSeqNoMatch, ifPrimaryTermMatch);
ifSeqNo, ifPrimaryTerm);
}

private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
Expand Down Expand Up @@ -760,11 +760,11 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary(long version,
String type,
String id,
VersionType versionType,
long ifSeqNoMatch,
long ifPrimaryTermMatch)
long ifSeqNo,
long ifPrimaryTerm)
throws IOException {
return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
ifSeqNoMatch, ifPrimaryTermMatch, Engine.Operation.Origin.PRIMARY);
ifSeqNo, ifPrimaryTerm, Engine.Operation.Origin.PRIMARY);
}

public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo,
Expand All @@ -782,8 +782,8 @@ private Engine.DeleteResult applyDeleteOperation(Engine engine,
String type,
String id,
@Nullable VersionType versionType,
long ifSeqNoMatch,
long ifPrimaryTermMatch,
long ifSeqNo,
long ifPrimaryTerm,
Engine.Operation.Origin origin) throws IOException {
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
+ "]";
Expand All @@ -805,8 +805,8 @@ private Engine.DeleteResult applyDeleteOperation(Engine engine,
version,
versionType,
origin,
ifSeqNoMatch,
ifPrimaryTermMatch
ifSeqNo,
ifPrimaryTerm
);
return delete(engine, delete);
}
Expand All @@ -819,11 +819,11 @@ private Engine.Delete prepareDelete(String type,
long version,
VersionType versionType,
Engine.Operation.Origin origin,
long ifSeqNoMatch,
long ifPrimaryTermMatch) {
long ifSeqNo,
long ifPrimaryTerm) {
long startTime = System.nanoTime();
return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime,
ifSeqNoMatch, ifPrimaryTermMatch);
ifSeqNo, ifPrimaryTerm);
}

private Term extractUidForDelete(String type, String id) {
Expand Down
Expand Up @@ -69,7 +69,6 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -1612,8 +1611,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops,
index.startTime(),
index.getAutoGeneratedIdTimestamp(),
index.isRetry(),
index.getIfSeqNoMatch(),
index.getIfPrimaryTermMatch());
index.getIfSeqNo(),
index.getIfPrimaryTerm());
Function<Engine.Delete, Engine.Delete> deleteWithCurrentTerm = delete -> new Engine.Delete(
delete.type(),
delete.id(),
Expand All @@ -1624,8 +1623,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops,
delete.versionType(),
delete.origin(),
delete.startTime(),
delete.getIfSeqNoMatch(),
delete.getIfPrimaryTermMatch());
delete.getIfSeqNo(),
delete.getIfPrimaryTerm());
for (Engine.Operation op : ops) {
final boolean versionConflict = rarely();
final boolean versionedOp = versionConflict || randomBoolean();
Expand Down

0 comments on commit 4810705

Please sign in to comment.