Skip to content

Commit

Permalink
Update API enhancement - add support for scripted upserts.
Browse files Browse the repository at this point in the history
In the case of inserts the UpdateHelper class will now allow the script used to apply updates to run on the upsert doc provided by clients. This allows the logic for managing the internal state of the data item to be managed by the script and is not reliant on clients performing the initialisation of data structures managed by the script.

Closes #7143
  • Loading branch information
markharwood committed Aug 5, 2014
1 parent 418ce50 commit e6b459c
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 17 deletions.
36 changes: 35 additions & 1 deletion docs/reference/docs/update.asciidoc
Expand Up @@ -126,6 +126,7 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
If `name` was `new_name` before the request was sent then the entire update
request is ignored.

=== Upserts
There is also support for `upsert`. If the document does
not already exists, the content of the `upsert` element will be used to
index the fresh doc:
Expand All @@ -142,8 +143,38 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
}
}'
--------------------------------------------------
added[1.4.0]

Last it also supports `doc_as_upsert`. So that the
If the document does not exist you may want your update script to
run anyway in order to initialize the document contents using
business logic unknown to the client. In this case pass the
new `scripted_upsert` parameter with the value `true`.

[source,js]
--------------------------------------------------
curl -XPOST 'localhost:9200/sessions/session/dh3sgudg8gsrgl/_update' -d '{
"script_id" : "my_web_session_summariser",
"scripted_upsert":true,
"params" : {
"pageViewEvent" : {
"url":"foo.com/bar",
"response":404,
"time":"2014-01-01 12:32"
}
},
"upsert" : {
}
}'
--------------------------------------------------
The default `scripted_upsert` setting is `false` meaning the script is not executed for inserts.
However, in scenarios like the one above we may be using a non-trivial script stored
using the new "indexed scripts" feature. The script may be deriving properties
like the duration of our web session based on observing multiple page view events so the
client can supply a blank "upsert" document and allow the script to fill in most of the details
using the events passed in the `params` element.


Last, the upsert facility also supports `doc_as_upsert`. So that the
provided document will be inserted if the document does not already
exist. This will reduce the amount of data that needs to be sent to
elasticsearch.
Expand All @@ -158,6 +189,9 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
}'
--------------------------------------------------


=== Parameters

The update operation supports similar parameters as the index API,
including:

Expand Down
7 changes: 7 additions & 0 deletions rest-api-spec/api/update.json
Expand Up @@ -61,6 +61,13 @@
"script": {
"description": "The URL-encoded script definition (instead of using request body)"
},
"script_id": {
"description": "The id of a stored script"
},
"scripted_upsert": {
"type": "boolean",
"description": "True if the script referenced in script or script_id should be called to perform inserts - defaults to false"
},
"timeout": {
"type": "time",
"description": "Explicit operation timeout"
Expand Down
19 changes: 19 additions & 0 deletions rest-api-spec/test/update/25_script_upsert.yaml
Expand Up @@ -37,5 +37,24 @@
id: 1

- match: { _source.foo: xxx }

- do:
update:
index: test_1
type: test
id: 2
body:
script: "ctx._source.foo = bar"
params: { bar: 'xxx' }
upsert: { foo: baz }
scripted_upsert: true

- do:
get:
index: test_1
type: test
id: 2

- match: { _source.foo: xxx }


65 changes: 54 additions & 11 deletions src/main/java/org/elasticsearch/action/update/UpdateHelper.java
Expand Up @@ -90,11 +90,49 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
if (request.upsertRequest() == null && !request.docAsUpsert()) {
throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
}
Long ttl = null;
IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
if (request.scriptedUpsert() && (request.script() != null)) {
// Run the script to perform the create logic
IndexRequest upsert = request.upsertRequest();
Map<String, Object> upsertDoc = upsert.sourceAsMap();
Map<String, Object> ctx = new HashMap<>(2);
// Tell the script that this is a create and not an update
ctx.put("op", "create");
ctx.put("_source", upsertDoc);
try {
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptType, request.scriptParams);
script.setNextVar("ctx", ctx);
script.run();
// we need to unwrap the ctx...
ctx = (Map<String, Object>) script.unwrap(ctx);
} catch (Exception e) {
throw new ElasticsearchIllegalArgumentException("failed to execute script", e);
}
//Allow the script to set TTL using ctx._ttl
ttl = getTTLFromScriptContext(ctx);
//Allow the script to abort the create by setting "op" to "none"
String scriptOpChoice = (String) ctx.get("op");

// Only valid options for an upsert script are "create"
// (the default) or "none", meaning abort upsert
if (!"create".equals(scriptOpChoice)) {
if (!"none".equals(scriptOpChoice)) {
logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice, request.script);
}
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(),
getResult.getVersion(), false);
update.setGetResult(getResult);
return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON);
}
indexRequest.source((Map)ctx.get("_source"));
}

indexRequest.index(request.index()).type(request.type()).id(request.id())
// it has to be a "create!"
.create(true)
.create(true)
.routing(request.routing())
.ttl(ttl)
.refresh(request.refresh())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
Expand All @@ -121,7 +159,6 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
String operation = null;
String timestamp = null;
Long ttl = null;
Object fetchedTTL = null;
final Map<String, Object> updatedSourceAsMap;
final XContentType updateSourceContentType = sourceAndContent.v1();
String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null;
Expand Down Expand Up @@ -164,15 +201,8 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
operation = (String) ctx.get("op");
timestamp = (String) ctx.get("_timestamp");

fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}

ttl = getTTLFromScriptContext(ctx);

updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
}

Expand Down Expand Up @@ -211,6 +241,19 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
}
}

private Long getTTLFromScriptContext(Map<String, Object> ctx) {
Long ttl = null;
Object fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}
return ttl;
}

/**
* Extracts the fields from the updated document to be returned in a update response
*/
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/org/elasticsearch/action/update/UpdateRequest.java
Expand Up @@ -76,6 +76,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>

private IndexRequest upsertRequest;

private boolean scriptedUpsert = false;
private boolean docAsUpsert = false;
private boolean detectNoop = false;

Expand Down Expand Up @@ -596,6 +597,8 @@ public UpdateRequest source(BytesReference source) throws Exception {
scriptParams = parser.map();
} else if ("lang".equals(currentFieldName)) {
scriptLang = parser.text();
} else if ("scripted_upsert".equals(currentFieldName)) {
scriptedUpsert = parser.booleanValue();
} else if ("upsert".equals(currentFieldName)) {
XContentBuilder builder = XContentFactory.contentBuilder(xContentType);
builder.copyCurrentStructure(parser);
Expand All @@ -621,6 +624,15 @@ public boolean docAsUpsert() {
public void docAsUpsert(boolean shouldUpsertDoc) {
this.docAsUpsert = shouldUpsertDoc;
}

public boolean scriptedUpsert(){
return this.scriptedUpsert;
}

public void scriptedUpsert(boolean scriptedUpsert) {
this.scriptedUpsert = scriptedUpsert;
}


@Override
public void readFrom(StreamInput in) throws IOException {
Expand Down Expand Up @@ -663,6 +675,9 @@ public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
detectNoop = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
scriptedUpsert = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -715,6 +730,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
out.writeBoolean(detectNoop);
}
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(scriptedUpsert);
}
}

}
Expand Up @@ -353,6 +353,15 @@ public UpdateRequestBuilder setDetectNoop(boolean detectNoop) {
request.detectNoop(detectNoop);
return this;
}


/**
* Sets whether the script should be run in the case of an insert
*/
public UpdateRequestBuilder setScriptedUpsert(boolean scriptedUpsert) {
request.scriptedUpsert(scriptedUpsert);
return this;
}

@Override
protected void doExecute(ActionListener<UpdateResponse> listener) {
Expand Down
55 changes: 50 additions & 5 deletions src/test/java/org/elasticsearch/update/UpdateTests.java
Expand Up @@ -19,18 +19,17 @@

package org.elasticsearch.update;

import org.apache.lucene.document.Field;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
Expand All @@ -44,14 +43,15 @@
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
import org.elasticsearch.index.store.CorruptedFileTest;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -179,6 +179,51 @@ public void testUpsert() throws Exception {
assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2"));
}
}

@Test
public void testScriptedUpsert() throws Exception {
createIndex();
ensureGreen();

// Script logic is
// 1) New accounts take balance from "balance" in upsert doc and first payment is charged at 50%
// 2) Existing accounts subtract full payment from balance stored in elasticsearch

String script="int oldBalance=ctx._source.balance;"+
"int deduction=ctx.op == \"create\" ? (payment/2) : payment;"+
"ctx._source.balance=oldBalance-deduction;";
int openingBalance=10;

// Pay money from what will be a new account and opening balance comes from upsert doc
// provided by client
UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1")
.setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject())
.setScriptedUpsert(true)
.addScriptParam("payment", 2)
.setScript(script, ScriptService.ScriptType.INLINE)
.execute().actionGet();
assertTrue(updateResponse.isCreated());

for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("9"));
}

// Now pay money for an existing account where balance is stored in es
updateResponse = client().prepareUpdate("test", "type1", "1")
.setUpsert(XContentFactory.jsonBuilder().startObject().field("balance", openingBalance).endObject())
.setScriptedUpsert(true)
.addScriptParam("payment", 2)
.setScript(script, ScriptService.ScriptType.INLINE)
.execute().actionGet();
assertFalse(updateResponse.isCreated());


for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.getSourceAsMap().get("balance").toString(), equalTo("7"));
}
}

@Test
public void testUpsertDoc() throws Exception {
Expand Down

0 comments on commit e6b459c

Please sign in to comment.