Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions x-pack/docs/en/watcher/actions/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ The following snippet shows a simple `index` action definition:
<1> The id of the action
<2> An optional <<condition,condition>> to restrict action execution
<3> An optional <<transform,transform>> to transform the payload and prepare the data that should be indexed
<4> The elasticsearch index to store the data to
<5> An optional `_id` for the document, if it should always be the same document.
<4> The index, alias, or data stream to which the data will be written
<5> An optional `_id` for the document


[[index-action-attributes]]
Expand All @@ -37,11 +37,15 @@ The following snippet shows a simple `index` action definition:
|======
|Name |Required | Default | Description

| `index` | yes | - | The Elasticsearch index to index into.
| `index` | yes | - | The index, alias, or data stream to index into.


| `doc_id` | no | - | The optional `_id` of the document.

| `op_type` | no | `index` | The <<docs-index-api-op_type,op_type>> for the index operation.
Must be one of either `index` or `create`. Must be `create` if
`index` is a data stream.

| `execution_time_field` | no | - | The field that will store/index the watch execution
time.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public Action.Result execute(String actionId, WatchExecutionContext ctx, Payload
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index));
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType));
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId));
if (action.opType != null) {
indexRequest.opType(action.opType);
}

data = addTimestampToDocument(data, ctx.executionTime());
BytesReference bytesReference;
Expand Down Expand Up @@ -130,6 +133,9 @@ Action.Result indexBulk(Iterable list, String actionId, WatchExecutionContext ct
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", doc, INDEX_FIELD, action.index));
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",doc, TYPE_FIELD, action.docType));
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",doc, ID_FIELD, action.docId));
if (action.opType != null) {
indexRequest.opType(action.opType);
}

doc = addTimestampToDocument(doc, ctx.executionTime());
try (XContentBuilder builder = jsonBuilder()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package org.elasticsearch.xpack.watcher.actions.index;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.List;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -31,6 +33,7 @@ public class IndexAction implements Action {
@Nullable @Deprecated final String docType;
@Nullable final String index;
@Nullable final String docId;
@Nullable final DocWriteRequest.OpType opType;
@Nullable final String executionTimeField;
@Nullable final TimeValue timeout;
@Nullable final ZoneId dynamicNameTimeZone;
Expand All @@ -42,18 +45,20 @@ public class IndexAction implements Action {
public IndexAction(@Nullable String index, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
this(index, null, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
this(index, null, docId, null, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}

/**
* Document types are deprecated, use constructor without docType
*/
@Deprecated
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId, @Nullable DocWriteRequest.OpType opType,
@Nullable String executionTimeField, @Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone,
@Nullable RefreshPolicy refreshPolicy) {
this.index = index;
this.docType = docType;
this.docId = docId;
this.opType = opType;
this.executionTimeField = executionTimeField;
this.timeout = timeout;
this.dynamicNameTimeZone = dynamicNameTimeZone;
Expand All @@ -77,6 +82,10 @@ public String getDocId() {
return docId;
}

public DocWriteRequest.OpType getOpType() {
return opType;
}

public String getExecutionTimeField() {
return executionTimeField;
}
Expand All @@ -96,7 +105,10 @@ public boolean equals(Object o) {

IndexAction that = (IndexAction) o;

return Objects.equals(index, that.index) && Objects.equals(docType, that.docType) && Objects.equals(docId, that.docId)
return Objects.equals(index, that.index)
&& Objects.equals(docType, that.docType)
&& Objects.equals(docId, that.docId)
&& Objects.equals(opType, that.opType)
&& Objects.equals(executionTimeField, that.executionTimeField)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone)
Expand All @@ -105,7 +117,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
return Objects.hash(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}

@Override
Expand All @@ -120,6 +132,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (docId != null) {
builder.field(Field.DOC_ID.getPreferredName(), docId);
}
if (opType != null) {
builder.field(Field.OP_TYPE.getPreferredName(), opType);
}
if (executionTimeField != null) {
builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField);
}
Expand All @@ -139,6 +154,7 @@ public static IndexAction parse(String watchId, String actionId, XContentParser
String index = null;
String docType = null;
String docId = null;
DocWriteRequest.OpType opType = null;
String executionTimeField = null;
TimeValue timeout = null;
ZoneId dynamicNameTimeZone = null;
Expand Down Expand Up @@ -169,6 +185,17 @@ public static IndexAction parse(String watchId, String actionId, XContentParser
docType = parser.text();
} else if (Field.DOC_ID.match(currentFieldName, parser.getDeprecationHandler())) {
docId = parser.text();
} else if (Field.OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
try {
opType = DocWriteRequest.OpType.fromString(parser.text());
if (List.of(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.INDEX).contains(opType) == false) {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. op_type value for field [{}] " +
"must be [index] or [create]", TYPE, watchId, actionId, currentFieldName);
}
} catch (IllegalArgumentException e) {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. failed to parse op_type value for " +
"field [{}]", TYPE, watchId, actionId, currentFieldName);
}
} else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
executionTimeField = parser.text();
} else if (Field.TIMEOUT_HUMAN.match(currentFieldName, parser.getDeprecationHandler())) {
Expand All @@ -193,7 +220,7 @@ public static IndexAction parse(String watchId, String actionId, XContentParser
}
}

return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
return new IndexAction(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}

/**
Expand Down Expand Up @@ -289,6 +316,7 @@ public static class Builder implements Action.Builder<IndexAction> {
final String index;
final String docType;
String docId;
DocWriteRequest.OpType opType;
String executionTimeField;
TimeValue timeout;
ZoneId dynamicNameTimeZone;
Expand All @@ -313,6 +341,11 @@ public Builder setDocId(String docId) {
return this;
}

public Builder setOpType(DocWriteRequest.OpType opType) {
this.opType = opType;
return this;
}

public Builder setExecutionTimeField(String executionTimeField) {
this.executionTimeField = executionTimeField;
return this;
Expand All @@ -335,14 +368,15 @@ public Builder setRefreshPolicy(RefreshPolicy refreshPolicy) {

@Override
public IndexAction build() {
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
return new IndexAction(index, docType, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
}

interface Field {
ParseField INDEX = new ParseField("index");
ParseField DOC_TYPE = new ParseField("doc_type");
ParseField DOC_ID = new ParseField("doc_id");
ParseField OP_TYPE = new ParseField("op_type");
ParseField EXECUTION_TIME_FIELD = new ParseField("execution_time_field");
ParseField SOURCE = new ParseField("source");
ParseField RESPONSE = new ParseField("response");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -87,6 +88,10 @@ public void testParser() throws Exception {
if (writeTimeout != null) {
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis());
}
DocWriteRequest.OpType opType = randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null;
if (opType != null) {
builder.field(IndexAction.Field.OP_TYPE.getPreferredName(), opType.getLowercase());
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
XContentParser parser = createParser(builder);
Expand All @@ -100,6 +105,9 @@ public void testParser() throws Exception {
if (timestampField != null) {
assertThat(executable.action().executionTimeField, equalTo(timestampField));
}
if (opType != null) {
assertThat(executable.action().opType, equalTo(opType));
}
assertThat(executable.action().timeout, equalTo(writeTimeout));
}

Expand Down Expand Up @@ -146,20 +154,47 @@ public void testParserFailure() throws Exception {
.endObject());
}

public void testOpTypeThatCannotBeParsed() throws Exception {
expectParseFailure(jsonBuilder()
.startObject()
.field(IndexAction.Field.OP_TYPE.getPreferredName(), randomAlphaOfLength(10))
.endObject(),
"failed to parse op_type value for field [op_type]");
}

public void testUnsupportedOpType() throws Exception {
expectParseFailure(jsonBuilder()
.startObject()
.field(IndexAction.Field.OP_TYPE.getPreferredName(),
randomFrom(DocWriteRequest.OpType.UPDATE.name(), DocWriteRequest.OpType.DELETE.name()))
.endObject(),
"op_type value for field [op_type] must be [index] or [create]");
}

private void expectParseFailure(XContentBuilder builder, String expectedMessage) throws Exception {
expectFailure(ElasticsearchParseException.class, builder, expectedMessage);
}

private void expectParseFailure(XContentBuilder builder) throws Exception {
expectFailure(ElasticsearchParseException.class, builder);
}

private void expectFailure(Class clazz, XContentBuilder builder) throws Exception {
expectFailure(clazz, builder, null);
}

private void expectFailure(Class clazz, XContentBuilder builder, String expectedMessage) throws Exception {
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
XContentParser parser = createParser(builder);
parser.nextToken();
expectThrows(clazz, () ->
actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
Throwable t = expectThrows(clazz, () -> actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
if (expectedMessage != null) {
assertThat(t.getMessage(), containsString(expectedMessage));
}
}

public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, refreshPolicy);
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithId = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap();
Expand Down Expand Up @@ -209,7 +244,7 @@ public void testThatIndexTypeIdDynamically() throws Exception {
final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index",
configureTypeDynamically ? null : "my_type",
configureIdDynamically ? null : "my_id",
null, null, null, refreshPolicy);
null, null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));

Expand All @@ -230,7 +265,7 @@ public void testThatIndexTypeIdDynamically() throws Exception {
}

public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception {
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, refreshPolicy);
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));

Expand Down Expand Up @@ -263,7 +298,7 @@ public void testConfigureIndexInMapAndAction() {
String fieldName = randomFrom("_index", "_type");
final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null,
fieldName.equals("_type") ? "my_type" : null,
null,null, null, null, refreshPolicy);
null, null, null, null, null, refreshPolicy);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));

Expand All @@ -283,7 +318,7 @@ public void testIndexActionExecuteSingleDoc() throws Exception {
String docId = randomAlphaOfLength(5);
String timestampField = randomFrom("@timestamp", null);

IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null,
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, null, timestampField, null, null,
refreshPolicy);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
Expand Down Expand Up @@ -334,7 +369,7 @@ public void testIndexActionExecuteSingleDoc() throws Exception {
}

public void testFailureResult() throws Exception {
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null, refreshPolicy);
IndexAction action = new IndexAction("test-index", "test-type", null, null, "@timestamp", null, null, refreshPolicy);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
Expand Down Expand Up @@ -585,15 +586,16 @@ private List<ActionWrapper> randomActions() {
randomFrom(DataAttachment.JSON, DataAttachment.YAML), EmailAttachments.EMPTY_ATTACHMENTS);
list.add(new ActionWrapper("_email_" + randomAlphaOfLength(8), randomThrottler(),
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer,
new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer,
Collections.emptyMap()), null, null));
}
if (randomBoolean()) {
ZoneOffset timeZone = randomBoolean() ? ZoneOffset.UTC : null;
TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null;
WriteRequest.RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(WriteRequest.RefreshPolicy.values());
IndexAction action = new IndexAction("_index", null, randomBoolean() ? "123" : null, null, timeout, timeZone,
refreshPolicy);
IndexAction action = new IndexAction("_index", null, randomBoolean() ? "123" : null,
randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null, null, timeout, timeZone,
refreshPolicy);
list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(),
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
Expand Down