Skip to content

Commit

Permalink
Added version support to update requests
Browse files Browse the repository at this point in the history
Moved version handling from RobinEngine into VersionType. This avoids code re-use and makes it cleaner and easier to read.

Closes #3111
  • Loading branch information
bleskes committed Jun 20, 2013
1 parent 7184966 commit 1786293
Show file tree
Hide file tree
Showing 13 changed files with 472 additions and 165 deletions.
5 changes: 4 additions & 1 deletion src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand Down Expand Up @@ -155,6 +156,7 @@ BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) {
}
return this;
}

/**
* Adds an {@link DeleteRequest} to the list of actions to execute.
*/
Expand Down Expand Up @@ -272,7 +274,7 @@ public BulkRequest add(BytesReference data, boolean contentUnsafe, @Nullable Str
String timestamp = null;
Long ttl = null;
String opType = null;
long version = 0;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
String percolate = null;
int retryOnConflict = 0;
Expand Down Expand Up @@ -345,6 +347,7 @@ public BulkRequest add(BytesReference data, boolean contentUnsafe, @Nullable Str
.percolate(percolate), payload);
} else if ("update".equals(action)) {
internalAdd(new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
.version(version).versionType(versionType)
.source(data.slice(from, nextMarker - from))
.percolate(percolate), payload);
}
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
Expand Down Expand Up @@ -130,7 +131,7 @@ public static OpType fromId(byte id) {
private OpType opType = OpType.INDEX;

private boolean refresh = false;
private long version = 0;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private String percolate;

Expand Down
24 changes: 20 additions & 4 deletions src/main/java/org/elasticsearch/action/update/UpdateHelper.java
Expand Up @@ -14,8 +14,10 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.DocumentSourceMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
Expand Down Expand Up @@ -78,9 +80,23 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
.refresh(request.refresh())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
if (request.versionType() == VersionType.EXTERNAL) {
// in external versioning mode, we want to create the new document using the given version.
indexRequest.version(request.version()).versionType(VersionType.EXTERNAL);
}
return new Result(indexRequest, Operation.UPSERT, null, null);
}

if (request.versionType().isVersionConflict(getResult.getVersion(), request.version())) {
throw new VersionConflictEngineException(new ShardId(request.index(), request.shardId()), request.type(), request.id(),
getResult.getVersion(), request.version());
}

long updateVersion = getResult.getVersion();
if (request.versionType() == VersionType.EXTERNAL) {
updateVersion = request.version(); // remember, match_any is excluded by the conflict test
}

if (getResult.internalSourceRef() == null) {
// no source, we can't do nothing, through a failure...
throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
Expand Down Expand Up @@ -148,20 +164,20 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
}
}

// TODO: external version type, does it make sense here? does not seem like it...
// TODO: because we use getResult.getVersion we loose the doc.version. The question is where is the right place?
if (operation == null || "index".equals(operation)) {
final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(updatedSourceAsMap, updateSourceContentType)
.version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel())
.version(updateVersion).versionType(request.versionType())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel())
.timestamp(timestamp).ttl(ttl)
.percolate(request.percolate())
.refresh(request.refresh());
indexRequest.operationThreaded(false);
return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType);
} else if ("delete".equals(operation)) {
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
.version(updateVersion).versionType(request.versionType())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
deleteRequest.operationThreaded(false);
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) {
Expand Down
61 changes: 54 additions & 7 deletions src/main/java/org/elasticsearch/action/update/UpdateRequest.java
Expand Up @@ -31,10 +31,12 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;

import java.io.IOException;
import java.util.Map;
Expand All @@ -59,7 +61,9 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>

private String[] fields;

int retryOnConflict = 0;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private int retryOnConflict = 0;

private String percolate;

Expand All @@ -69,7 +73,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;

private IndexRequest upsertRequest;

private boolean docAsUpsert = false;

@Nullable
Expand All @@ -94,14 +98,19 @@ public ActionRequestValidationException validate() {
if (id == null) {
validationException = addValidationError("id is missing", validationException);
}

if (version != Versions.MATCH_ANY && retryOnConflict > 0) {
validationException = addValidationError("can't provide both retry_on_conflict and a specific version", validationException);
}

if (script == null && doc == null) {
validationException = addValidationError("script or doc is missing", validationException);
}
if (script != null && doc != null) {
validationException = addValidationError("can't provide both script and doc", validationException);
}
if(doc == null && docAsUpsert){
validationException = addValidationError("can't say to upsert doc without providing doc", validationException);
if (doc == null && docAsUpsert) {
validationException = addValidationError("can't say to upsert doc without providing doc", validationException);
}
return validationException;
}
Expand Down Expand Up @@ -285,6 +294,31 @@ public int retryOnConflict() {
return this.retryOnConflict;
}

/**
* Sets the version, which will cause the index operation to only be performed if a matching
* version exists and no changes happened on the doc since then.
*/
public UpdateRequest version(long version) {
this.version = version;
return this;
}

public long version() {
return this.version;
}

/**
* Sets the versioning type. Defaults to {@link VersionType#INTERNAL}.
*/
public UpdateRequest versionType(VersionType versionType) {
this.versionType = versionType;
return this;
}

public VersionType versionType() {
return this.versionType;
}

/**
* Causes the update request document to be percolated. The parameter is the percolate query
* to use to reduce the percolated queries that are going to run against this doc. Can be
Expand Down Expand Up @@ -396,6 +430,14 @@ public UpdateRequest doc(byte[] source, int offset, int length) {
return this;
}

/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequest doc(String field, Object value) {
safeDoc().source(field, value);
return this;
}

public IndexRequest doc() {
return this.doc;
}
Expand Down Expand Up @@ -513,8 +555,8 @@ public UpdateRequest source(BytesReference source) throws Exception {
XContentBuilder docBuilder = XContentFactory.contentBuilder(xContentType);
docBuilder.copyCurrentStructure(parser);
safeDoc().source(docBuilder);
} else if("doc_as_upsert".equals(currentFieldName)){
docAsUpsert(parser.booleanValue());
} else if ("doc_as_upsert".equals(currentFieldName)) {
docAsUpsert(parser.booleanValue());
}
}
} finally {
Expand All @@ -526,9 +568,10 @@ public UpdateRequest source(BytesReference source) throws Exception {
public boolean docAsUpsert() {
return this.docAsUpsert;
}

public void docAsUpsert(boolean shouldUpsertDoc) {
this.docAsUpsert = shouldUpsertDoc;
if(this.doc != null && this.upsertRequest == null){
if (this.doc != null && this.upsertRequest == null) {
upsert(doc);
}
}
Expand Down Expand Up @@ -565,6 +608,8 @@ public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_0_90_2)) {
docAsUpsert = in.readBoolean();
}
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
}

@Override
Expand Down Expand Up @@ -612,6 +657,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_0_90_2)) {
out.writeBoolean(docAsUpsert);
}
out.writeLong(version);
out.writeByte(versionType.getValue());
}

}
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;

import java.util.Map;

Expand Down Expand Up @@ -124,6 +125,24 @@ public UpdateRequestBuilder setRetryOnConflict(int retryOnConflict) {
return this;
}

/**
* Sets the version, which will cause the index operation to only be performed if a matching
* version exists and no changes happened on the doc since then.
*/
public UpdateRequestBuilder setVersion(long version) {
request.version(version);
return this;
}

/**
* Sets the versioning type. Defaults to {@link org.elasticsearch.index.VersionType#INTERNAL}.
*/
public UpdateRequestBuilder setVersionType(VersionType versionType) {
request.versionType(versionType);
return this;
}


/**
* Should a refresh be executed post this update operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
Expand Down Expand Up @@ -216,6 +235,14 @@ public UpdateRequestBuilder setDoc(byte[] source, int offset, int length) {
return this;
}

/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequestBuilder setDoc(String field, Object value) {
request.doc(field, value);
return this;
}

/**
* Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException}
* is thrown.
Expand Down Expand Up @@ -305,5 +332,4 @@ public UpdateRequestBuilder setDocAsUpsert(boolean shouldUpsertDoc) {
protected void doExecute(ActionListener<UpdateResponse> listener) {
((Client) client).update(request, listener);
}

}
52 changes: 50 additions & 2 deletions src/main/java/org/elasticsearch/index/VersionType.java
Expand Up @@ -19,13 +19,47 @@
package org.elasticsearch.index;

import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.lucene.uid.Versions;

/**
*
*/
public enum VersionType {
INTERNAL((byte) 0),
EXTERNAL((byte) 1);
INTERNAL((byte) 0) {
/**
* - always returns false if currentVersion == {@link Versions#NOT_SET}
* - always accepts expectedVersion == {@link Versions#MATCH_ANY}
* - if expectedVersion is set, always conflict if currentVersion == {@link Versions#NOT_FOUND}
*/
@Override
public boolean isVersionConflict(long currentVersion, long expectedVersion) {
return currentVersion != Versions.NOT_SET && expectedVersion != Versions.MATCH_ANY
&& (currentVersion == Versions.NOT_FOUND || currentVersion != expectedVersion);
}

@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
}

},
EXTERNAL((byte) 1) {
/**
* - always returns false if currentVersion == {@link Versions#NOT_SET}
* - always conflict if expectedVersion == {@link Versions#MATCH_ANY} (we need something to set)
* - accepts currentVersion == {@link Versions#NOT_FOUND}
*/
@Override
public boolean isVersionConflict(long currentVersion, long expectedVersion) {
return currentVersion != Versions.NOT_SET && currentVersion != Versions.NOT_FOUND
&& (expectedVersion == Versions.MATCH_ANY || currentVersion >= expectedVersion);
}

@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
}
};

private final byte value;

Expand All @@ -37,6 +71,20 @@ public byte getValue() {
return value;
}

/**
* Checks whether the current version conflicts with the expected version, based on the current version type.
*
* @return true if versions conflict false o.w.
*/
public abstract boolean isVersionConflict(long currentVersion, long expectedVersion);

/**
* Returns the new version for a document, based on it's current one and the specified in the request
*
* @return new version
*/
public abstract long updateVersion(long currentVersion, long expectedVersion);

public static VersionType fromString(String versionType) {
if ("internal".equals(versionType)) {
return INTERNAL;
Expand Down

0 comments on commit 1786293

Please sign in to comment.