Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update API: Add support for scripted upserts. #7144

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 34 additions & 0 deletions docs/reference/docs/update.asciidoc
Expand Up @@ -126,6 +126,7 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
If `name` was `new_name` before the request was sent then the entire update
request is ignored.

=== Upserts
There is also support for `upsert`. If the document does
not already exists, the content of the `upsert` element will be used to
index the fresh doc:
Expand All @@ -142,6 +143,36 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
}
}'
--------------------------------------------------
added[1.4.0]

If the document does not exist you may want your update script to
run anyway in order to initialize the document contents using
business logic unknown to the client. In this case pass the
new `scripted_upsert` parameter with the value `true`.

[source,js]
--------------------------------------------------
curl -XPOST 'localhost:9200/sessions/session/dh3sgudg8gsrgl/_update' -d '{
"script_id" : "myWebSessionSummariser",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use underscore_case for consistency with the rest of the docs?

"scripted_upsert":true,
"params" : {
"pageViewEvent" : {
"url":"foo.com/bar",
"response":404,
"time":"2014-01-01 12:32"
}
},
"upsert" : {
}
}'
--------------------------------------------------
The default `scripted_upsert` setting is `false` meaning the script is not executed for inserts.
However, in scenarios like the one above we may be using a non-trivial script stored
using the new "indexed scripts" feature. The script may be deriving properties
like the duration of our web session based on observing multiple page view events so the
client can supply a blank "upsert" document and allow the script to fill in most of the details
using the events passed in the `params` element.


Last it also supports `doc_as_upsert`. So that the
provided document will be inserted if the document does not already
Expand All @@ -158,6 +189,9 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
}'
--------------------------------------------------


=== Parameters

The update operation supports similar parameters as the index API,
including:

Expand Down
7 changes: 7 additions & 0 deletions rest-api-spec/api/update.json
Expand Up @@ -61,6 +61,13 @@
"script": {
"description": "The URL-encoded script definition (instead of using request body)"
},
"script_id": {
"description": "The id of a stored script"
},
"scripted_upsert": {
"type": "boolean",
"description": "True if the script referenced in script or script_id should be called to perform inserts - defaults to false"
},
"timeout": {
"type": "time",
"description": "Explicit operation timeout"
Expand Down
19 changes: 19 additions & 0 deletions rest-api-spec/test/update/25_script_upsert.yaml
Expand Up @@ -37,5 +37,24 @@
id: 1

- match: { _source.foo: xxx }

- do:
update:
index: test_1
type: test
id: 2
body:
script: "ctx._source.foo = bar"
params: { bar: 'xxx' }
upsert: { foo: baz }
scripted_upsert: true

- do:
get:
index: test_1
type: test
id: 2

- match: { _source.foo: xxx }


65 changes: 54 additions & 11 deletions src/main/java/org/elasticsearch/action/update/UpdateHelper.java
Expand Up @@ -90,11 +90,49 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
if (request.upsertRequest() == null && !request.docAsUpsert()) {
throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
}
Long ttl = null;
IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
if (request.scriptedUpsert() && (request.script() != null)) {
// Run the script to perform the create logic
IndexRequest upsert = request.upsertRequest();
Map<String, Object> upsertDoc = upsert.sourceAsMap();
Map<String, Object> ctx = new HashMap<>(2);
// Tell the script that this is a create and not an update
ctx.put("op", "create");
ctx.put("_source", upsertDoc);
try {
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptType, request.scriptParams);
script.setNextVar("ctx", ctx);
script.run();
// we need to unwrap the ctx...
ctx = (Map<String, Object>) script.unwrap(ctx);
} catch (Exception e) {
throw new ElasticsearchIllegalArgumentException("failed to execute script", e);
}
//Allow the script to set TTL using ctx._ttl
ttl = getTTLFromScriptContext(ctx);
//Allow the script to abort the create by setting "op" to "none"
String scriptOpChoice = (String) ctx.get("op");

// Only valid options for an upsert script are "create"
// (the default) or "none", meaning abort upsert
if (!"create".equals(scriptOpChoice)) {
if (!"none".equals(scriptOpChoice)) {
logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice, request.script);
}
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(),
getResult.getVersion(), false);
update.setGetResult(getResult);
return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON);
}
indexRequest.source((Map)ctx.get("_source"));
}

indexRequest.index(request.index()).type(request.type()).id(request.id())
// it has to be a "create!"
.create(true)
.create(true)
.routing(request.routing())
.ttl(ttl)
.refresh(request.refresh())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
Expand All @@ -121,7 +159,6 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
String operation = null;
String timestamp = null;
Long ttl = null;
Object fetchedTTL = null;
final Map<String, Object> updatedSourceAsMap;
final XContentType updateSourceContentType = sourceAndContent.v1();
String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null;
Expand Down Expand Up @@ -164,15 +201,8 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
operation = (String) ctx.get("op");
timestamp = (String) ctx.get("_timestamp");

fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}

ttl = getTTLFromScriptContext(ctx);

updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
}

Expand Down Expand Up @@ -211,6 +241,19 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
}
}

private Long getTTLFromScriptContext(Map<String, Object> ctx) {
Long ttl = null;
Object fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}
return ttl;
}

/**
* Extracts the fields from the updated document to be returned in a update response
*/
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/org/elasticsearch/action/update/UpdateRequest.java
Expand Up @@ -76,6 +76,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>

private IndexRequest upsertRequest;

private boolean scriptedUpsert = false;
private boolean docAsUpsert = false;
private boolean detectNoop = false;

Expand Down Expand Up @@ -596,6 +597,8 @@ public UpdateRequest source(BytesReference source) throws Exception {
scriptParams = parser.map();
} else if ("lang".equals(currentFieldName)) {
scriptLang = parser.text();
} else if ("scripted_upsert".equals(currentFieldName)) {
scriptedUpsert = parser.booleanValue();
} else if ("upsert".equals(currentFieldName)) {
XContentBuilder builder = XContentFactory.contentBuilder(xContentType);
builder.copyCurrentStructure(parser);
Expand All @@ -621,6 +624,15 @@ public boolean docAsUpsert() {
public void docAsUpsert(boolean shouldUpsertDoc) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should return this; (though it's not part of the same PR) :)

this.docAsUpsert = shouldUpsertDoc;
}

public boolean scriptedUpsert(){
return this.scriptedUpsert;
}

public void scriptedUpsert(boolean scriptedUpsert) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should return this;

this.scriptedUpsert = scriptedUpsert;
}


@Override
public void readFrom(StreamInput in) throws IOException {
Expand Down Expand Up @@ -663,6 +675,9 @@ public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
detectNoop = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
scriptedUpsert = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -715,6 +730,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
out.writeBoolean(detectNoop);
}
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(scriptedUpsert);
}
}

}
Expand Up @@ -353,6 +353,15 @@ public UpdateRequestBuilder setDetectNoop(boolean detectNoop) {
request.detectNoop(detectNoop);
return this;
}


/**
* Sets whether the script should be run in the case of an insert
*/
public UpdateRequestBuilder setScriptedUpsert(boolean scriptedUpsert) {
request.scriptedUpsert(scriptedUpsert);
return this;
}

@Override
protected void doExecute(ActionListener<UpdateResponse> listener) {
Expand Down
55 changes: 50 additions & 5 deletions src/test/java/org/elasticsearch/update/UpdateTests.java
Expand Up @@ -19,18 +19,17 @@

package org.elasticsearch.update;

import org.apache.lucene.document.Field;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
Expand All @@ -44,14 +43,15 @@
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
import org.elasticsearch.index.store.CorruptedFileTest;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -179,6 +179,51 @@ public void testUpsert() throws Exception {
assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2"));
}
}

@Test
public void testScriptedUpsert() throws Exception {
createIndex();
ensureGreen();

// Script logic is
// 1) New accounts take balance from "balance" in upsert doc and first payment is charged at 50%
// 2) Existing accounts subtract full payment from balance stored in elasticsearch

String script="int oldBalance=ctx._source.balance;"+
"int deduction=ctx.op == \"create\" ? (payment/2) : payment;"+
"ctx._source.balance=oldBalance-deduction;";
int openingBalance=10;

// Pay money from what will be a new account and opening balance comes from upsert doc
// provided by client
UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1")
.setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject())
.setScriptedUpsert(true)
.addScriptParam("payment", 2)
.setScript(script, ScriptService.ScriptType.INLINE)
.execute().actionGet();
assertTrue(updateResponse.isCreated());

for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("9"));
}

// Now pay money for an existing account where balance is stored in es
updateResponse = client().prepareUpdate("test", "type1", "1")
.setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject())
.setScriptedUpsert(true)
.addScriptParam("payment", 2)
.setScript(script, ScriptService.ScriptType.INLINE)
.execute().actionGet();
assertFalse(updateResponse.isCreated());


for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("7"));
}
}

@Test
public void testUpsertDoc() throws Exception {
Expand Down