Skip to content

Commit

Permalink
Script: Metadata for update context (#88333)
Browse files Browse the repository at this point in the history
Adds the `metadata()` API call and a Metadata class for the Update context.

There are different metadata available in the update context depending
on whether it is an update or an insert (via upsert).

For update, scripts can read `index`, `id`, `routing`, `version` and `timestamp`.

For insert, scripts can read `index`, `id` and `timestamp`.

Scripts can always read and write the `op` but the available ops are different.

Updates allow 'noop', 'index' and 'delete'.
Inserts allow 'noop' and 'create'.

Refs: #86472
  • Loading branch information
stu-elastic committed Jul 19, 2022
1 parent 2f5ec51 commit fa25c31
Show file tree
Hide file tree
Showing 17 changed files with 634 additions and 57 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/88333.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88333
summary: "Script: Metadata for update context"
area: Infra/Scripting
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,17 @@ class org.elasticsearch.painless.api.Json {
String dump(def)
String dump(def, boolean)
}

class org.elasticsearch.script.Metadata {
String getIndex()
String getId()
String getRouting()
long getVersion()
String getOp()
void setOp(String)
ZonedDateTime getTimestamp()
}

class org.elasticsearch.script.UpdateScript {
Metadata metadata()
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,47 @@
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "Iterable object is self-referencing itself" }

---
"Script Update Metadata":
- skip:
version: " - 8.3.99"
reason: "update metadata introduced in 8.4.0"

- do:
update:
index: test_1
id: "2"
body:
script:
source: "ctx._source.bar = metadata().id + '-extra'"
lang: "painless"
upsert: {}
scripted_upsert: true

- do:
get:
index: test_1
id: "2"

- match: { _source.bar: 2-extra }
- match: { found: true }

- do:
update:
index: test_1
id: "2"
body:
script:
source: "metadata().op = 'delete'"
lang: "painless"
upsert: {}
scripted_upsert: true

- do:
catch: missing
get:
index: test_1
id: "2"

- match: { found: false }
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,97 @@
id: "4"

- match: { _source.within_one_minute: true }

---
"Script Upsert Metadata":
- skip:
version: " - 8.3.99"
reason: "update metadata introduced in 8.4.0"

- do:
catch: /routing is unavailable for insert/
update:
index: test_1
id: "1"
body:
script:
source: "ctx._source.foo = metadata().routing"
lang: "painless"
upsert: {}
scripted_upsert: true

- do:
update:
index: test_1
id: "2"
body:
script:
source: "ctx._source.foo = metadata().index + '_1'; ctx._source.bar = 'nothing'"
lang: "painless"
upsert: {}
scripted_upsert: true

- do:
get:
index: test_1
id: "2"

- match: { _source.foo: test_1_1 }
- match: { _source.bar: nothing }

- do:
update:
index: test_1
id: "3"
body:
script:
source: "metadata().op = 'noop'; ctx._source.bar = 'skipped?'"
lang: "painless"
upsert: {}
scripted_upsert: true

- do:
catch: missing
get:
index: test_1
id: "3"

- match: { found: false }

- do:
update:
index: test_1
id: "3"
body:
script:
source: "metadata().op = 'create'; ctx._source.bar = 'skipped?'"
lang: "painless"
upsert: {}
scripted_upsert: true

- do:
get:
index: test_1
id: "3"

- match: { found: true }
- match: { _source.bar: "skipped?" }

# update
- do:
update:
index: test_1
id: "2"
body:
script:
source: "ctx._source.bar = metadata().op + '-extra'"
lang: "painless"
upsert: {}
scripted_upsert: true

- do:
get:
index: test_1
id: "2"

- match: { _source.bar: index-extra }
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected <T extends ActionRequest> T applyScript(Consumer<Map<String, Object>>
(params, ctx) -> new UpdateScript(Collections.emptyMap(), ctx) {
@Override
public void execute() {
scriptBody.accept(getCtx());
scriptBody.accept(ctx);
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
Expand All @@ -32,7 +31,9 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.UpdateCtxMap;
import org.elasticsearch.script.UpdateScript;
import org.elasticsearch.script.UpsertCtxMap;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
Expand Down Expand Up @@ -87,25 +88,16 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult
* Execute a scripted upsert, where there is an existing upsert document and a script to be executed. The script is executed and a new
* Tuple of operation and updated {@code _source} is returned.
*/
Tuple<UpdateOpType, Map<String, Object>> executeScriptedUpsert(Map<String, Object> upsertDoc, Script script, LongSupplier nowInMillis) {
Map<String, Object> ctx = Maps.newMapWithExpectedSize(3);
// Tell the script that this is a create and not an update
ctx.put(ContextFields.OP, UpdateOpType.CREATE.toString());
ctx.put(ContextFields.SOURCE, upsertDoc);
ctx.put(ContextFields.NOW, nowInMillis.getAsLong());
ctx = executeScript(script, ctx);

UpdateOpType operation = UpdateOpType.lenientFromString((String) ctx.get(ContextFields.OP), logger, script.getIdOrCode());
@SuppressWarnings("unchecked")
Map<String, Object> newSource = (Map<String, Object>) ctx.get(ContextFields.SOURCE);

Tuple<UpdateOpType, Map<String, Object>> executeScriptedUpsert(Script script, UpsertCtxMap ctxMap) {
ctxMap = executeScript(script, ctxMap);
UpdateOpType operation = UpdateOpType.lenientFromString(ctxMap.getMetadata().getOp(), logger, script.getIdOrCode());
if (operation != UpdateOpType.CREATE && operation != UpdateOpType.NONE) {
// Only valid options for an upsert script are "create" (the default) or "none", meaning abort upsert
logger.warn("Invalid upsert operation [{}] for script [{}], doing nothing...", operation, script.getIdOrCode());
operation = UpdateOpType.NONE;
}

return new Tuple<>(operation, newSource);
return new Tuple<>(operation, ctxMap.getSource());
}

/**
Expand All @@ -120,11 +112,14 @@ Result prepareUpsert(ShardId shardId, UpdateRequest request, final GetResult get
if (request.scriptedUpsert() && request.script() != null) {
// Run the script to perform the create logic
IndexRequest upsert = request.upsertRequest();
Tuple<UpdateOpType, Map<String, Object>> upsertResult = executeScriptedUpsert(
upsert.sourceAsMap(),
request.script,
nowInMillis
UpsertCtxMap ctxMap = new UpsertCtxMap(
getResult.getIndex(),
getResult.getId(),
UpdateOpType.CREATE.toString(),
nowInMillis.getAsLong(),
upsert.sourceAsMap()
);
Tuple<UpdateOpType, Map<String, Object>> upsertResult = executeScriptedUpsert(request.script, ctxMap);
switch (upsertResult.v1()) {
case CREATE -> indexRequest = Requests.indexRequest(request.index()).source(upsertResult.v2());
case NONE -> {
Expand Down Expand Up @@ -237,24 +232,22 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
final String routing = calculateRouting(getResult, currentRequest);
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
final XContentType updateSourceContentType = sourceAndContent.v1();
final Map<String, Object> sourceAsMap = sourceAndContent.v2();

Map<String, Object> ctx = Maps.newMapWithExpectedSize(16);
ctx.put(ContextFields.OP, UpdateOpType.INDEX.toString()); // The default operation is "index"
ctx.put(ContextFields.INDEX, getResult.getIndex());
ctx.put(ContextFields.TYPE, MapperService.SINGLE_MAPPING_NAME);
ctx.put(ContextFields.ID, getResult.getId());
ctx.put(ContextFields.VERSION, getResult.getVersion());
ctx.put(ContextFields.ROUTING, routing);
ctx.put(ContextFields.SOURCE, sourceAsMap);
ctx.put(ContextFields.NOW, nowInMillis.getAsLong());

ctx = executeScript(request.script, ctx);

UpdateOpType operation = UpdateOpType.lenientFromString((String) ctx.get(ContextFields.OP), logger, request.script.getIdOrCode());

@SuppressWarnings("unchecked")
final Map<String, Object> updatedSourceAsMap = (Map<String, Object>) ctx.get(ContextFields.SOURCE);
UpdateCtxMap ctxMap = executeScript(
request.script,
new UpdateCtxMap(
getResult.getIndex(),
getResult.getId(),
getResult.getVersion(),
routing,
MapperService.SINGLE_MAPPING_NAME,
UpdateOpType.INDEX.toString(), // The default operation is "index"
nowInMillis.getAsLong(),
sourceAndContent.v2()
)
);
UpdateOpType operation = UpdateOpType.lenientFromString(ctxMap.getMetadata().getOp(), logger, request.script.getIdOrCode());
final Map<String, Object> updatedSourceAsMap = ctxMap.getSource();

switch (operation) {
case INDEX -> {
Expand Down Expand Up @@ -307,17 +300,17 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
}
}

private Map<String, Object> executeScript(Script script, Map<String, Object> ctx) {
private <T extends UpdateCtxMap> T executeScript(Script script, T ctxMap) {
try {
if (scriptService != null) {
UpdateScript.Factory factory = scriptService.compile(script, UpdateScript.CONTEXT);
UpdateScript executableScript = factory.newInstance(script.getParams(), ctx);
UpdateScript executableScript = factory.newInstance(script.getParams(), ctxMap);
executableScript.execute();
}
} catch (Exception e) {
throw new IllegalArgumentException("failed to execute script", e);
}
return ctx;
return ctxMap;
}

/**
Expand Down Expand Up @@ -429,6 +422,7 @@ public static UpdateOpType lenientFromString(String operation, Logger logger, St
return UpdateOpType.INDEX;
case "delete":
return UpdateOpType.DELETE;
case "noop":
case "none":
return UpdateOpType.NONE;
default:
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestCtxMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,14 @@ public static ZonedDateTime getTimestamp(Map<String, Object> ingestMetadata) {
return null;
}

@Override
public Map<String, Object> getSource() {
return source;
}

@Override
protected Map<String, Object> wrapSource(Map<String, Object> source) {
// Not wrapped in Ingest
return source;
}
}
15 changes: 13 additions & 2 deletions server/src/main/java/org/elasticsearch/script/CtxMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.script;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;

import java.util.AbstractCollection;
Expand All @@ -28,6 +29,7 @@
* validation via {@link Metadata}.
*/
public class CtxMap extends AbstractMap<String, Object> {
protected static final String SOURCE = "_source";
protected final Map<String, Object> source;
protected final Metadata metadata;

Expand All @@ -38,7 +40,7 @@ public class CtxMap extends AbstractMap<String, Object> {
* @param metadata the metadata map
*/
protected CtxMap(Map<String, Object> source, Metadata metadata) {
this.source = source != null ? source : new HashMap<>();
this.source = wrapSource(source != null ? source : new HashMap<>());
this.metadata = metadata;
Set<String> badKeys = Sets.intersection(this.metadata.keySet(), this.source.keySet());
if (badKeys.size() > 0) {
Expand All @@ -50,11 +52,20 @@ protected CtxMap(Map<String, Object> source, Metadata metadata) {
}
}

protected Map<String, Object> wrapSource(Map<String, Object> source) {
Map<String, Object> wrapper = Maps.newHashMapWithExpectedSize(1);
wrapper.put(SOURCE, source);
return wrapper;
}

/**
* get the source map, if externally modified then the guarantees of this class are not enforced
*/
@SuppressWarnings("unchecked")
public Map<String, Object> getSource() {
return source;
Object rawSource = source.get(SOURCE);
assert rawSource instanceof Map<?, ?> : " wrapped source of unexpected type";
return (Map<String, Object>) rawSource;
}

/**
Expand Down

0 comments on commit fa25c31

Please sign in to comment.