Skip to content

Commit

Permalink
Add seq no powered optimistic locking support to the index and delete…
Browse files Browse the repository at this point in the history
… transport actions (#36619)

This commit add support for using sequence numbers to power [optimistic concurrency control](http://en.wikipedia.org/wiki/Optimistic_concurrency_control)
in the delete and index transport actions and requests. A follow up will come with adding sequence
numbers to the update and get results.

Relates #36148
Relates #10708
  • Loading branch information
bleskes committed Dec 18, 2018
1 parent eaec994 commit 7a6a79d
Show file tree
Hide file tree
Showing 20 changed files with 298 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.termvectors.TermVectorsService;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -135,7 +136,7 @@ public void testAnnotationInjection() throws IOException {

IndexShard shard = indexService.getShard(0);
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
Expand Down Expand Up @@ -190,7 +191,7 @@ public void testToleranceForBadAnnotationMarkup() throws IOException {

IndexShard shard = indexService.getShard(0);
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
Expand Down Expand Up @@ -389,7 +390,7 @@ public void testDefaultPositionIncrementGap() throws IOException {

IndexShard shard = indexService.getShard(0);
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
Expand Down Expand Up @@ -431,7 +432,7 @@ public void testPositionIncrementGap() throws IOException {

IndexShard shard = indexService.getShard(0);
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
Expand Down Expand Up @@ -83,6 +84,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private static final ParseField PIPELINE = new ParseField("pipeline");
private static final ParseField FIELDS = new ParseField("fields");
private static final ParseField SOURCE = new ParseField("_source");
private static final ParseField IF_SEQ_NO_MATCH = new ParseField("if_seq_no_match");
private static final ParseField IF_PRIMARY_TERM_MATCH = new ParseField("if_primary_term_match");

/**
* Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and
Expand Down Expand Up @@ -354,6 +357,8 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
String opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTermMatch = 0;
int retryOnConflict = 0;
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);

Expand Down Expand Up @@ -386,6 +391,10 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
version = parser.longValue();
} else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
versionType = VersionType.fromString(parser.text());
} else if (IF_SEQ_NO_MATCH.match(currentFieldName, parser.getDeprecationHandler())) {
ifSeqNoMatch = parser.longValue();
} else if (IF_PRIMARY_TERM_MATCH.match(currentFieldName, parser.getDeprecationHandler())) {
ifPrimaryTermMatch = parser.longValue();
} else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
retryOnConflict = parser.intValue();
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -423,7 +432,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null

if ("delete".equals(action)) {
add(new DeleteRequest(index, type, id).routing(routing).parent(parent).version(version)
.versionType(versionType), payload);
.versionType(versionType).setIfMatch(ifSeqNoMatch, ifPrimaryTermMatch), payload);
} else {
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
Expand All @@ -436,16 +445,17 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
.versionType(versionType).setPipeline(pipeline)
.versionType(versionType).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload);
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
.versionType(versionType).create("create".equals(opType)).setPipeline(pipeline)
.ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
.versionType(versionType).create(true).setPipeline(pipeline)
.versionType(versionType).create(true).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
} else if ("update".equals(action)) {
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).parent(parent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.action.bulk;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
Expand Down Expand Up @@ -459,7 +459,7 @@ private static void executeIndexRequestOnPrimary(BulkPrimaryExecutionContext con
executeOnPrimaryWhileHandlingMappingUpdates(context,
() ->
primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
request.getAutoGeneratedTimestamp(), request.isRetry()),
request.ifSeqNoMatch(), request.ifPrimaryTermMatch(), request.getAutoGeneratedTimestamp(), request.isRetry()),
e -> primary.getFailedIndexResult(e, request.version()),
context::markOperationAsExecuted,
mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
Expand All @@ -470,7 +470,8 @@ private static void executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext co
final DeleteRequest request = context.getRequestToExecute();
final IndexShard primary = context.getPrimary();
executeOnPrimaryWhileHandlingMappingUpdates(context,
() -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()),
() -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(),
request.ifSeqNoMatch(), request.ifPrimaryTermMatch()),
e -> primary.getFailedDeleteResult(e, request.version()),
context::markOperationAsExecuted,
mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.delete;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.DocWriteRequest;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand Down Expand Up @@ -57,6 +59,8 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
private String parent;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTermMatch = 0;

public DeleteRequest() {
}
Expand Down Expand Up @@ -98,6 +102,12 @@ public ActionRequestValidationException validate() {
if (versionType == VersionType.FORCE) {
validationException = addValidationError("version type [force] may no longer be used", validationException);
}

if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
return validationException;
}

Expand Down Expand Up @@ -190,6 +200,32 @@ public DeleteRequest versionType(VersionType versionType) {
return this;
}

public long ifSeqNoMatch() {
return ifSeqNoMatch;
}

public long ifPrimaryTermMatch() {
return ifPrimaryTermMatch;
}

public DeleteRequest setIfMatch(long seqNo, long term) {
if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is set, but primary term is [0]");
}
if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]");
}
if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifSeqNoMatch = seqNo;
ifPrimaryTermMatch = term;
return this;
}

@Override
public VersionType versionType() {
return this.versionType;
Expand All @@ -209,6 +245,13 @@ public void readFrom(StreamInput in) throws IOException {
parent = in.readOptionalString();
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_6_0_0)) {
ifSeqNoMatch = in.readZLong();
ifPrimaryTermMatch = in.readVLong();
} else {
ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTermMatch = 0;
}
}

@Override
Expand All @@ -220,6 +263,15 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(parent());
out.writeLong(version);
out.writeByte(versionType.getValue());
if (out.getVersion().onOrAfter(Version.V_6_0_0)) {
out.writeZLong(ifSeqNoMatch);
out.writeVLong(ifPrimaryTermMatch);
} else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
"Stream version [" + out.getVersion() + "]");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,14 @@ public DeleteRequestBuilder setVersionType(VersionType versionType) {
request.versionType(versionType);
return this;
}

/**
* only performs this delete request if the document was last modification was assigned the given
* sequence number and primary term
*/
public DeleteRequestBuilder setIfMatch(long seqNo, long term) {
request.setIfMatch(seqNo, term);
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand Down Expand Up @@ -106,6 +107,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;

private boolean isRetry = false;
private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTermMatch = 0;


public IndexRequest() {
Expand Down Expand Up @@ -166,6 +169,12 @@ public ActionRequestValidationException validate() {
validationException);
return validationException;
}

if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
validationException = addValidationError("create operations do not support compare and set. use index instead",
validationException);
return validationException;
}
}

if (opType() != OpType.INDEX && id == null) {
Expand Down Expand Up @@ -194,6 +203,12 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("pipeline cannot be an empty string", validationException);
}

if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}

return validationException;
}

Expand Down Expand Up @@ -486,6 +501,33 @@ public IndexRequest versionType(VersionType versionType) {
return this;
}

public IndexRequest ifMatch(long seqNo, long term) {
if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is set, but primary term is [0]");
}

if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]");
}
if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifSeqNoMatch = seqNo;
ifPrimaryTermMatch = term;
return this;
}

public long ifSeqNoMatch() {
return ifSeqNoMatch;
}

public long ifPrimaryTermMatch() {
return ifPrimaryTermMatch;
}

@Override
public VersionType versionType() {
return this.versionType;
Expand Down Expand Up @@ -515,6 +557,8 @@ public void process(Version indexCreatedVersion, @Nullable MappingMetaData mappi
// generate id if not already provided
if (id == null) {
assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!";
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO;
assert ifPrimaryTermMatch == 0;
autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
String uid;
if (indexCreatedVersion.onOrAfter(Version.V_6_0_0_beta1)) {
Expand Down Expand Up @@ -554,6 +598,13 @@ public void readFrom(StreamInput in) throws IOException {
} else {
contentType = null;
}
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
ifSeqNoMatch = in.readZLong();
ifPrimaryTermMatch = in.readVLong();
} else {
ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTermMatch = 0;
}
}

@Override
Expand Down Expand Up @@ -583,6 +634,15 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeBoolean(false);
}
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeZLong(ifSeqNoMatch);
out.writeVLong(ifPrimaryTermMatch);
} else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
"Stream version [" + out.getVersion() + "]");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ public IndexRequestBuilder setVersionType(VersionType versionType) {
return this;
}

/**
* only performs this indexing request if the document was last modification was assigned the given
* sequence number and primary term
*/
public IndexRequestBuilder setIfMatch(long seqNo, long term) {
request.ifMatch(seqNo, term);
return this;
}

/**
* Sets the ingest pipeline to be executed before indexing the document
*/
Expand Down

0 comments on commit 7a6a79d

Please sign in to comment.