Skip to content

Commit

Permalink
Watcher: Allow to execute actions for each element in array (#41997)
Browse files Browse the repository at this point in the history
This adds the ability to execute an action for each element that occurs
in an array, for example you could sent a dedicated slack action for
each search hit returned from a search.

There is also a limit for the number of actions executed, which is 
hardcoded to 100 right now, to prevent having watches run forever.

The watch history logs each action result and the total number of actions
the were executed.

Relates #34546
  • Loading branch information
spinscale committed Jul 3, 2019
1 parent d6f36a8 commit e63fd53
Show file tree
Hide file tree
Showing 12 changed files with 447 additions and 31 deletions.
43 changes: 43 additions & 0 deletions x-pack/docs/en/watcher/actions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,49 @@ of a watch during its execution:
image::images/action-throttling.jpg[align="center"]


[[action-foreach]]
=== Running an action for each element in an array

You can use the `foreach` field in an action to trigger the configured action
for every element within that array.

In order to protect from long running watches, after one hundred runs with an
foreach loop the execution is gracefully stopped.

[source,js]
--------------------------------------------------
PUT _watcher/watch/log_event_watch
{
"trigger" : {
"schedule" : { "interval" : "5m" }
},
"input" : {
"search" : {
"request" : {
"indices" : "log-events",
"body" : {
"query" : { "match" : { "status" : "error" } }
}
}
}
},
"condition" : {
"compare" : { "ctx.payload.hits.total" : { "gt" : 0 } }
},
"actions" : {
"log_hits" : {
"foreach" : "ctx.payload.hits.hits", <1>
"logging" : {
"text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}"
}
}
}
}
--------------------------------------------------
// CONSOLE

<1> The logging statement will be executed for each of the returned search hits.

[[action-conditions]]
=== Adding conditions to actions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.script.JodaCompatibleZonedDateTime;
import org.elasticsearch.xpack.core.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.xpack.core.watcher.actions.throttler.Throttler;
import org.elasticsearch.xpack.core.watcher.actions.throttler.ThrottlerField;
Expand All @@ -30,29 +34,42 @@
import java.time.Clock;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;

public class ActionWrapper implements ToXContentObject {

private final int MAXIMUM_FOREACH_RUNS = 100;

private String id;
@Nullable
private final ExecutableCondition condition;
@Nullable
private final ExecutableTransform<Transform, Transform.Result> transform;
private final ActionThrottler throttler;
private final ExecutableAction<? extends Action> action;
@Nullable
private String path;

public ActionWrapper(String id, ActionThrottler throttler,
@Nullable ExecutableCondition condition,
@Nullable ExecutableTransform<Transform, Transform.Result> transform,
ExecutableAction<? extends Action> action) {
ExecutableAction<? extends Action> action,
@Nullable String path) {
this.id = id;
this.condition = condition;
this.throttler = throttler;
this.transform = transform;
this.action = action;
this.path = path;
}

public String id() {
Expand Down Expand Up @@ -140,16 +157,90 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) {
return new ActionWrapperResult(id, conditionResult, null, new Action.Result.FailureWithException(action.type(), e));
}
}
try {
Action.Result actionResult = action.execute(id, ctx, payload);
return new ActionWrapperResult(id, conditionResult, transformResult, actionResult);
} catch (Exception e) {
action.logger().error(
if (Strings.isEmpty(path)) {
try {
Action.Result actionResult = action.execute(id, ctx, payload);
return new ActionWrapperResult(id, conditionResult, transformResult, actionResult);
} catch (Exception e) {
action.logger().error(
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
}
} else {
try {
List<Action.Result> results = new ArrayList<>();
Object object = ObjectPath.eval(path, toMap(ctx));
int runs = 0;
if (object instanceof Collection) {
Collection collection = Collection.class.cast(object);
if (collection.isEmpty()) {
throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path);
} else {
for (Object o : collection) {
if (runs >= MAXIMUM_FOREACH_RUNS) {
break;
}
if (o instanceof Map) {
results.add(action.execute(id, ctx, new Payload.Simple((Map<String, Object>) o)));
} else {
results.add(action.execute(id, ctx, new Payload.Simple("_value", o)));
}
runs++;
}
}
} else if (object == null) {
throw new ElasticsearchException("specified foreach object was null: [{}]", path);
} else {
throw new ElasticsearchException("specified foreach object was not a an array/collection: [{}]", path);
}

// check if we have mixed results, then set to partial failure
final Set<Action.Result.Status> statuses = results.stream().map(Action.Result::status).collect(Collectors.toSet());
Action.Result.Status status;
if (statuses.size() == 1) {
status = statuses.iterator().next();
} else {
status = Action.Result.Status.PARTIAL_FAILURE;
}

final int numberOfActionsExecuted = runs;
return new ActionWrapperResult(id, conditionResult, transformResult,
new Action.Result(action.type(), status) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("number_of_actions_executed", numberOfActionsExecuted);
builder.startArray(WatchField.FOREACH.getPreferredName());
for (Action.Result result : results) {
builder.startObject();
result.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
return builder;
}
});
} catch (Exception e) {
action.logger().error(
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e));
}
}
}

private Map<String, Object> toMap(WatchExecutionContext ctx) {
Map<String, Object> model = new HashMap<>();
model.put("id", ctx.id().value());
model.put("watch_id", ctx.id().watchId());
model.put("execution_time", new JodaCompatibleZonedDateTime(ctx.executionTime().toInstant(), ZoneOffset.UTC));
model.put("trigger", ctx.triggerEvent().data());
model.put("metadata", ctx.watch().metadata());
model.put("vars", ctx.vars());
if (ctx.payload().data() != null) {
model.put("payload", ctx.payload().data());
}
return Map.of("ctx", model);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -186,6 +277,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(transform.type(), transform, params)
.endObject();
}
if (Strings.isEmpty(path) == false) {
builder.field(WatchField.FOREACH.getPreferredName(), path);
}
builder.field(action.type(), action, params);
return builder.endObject();
}
Expand All @@ -198,6 +292,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
ExecutableCondition condition = null;
ExecutableTransform<Transform, Transform.Result> transform = null;
TimeValue throttlePeriod = null;
String path = null;
ExecutableAction<? extends Action> action = null;

String currentFieldName = null;
Expand All @@ -208,6 +303,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
} else {
if (WatchField.CONDITION.match(currentFieldName, parser.getDeprecationHandler())) {
condition = actionRegistry.getConditionRegistry().parseExecutable(watchId, parser);
} else if (WatchField.FOREACH.match(currentFieldName, parser.getDeprecationHandler())) {
path = parser.text();
} else if (Transform.TRANSFORM.match(currentFieldName, parser.getDeprecationHandler())) {
transform = actionRegistry.getTransformRegistry().parse(watchId, parser);
} else if (ThrottlerField.THROTTLE_PERIOD.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -235,7 +332,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
}

ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState);
return new ActionWrapper(actionId, throttler, condition, transform, action);
return new ActionWrapper(actionId, throttler, condition, transform, action, path);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transfo
}

public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transform transform, Action action) {
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform));
actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform, null));
return this;
}

Expand All @@ -111,7 +111,13 @@ public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Conditi
}

public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, Action action) {
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform));
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, null));
return this;
}

public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, String path,
Action action) {
actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, path));
return this;
}

Expand Down Expand Up @@ -186,16 +192,18 @@ public final BytesReference buildAsBytes(XContentType contentType) {
static class TransformedAction implements ToXContentObject {

private final Action action;
@Nullable private String path;
@Nullable private final TimeValue throttlePeriod;
@Nullable private final Condition condition;
@Nullable private final Transform transform;

TransformedAction(String id, Action action, @Nullable TimeValue throttlePeriod,
@Nullable Condition condition, @Nullable Transform transform) {
@Nullable Condition condition, @Nullable Transform transform, @Nullable String path) {
this.throttlePeriod = throttlePeriod;
this.condition = condition;
this.transform = transform;
this.action = action;
this.path = path;
}

@Override
Expand All @@ -215,6 +223,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(transform.type(), transform, params)
.endObject();
}
if (path != null) {
builder.field("foreach", path);
}
builder.field(action.type(), action, params);
return builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ public final class WatcherIndexTemplateRegistryField {
// version 7: add full exception stack traces for better debugging
// version 8: fix slack attachment property not to be dynamic, causing field type issues
// version 9: add a user field defining which user executed the watch
// version 10: add support for foreach path in actions
// Note: if you change this, also inform the kibana team around the watcher-ui
public static final String INDEX_TEMPLATE_VERSION = "9";
public static final String INDEX_TEMPLATE_VERSION = "10";
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION;
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public final class WatchField {
public static final ParseField CONDITION = new ParseField("condition");
public static final ParseField ACTIONS = new ParseField("actions");
public static final ParseField TRANSFORM = new ParseField("transform");
public static final ParseField FOREACH = new ParseField("foreach");
public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
public static final ParseField METADATA = new ParseField("metadata");
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugin/core/src/main/resources/watch-history.json
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@
"reason" : {
"type" : "keyword"
},
"number_of_actions_executed": {
"type": "integer"
},
"foreach" : {
"type": "object",
"enabled" : false
},
"email": {
"type": "object",
"dynamic": true,
Expand Down
Loading

0 comments on commit e63fd53

Please sign in to comment.