Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watcher: Allow to execute actions for each element in array #41997

Merged
merged 24 commits into from
Jul 3, 2019

Conversation

spinscale
Copy link
Contributor

@spinscale spinscale commented May 9, 2019

This adds the ability to execute an action for each element that occurs
in an array, for example you could sent a dedicated slack action for
each search hit returned from a search.

Relates #34546

Reviewers notes: My intention is to get this into master and 7.x branches, please veto if you disagree. Also, please take a special look at the watch history and that no additional fields are created unintentionally in the history mapping.

This adds the ability to execute an action for each element that occurs
in an array, for example you could sent a dedicated slack action for
each search hit returned from a search.

Relates elastic#34546
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-core-features

@jakelandis jakelandis self-requested a review May 10, 2019 16:15
@jakelandis
Copy link
Contributor

@spinscale - i am trying to test this and am running into the following error

[2019-05-21T13:14:04,586][ERROR][o.e.x.w.a.l.ExecutableLoggingAction] [node1] failed to execute action [log_event_watch/log_hits]
org.elasticsearch.ElasticsearchException: specified foreach object was not a an array/collection: [ctx.payload.hits.hits]
	at org.elasticsearch.xpack.core.watcher.actions.ActionWrapper.execute(ActionWrapper.java:183) [main/:?]
	at org.elasticsearch.xpack.watcher.execution.ExecutionService.executeInner(ExecutionService.java:513) [main/:?]
	at org.elasticsearch.xpack.watcher.execution.ExecutionService.execute(ExecutionService.java:307) [main/:?]
	at org.elasticsearch.xpack.watcher.execution.ExecutionService.lambda$executeAsync$5(ExecutionService.java:407) [main/:?]
	at org.elasticsearch.xpack.watcher.execution.ExecutionService$WatchExecutionTask.run(ExecutionService.java:602) [main/:?]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:687) [main/:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]

A slightly modified version of the doc example I am using to test:

DELETE .watcher-history-*
DELETE _watcher/watch/log_event_watch
DELETE log-events
PUT _ingest/pipeline/now
{
  "processors": [
    {
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
POST log-events/_doc?pipeline=now
{
  "status" : "error"
}
GET log-events/_search
{
  "query": {
    "match": {
      "status": "error"
    }
  }
}
PUT _watcher/watch/log_event_watch
{
  "trigger" : {
    "schedule" : { "interval" : "15s" }
  },
  "input" : {
    "search" : {
      "request" : {
        "indices" : "log-events",
        "body" : {
          "query" : { "match" : { "status" : "error" } }
        }
      }
    }
  },
  "condition" : {
    "compare" : { "ctx.payload.hits.total" : { "gt" : 0 } }
  },
  "actions" : {
    "log_hits" : {
      "foreach" : "ctx.payload.hits.hits", 
      "logging" : {
        "text" : "Found error in document {{_id}} with timestamp {{_source.timestamp}}"
      }
    }
  }
}

GET .watcher-history-*/_search?sort=trigger_event.triggered_time:desc&size=1

@spinscale
Copy link
Contributor Author

the documentation stated something different than the implementation by using ctx.payload.foo.bar in the path instead of foo.bar. I fixed the implementation as we use ctx.payload everywhere else as well.

@spinscale
Copy link
Contributor Author

@elasticmachine run test elasticsearch-ci/1

@jakelandis
Copy link
Contributor

I pulled the latest and the error is now gone and executes the action for each hit. However, (using the example above), It is not reading the template field correctly.

e.g. Found error in document {{_id}} with timestamp {{_source.timestamp}}

results in :

[2019-05-28T10:55:33,071][INFO ][o.e.x.w.a.l.ExecutableLoggingAction] [node1] Found error in document  with timestamp 
[2019-05-28T10:55:33,071][INFO ][o.e.x.w.a.l.ExecutableLoggingAction] [node1] Found error in document  with timestamp 
[2019-05-28T10:55:33,071][INFO ][o.e.x.w.a.l.ExecutableLoggingAction] [node1] Found error in document  with timestamp 
[2019-05-28T10:55:33,071][INFO ][o.e.x.w.a.l.ExecutableLoggingAction] [node1] Found error in document  with timestamp 
[2019-05-28T10:55:33,071][INFO ][o.e.x.w.a.l.ExecutableLoggingAction] [node1] Found error in document  with timestamp 
[2019-05-28T10:55:33,072][INFO ][o.e.x.w.a.l.ExecutableLoggingAction] [node1] Found error in document  with timestamp 
[2019-05-28T10:55:33,072][INFO ][o.e.x.w.a.l.ExecutableLoggingAction] [node1] Found error in document  with timestamp 
[2019-05-28T10:55:33,072][INFO ][o.e.x.w.a.l.ExecutableLoggingAction] [node1] Found error in document  with timestamp 

@spinscale
Copy link
Contributor Author

indeed. this slipped through the tests as it requires mustache for testing. I'll investigate that one and ping you once I found the issue

@spinscale
Copy link
Contributor Author

I fixed the notation in the docs how to access fields.

You still need to specify ctx.payload and then the custom field, like

"text" : "Found error in document {{ctx.payload._id}} with timestamp {{ctx.payload._source.timestamp}}"

the reason for this, as each action creates the ctx model by itself when executing, so we cannot easily drop that.

Copy link
Contributor

@jakelandis jakelandis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good... a couple small points

x-pack/docs/en/watcher/actions.asciidoc Outdated Show resolved Hide resolved
x-pack/docs/en/watcher/actions.asciidoc Outdated Show resolved Hide resolved
if (o instanceof Map) {
results.add(action.execute(id, ctx, new Payload.Simple((Map<String, Object>) o)));
} else {
throw new ElasticsearchException("item in foreach [{}] object was not a map", path);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How difficult is it to remove the requirement of only an array of maps ? I think an array of arrays, or an array of values would be useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to also be able to pass any object, which then is available in ctx.payload._value

List<Action.Result> results = new ArrayList<>();
Object object = ObjectPath.eval(path, toMap(ctx));
if (object instanceof Collection) {
Collection collection = Collection.class.cast(object);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an upper limit here ? I am fine with a hard coded 100 size (or something like that) just so that we don't allow this to be a DOS vector.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set it hundred, also included the number of executed actions in the JSON that gets written to watch history

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100 seems quite arbitrary from my perspective. Any chance to make this an option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please open a new issue to make this configurable plus the rationale. The main goal here was to not let the watch run extremely long and block other watch executions, as well as its own executions.

Thank you!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed as #45169

Copy link
Contributor

@jakelandis jakelandis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM (assuming CI agrees)

Thanks for putting this together, i suspect this will get quite a bit usage. May want to consider adding this a highlight feature for the minor release this lands ?

@spinscale spinscale merged commit e63fd53 into elastic:master Jul 3, 2019
spinscale added a commit that referenced this pull request Jul 3, 2019
This adds the ability to execute an action for each element that occurs
in an array, for example you could sent a dedicated slack action for
each search hit returned from a search.

There is also a limit for the number of actions executed, which is
hardcoded to 100 right now, to prevent having watches run forever.

The watch history logs each action result and the total number of actions
the were executed.

Relates #34546
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants