Skip to content

Commit

Permalink
Added doc_as_upsert option to update api.
Browse files Browse the repository at this point in the history
This option can reduce to amount of data being send to Elasticsearch.
Closes #3195
  • Loading branch information
pecke01 authored and martijnvg committed Jun 17, 2013
1 parent 2f616e3 commit b7cb479
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 21 deletions.
Expand Up @@ -66,10 +66,10 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME}, true);

if (!getResult.isExists()) {
if (request.upsertRequest() == null) {
if (request.upsertRequest() == null && !request.docAsUpsert()) {
throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
}
IndexRequest indexRequest = request.upsertRequest();
IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
indexRequest.index(request.index()).type(request.type()).id(request.id())
// it has to be a "create!"
.create(true)
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/org/elasticsearch/action/update/UpdateRequest.java
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.update;

import com.google.common.collect.Maps;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -68,6 +69,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;

private IndexRequest upsertRequest;

private boolean docAsUpsert = false;

@Nullable
private IndexRequest doc;
Expand Down Expand Up @@ -97,6 +100,9 @@ public ActionRequestValidationException validate() {
if (script != null && doc != null) {
validationException = addValidationError("can't provide both script and doc", validationException);
}
if(doc == null && docAsUpsert == true){
validationException = addValidationError("can't say to upsert doc without providing doc", validationException);
}
return validationException;
}

Expand Down Expand Up @@ -507,6 +513,8 @@ public UpdateRequest source(BytesReference source) throws Exception {
XContentBuilder docBuilder = XContentFactory.contentBuilder(xContentType);
docBuilder.copyCurrentStructure(parser);
safeDoc().source(docBuilder);
} else if("doc_as_upsert".equals(currentFieldName)){
docAsUpsert(parser.booleanValue());
}
}
} finally {
Expand All @@ -515,6 +523,16 @@ public UpdateRequest source(BytesReference source) throws Exception {
return this;
}

public boolean docAsUpsert() {
return this.docAsUpsert;
}
public void docAsUpsert(boolean shouldUpsertDoc) {
this.docAsUpsert = shouldUpsertDoc;
if(this.doc != null && this.upsertRequest == null){
upsert(doc);
}
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand Down Expand Up @@ -544,6 +562,9 @@ public void readFrom(StreamInput in) throws IOException {
upsertRequest = new IndexRequest();
upsertRequest.readFrom(in);
}
if (in.getVersion().onOrAfter(Version.V_0_90_2)) {
docAsUpsert = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -588,5 +609,9 @@ public void writeTo(StreamOutput out) throws IOException {
upsertRequest.id(id);
upsertRequest.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_0_90_2)) {
out.writeBoolean(docAsUpsert);
}
}

}
Expand Up @@ -293,8 +293,17 @@ public UpdateRequestBuilder setSource(BytesReference source) throws Exception {
return this;
}

/**
* Sets whether the specified doc parameter should be used as upsert document.
*/
public UpdateRequestBuilder setDocAsUpsert(boolean shouldUpsertDoc) {
request.docAsUpsert(shouldUpsertDoc);
return this;
}

@Override
protected void doExecute(ActionListener<UpdateResponse> listener) {
((Client) client).update(request, listener);
}

}
Expand Up @@ -70,6 +70,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel)
updateRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
}
updateRequest.percolate(request.param("percolate", null));
updateRequest.docAsUpsert(request.paramAsBoolean("doc_as_upsert", updateRequest.docAsUpsert()));
updateRequest.script(request.param("script"));
updateRequest.scriptLang(request.param("lang"));
for (Map.Entry<String, String> entry : request.params().entrySet()) {
Expand Down Expand Up @@ -111,7 +112,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel)
}
doc.version(RestActions.parseVersion(request));
doc.versionType(VersionType.fromString(request.param("version_type"), doc.versionType()));
}
}
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
Expand Down
Expand Up @@ -19,23 +19,6 @@

package org.elasticsearch.test.integration.update;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
Expand All @@ -50,6 +33,16 @@
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.testng.AssertJUnit.*;

public class UpdateTests extends AbstractSharedClusterTest {


Expand Down Expand Up @@ -170,7 +163,22 @@ public void testUpsert() throws Exception {
assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2"));
}
}

@Test
public void testUpsertDoc() throws Exception {
createIndex();
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));

UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1")
.setDoc(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject())
.setDocAsUpsert(true)
.setFields("_source")
.execute().actionGet();
assertThat(updateResponse.getGetResult(), notNullValue());
assertThat(updateResponse.getGetResult().sourceAsMap().get("bar").toString(), equalTo("baz"));
}

@Test
public void testUpsertFields() throws Exception {
createIndex();
Expand Down Expand Up @@ -382,6 +390,25 @@ public void testUpdateRequestWithBothScriptAndDoc() throws Exception {
assertThat(e.getMessage(), containsString("can't provide both script and doc"));
}
}

@Test
public void testUpdateRequestWithScriptAndShouldUpsertDoc() throws Exception{
createIndex();
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
try {
client().prepareUpdate("test", "type1", "1")
.setScript("ctx._source.field += 1")
.setDocAsUpsert(true)
.execute().actionGet();
fail("Should have thrown ActionRequestValidationException");
} catch (ActionRequestValidationException e) {
assertThat(e.validationErrors().size(), equalTo(1));
assertThat(e.validationErrors().get(0), containsString("can't say to upsert doc without providing doc"));
assertThat(e.getMessage(), containsString("can't say to upsert doc without providing doc"));
}
}

@Test
public void testConcurrentUpdateWithRetryOnConflict() throws Exception {
Expand Down

0 comments on commit b7cb479

Please sign in to comment.