Skip to content

Commit

Permalink
Update API: Allow to upsert, provide a doc and index it if the doc do…
Browse files Browse the repository at this point in the history
…es not exists, closes #2008.
  • Loading branch information
kimchy committed Jun 8, 2012
1 parent 898fef1 commit 9905eab
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 2 deletions.
Expand Up @@ -165,7 +165,48 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<

// no doc, what to do, what to do...
if (!getResult.exists()) {
listener.onFailure(new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()));
if (request.indexRequest() == null) {
listener.onFailure(new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()));
return;
}
IndexRequest indexRequest = request.indexRequest();
indexRequest.index(request.index()).type(request.type()).id(request.id())
// it has to be a "create!"
.create(true)
.routing(request.routing())
.percolate(request.percolate())
.refresh(request.refresh())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesHolder updateSourceBytes = indexRequest.underlyingSourceBytes();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version());
update.matches(response.matches());
// TODO: we can parse the index _source and extractGetResult if applicable
update.getResult(null);
listener.onResponse(update);
}

@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(e);
}
});
return;
}

Expand Down
87 changes: 87 additions & 0 deletions src/main/java/org/elasticsearch/action/update/UpdateRequest.java
Expand Up @@ -22,12 +22,15 @@
import com.google.common.collect.Maps;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -60,6 +63,8 @@ public class UpdateRequest extends InstanceShardOperationRequest {
private ReplicationType replicationType = ReplicationType.DEFAULT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;

private IndexRequest indexRequest;

UpdateRequest() {

}
Expand Down Expand Up @@ -330,6 +335,74 @@ public UpdateRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
return this;
}

/**
* Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException}
* is thrown.
*/
public UpdateRequest doc(IndexRequest indexRequest) {
this.indexRequest = indexRequest;
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequest doc(XContentBuilder source) {
safeIndexRequest().source(source);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequest doc(Map source) {
safeIndexRequest().source(source);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequest doc(Map source, XContentType contentType) {
safeIndexRequest().source(source, contentType);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequest doc(String source) {
safeIndexRequest().source(source);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequest doc(byte[] source) {
safeIndexRequest().source(source);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequest doc(byte[] source, int offset, int length) {
safeIndexRequest().source(source, offset, length);
return this;
}

public IndexRequest indexRequest() {
return this.indexRequest;
}

private IndexRequest safeIndexRequest() {
if (indexRequest == null) {
indexRequest = new IndexRequest();
}
return indexRequest;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand Down Expand Up @@ -357,6 +430,10 @@ public void readFrom(StreamInput in) throws IOException {
fields[i] = in.readUTF();
}
}
if (in.readBoolean()) {
indexRequest = new IndexRequest();
indexRequest.readFrom(in);
}
}

@Override
Expand Down Expand Up @@ -396,5 +473,15 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(field);
}
}
if (indexRequest == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
// make sure the basics are set
indexRequest.index(index);
indexRequest.type(type);
indexRequest.id(id);
indexRequest.writeTo(out);
}
}
}
Expand Up @@ -21,10 +21,13 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.BaseRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.Map;

Expand Down Expand Up @@ -180,6 +183,63 @@ public UpdateRequestBuilder setPercolate(String percolate) {
return this;
}

/**
* Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException}
* is thrown.
*/
public UpdateRequestBuilder setDoc(IndexRequest indexRequest) {
request.doc(indexRequest);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequestBuilder setDoc(XContentBuilder source) {
request.doc(source);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequestBuilder setDoc(Map source) {
request.doc(source);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequestBuilder setDoc(Map source, XContentType contentType) {
request.doc(source, contentType);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequestBuilder setDoc(String source) {
request.doc(source);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequestBuilder setDoc(byte[] source) {
request.doc(source);
return this;
}

/**
* Sets the doc source of the update request to be used when the document does not exists.
*/
public UpdateRequestBuilder setDoc(byte[] source, int offset, int length) {
request.doc(source, offset, length);
return this;
}

@Override
protected void doExecute(ActionListener<UpdateResponse> listener) {
client.update(request, listener);
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
Expand All @@ -32,7 +33,9 @@
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestXContentBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -91,7 +94,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel)
if (xContentType != null) {
try {
Map<String, Object> content = XContentFactory.xContent(xContentType)
.createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapAndClose();
.createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapOrderedAndClose();
if (content.containsKey("script")) {
updateRequest.script(content.get("script").toString());
}
Expand All @@ -101,6 +104,19 @@ public void handleRequest(final RestRequest request, final RestChannel channel)
if (content.containsKey("params")) {
updateRequest.scriptParams((Map<String, Object>) content.get("params"));
}
if (content.containsKey("doc")) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.source((Map) content.get("doc"), xContentType);
indexRequest.routing(request.param("routing"));
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
indexRequest.timestamp(request.param("timestamp"));
if (request.hasParam("ttl")) {
indexRequest.ttl(request.paramAsTime("ttl", null).millis());
}
indexRequest.version(RestActions.parseVersion(request));
indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
updateRequest.doc(indexRequest);
}
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
Expand Down
Expand Up @@ -84,6 +84,34 @@ protected Client getClient() {
return client("node1");
}

@Test
public void testUpsert() throws Exception {
createIndex();
ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));

client.prepareUpdate("test", "type1", "1")
.setDoc(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject())
.setScript("ctx._source.field += 1")
.execute().actionGet();

for (int i = 0; i < 5; i++) {
GetResponse getResponse = client.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("1"));
}

client.prepareUpdate("test", "type1", "1")
.setDoc(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject())
.setScript("ctx._source.field += 1")
.execute().actionGet();

for (int i = 0; i < 5; i++) {
GetResponse getResponse = client.prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("2"));
}
}

@Test
public void testUpdate() throws Exception {
createIndex();
Expand Down

0 comments on commit 9905eab

Please sign in to comment.