-
Notifications
You must be signed in to change notification settings - Fork 24.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add seq no powered optimistic locking support to the index and delete transport actions #36619
Changes from 4 commits
7e33a1d
0f807b5
6d9e3ca
ec5129b
a8c1d40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,7 @@ | |
import org.elasticsearch.common.xcontent.XContentParser; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
import org.elasticsearch.index.VersionType; | ||
import org.elasticsearch.index.seqno.SequenceNumbers; | ||
import org.elasticsearch.search.fetch.subphase.FetchSourceContext; | ||
|
||
import java.io.IOException; | ||
|
@@ -77,6 +78,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques | |
private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict"); | ||
private static final ParseField PIPELINE = new ParseField("pipeline"); | ||
private static final ParseField SOURCE = new ParseField("_source"); | ||
private static final ParseField IF_SEQ_NO_MATCH = new ParseField("if_seq_no_match"); | ||
private static final ParseField IF_PRIMARY_TERM_MATCH = new ParseField("if_primary_term_match"); | ||
|
||
/** | ||
* Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and | ||
|
@@ -347,6 +350,8 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null | |
String opType = null; | ||
long version = Versions.MATCH_ANY; | ||
VersionType versionType = VersionType.INTERNAL; | ||
long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
long ifPrimaryTermMatch = 0; | ||
int retryOnConflict = 0; | ||
String pipeline = valueOrDefault(defaultPipeline, globalPipeline); | ||
|
||
|
@@ -377,6 +382,10 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null | |
version = parser.longValue(); | ||
} else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) { | ||
versionType = VersionType.fromString(parser.text()); | ||
} else if (IF_SEQ_NO_MATCH.match(currentFieldName, parser.getDeprecationHandler())) { | ||
ifSeqNoMatch = parser.longValue(); | ||
} else if (IF_PRIMARY_TERM_MATCH.match(currentFieldName, parser.getDeprecationHandler())) { | ||
ifPrimaryTermMatch = parser.longValue(); | ||
} else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) { | ||
retryOnConflict = parser.intValue(); | ||
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { | ||
|
@@ -404,7 +413,8 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null | |
} | ||
|
||
if ("delete".equals(action)) { | ||
add(new DeleteRequest(index, type, id).routing(routing).version(version).versionType(versionType), payload); | ||
add(new DeleteRequest(index, type, id).routing(routing) | ||
.version(version).versionType(versionType).setIfMatch(ifSeqNoMatch, ifPrimaryTermMatch), payload); | ||
} else { | ||
nextMarker = findNextMarker(marker, from, data, length); | ||
if (nextMarker == -1) { | ||
|
@@ -417,16 +427,16 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null | |
if ("index".equals(action)) { | ||
if (opType == null) { | ||
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType) | ||
.setPipeline(pipeline) | ||
.setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload); | ||
} else { | ||
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType) | ||
.create("create".equals(opType)).setPipeline(pipeline) | ||
.create("create".equals(opType)).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch) | ||
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload); | ||
} | ||
} else if ("create".equals(action)) { | ||
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType) | ||
.create(true).setPipeline(pipeline) | ||
.create(true).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch) | ||
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload); | ||
} else if ("update".equals(action)) { | ||
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
import org.elasticsearch.common.lucene.uid.Versions; | ||
import org.elasticsearch.index.VersionType; | ||
import org.elasticsearch.index.mapper.MapperService; | ||
import org.elasticsearch.index.seqno.SequenceNumbers; | ||
import org.elasticsearch.index.shard.ShardId; | ||
|
||
import java.io.IOException; | ||
|
@@ -57,6 +58,8 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> | |
private String routing; | ||
private long version = Versions.MATCH_ANY; | ||
private VersionType versionType = VersionType.INTERNAL; | ||
private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
private long ifPrimaryTermMatch = 0; | ||
|
||
public DeleteRequest() { | ||
} | ||
|
@@ -112,6 +115,12 @@ public ActionRequestValidationException validate() { | |
if (versionType == VersionType.FORCE) { | ||
validationException = addValidationError("version type [force] may no longer be used", validationException); | ||
} | ||
|
||
if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && ( | ||
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY | ||
)) { | ||
validationException = addValidationError("compare and write operations can not use versioning", validationException); | ||
} | ||
return validationException; | ||
} | ||
|
||
|
@@ -194,6 +203,32 @@ public DeleteRequest versionType(VersionType versionType) { | |
return this; | ||
} | ||
|
||
public long ifSeqNoMatch() { | ||
return ifSeqNoMatch; | ||
} | ||
|
||
public long ifPrimaryTermMatch() { | ||
return ifPrimaryTermMatch; | ||
} | ||
|
||
public DeleteRequest setIfMatch(long seqNo, long term) { | ||
if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { | ||
throw new IllegalArgumentException("seqNo is set, but primary term is [0]"); | ||
} | ||
if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { | ||
throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]"); | ||
} | ||
if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { | ||
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "]."); | ||
} | ||
if (term < 0) { | ||
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]"); | ||
} | ||
ifSeqNoMatch = seqNo; | ||
ifPrimaryTermMatch = term; | ||
return this; | ||
} | ||
|
||
@Override | ||
public VersionType versionType() { | ||
return this.versionType; | ||
|
@@ -215,6 +250,13 @@ public void readFrom(StreamInput in) throws IOException { | |
} | ||
version = in.readLong(); | ||
versionType = VersionType.fromValue(in.readByte()); | ||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) { | ||
ifSeqNoMatch = in.readZLong(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can it be negative at this point? if not can we assert? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can be -2 (unassigned). I wonder if we should call the validate method on these once deserialized and under assertion? I think that will give us a much better testing coverage? |
||
ifPrimaryTermMatch = in.readVLong(); | ||
} else { | ||
ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
ifPrimaryTermMatch = 0; | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -228,6 +270,15 @@ public void writeTo(StreamOutput out) throws IOException { | |
} | ||
out.writeLong(version); | ||
out.writeByte(versionType.getValue()); | ||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) { | ||
out.writeZLong(ifSeqNoMatch); | ||
out.writeVLong(ifPrimaryTermMatch); | ||
} else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) { | ||
assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]"; | ||
throw new IllegalStateException( | ||
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " + | ||
"Stream version [" + out.getVersion() + "]"); | ||
} | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
really? :D that name is very confusing. apparently
expected_seq_no
is not what you want do. bummer. I am definitely -1 on this one sorry. Same goes for anycas
variants. I think these names are really not good. What about"replace" : { "seq_no" : x, "primary_term" : y}
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:) this is hard. The problem with json structure is that these need to go in a url parameter.