Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EQL: Add option for returning results from the tail of the stream #64869

Merged
merged 6 commits into from
Nov 14, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
private QueryBuilder filter = null;
private String timestampField = "@timestamp";
private String eventCategoryField = "event.category";
private String resultPosition = "head";
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've kept the existing default for now to not make this PR even bigger or complicate backporting to 7.10.
I'll follow-up with a separate PR for changing the values themselves.


private int size = 10;
private int fetchSize = 1000;
Expand All @@ -57,6 +58,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
static final String KEY_SIZE = "size";
static final String KEY_FETCH_SIZE = "fetch_size";
static final String KEY_QUERY = "query";
static final String KEY_RESULT_POSITION = "result_position";
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this name; any suggestions? /cc @jrodewig.
I had variations with order but that's incorrect since the order of the results is always ascending, it's rather the position from the stream where the results are picked up.
However I don't think the name makes that obvious...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is about oldest or newest sequences first.
Maybe change this in a true/false option and call it newest_first with default true. Overriding the default means false, implicitly newest first == false == newest last == oldest first.

Copy link
Contributor

@jrodewig jrodewig Nov 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is tricky. +1 to Andrei's idea of converting this to a Boolean with a param name of latest_first or most_recent_first. That would address most of my concerns.

If we decide to keep this as an enum, here are some suggestions in order of preference:

  • head_or_tail
  • orientation with earliest and latest values
  • recency with earliest and latest values
  • result_end

I don't think there is a common phrase that encapsulates both the head/tail concepts. end may work but is probably a bit too vague to be useful here. The head/tail values make the purpose of the enum param much clearer. As these values are unlikely to change, I'd advocate for including them in the param name if we keep this as an enum.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most recent / orientation suggests a different order. When one does tail the results are still ordered ASC but one gets the last X results not the first X.
I ended up with head and tail since we already have those in the languages as pipes and it's actually what happens if you don't specify them. We could use something like default_pipe but then that one is too cryptic...

Copy link
Contributor

@jrodewig jrodewig Nov 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up with head and tail since we already have those in the languages as pipes and it's actually what happens if you don't specify them.

That makes sense. Rather than forcing another term, it may be simpler and more intuitive to lean into that language:

  • head_or_tail with head and tail enum values
  • A head true/false option with a default of false OR a tail true/false option with a default of true

It seems like the other options are too vague or confusing.

Copy link
Contributor

@astefan astefan Nov 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forcing "head" or "tail" in the name of the parameter will implicitly assume the user knows about the purpose of head or tail and I like it. So, theoretically, there shouldn't be a concern about some implied ordering the user would assume. In Elasticsearch there are some settings that have technical terms in them and, unless one reads the documentation and knows something more about Elasticsearch, that setting will not make sense. And I think the events selection window (from start or end of the time stream) can be categorized as a more advanced feature that needs a bit more knowledge about EQL. One has to understand which events are selected and that the order in which these events are returned is always ascending.

Also, since we assume the user knows about "head" and "tail" we can push this a bit more and add "default" and "pipe" to the name of the parameter and make it boolean: "default_head_pipe": true to be more explicit. Using default_pipe is too abstract and it doesn't suggest a limited list of pipes to choose from, whereas default_head_pipe refers explicitly to head and its negation (a value of false) implies tail.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Booleans are more akin to on/off switches. default_head_pipe false simply means there's no default head pipe not that there is a tail one instead.

static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
static final String KEY_KEEP_ALIVE = "keep_alive";
static final String KEY_KEEP_ON_COMPLETION = "keep_on_completion";
Expand All @@ -79,6 +81,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
builder.field(KEY_EVENT_CATEGORY_FIELD, eventCategoryField());
builder.field(KEY_SIZE, size());
builder.field(KEY_FETCH_SIZE, fetchSize());
builder.field(KEY_RESULT_POSITION, resultPosition());

builder.field(KEY_QUERY, query);
if (waitForCompletionTimeout != null) {
Expand Down Expand Up @@ -140,6 +143,19 @@ public EqlSearchRequest eventCategoryField(String eventCategoryField) {
return this;
}

public String resultPosition() {
return resultPosition;
}

public EqlSearchRequest resultPosition(String position) {
if ("head".equals(position) || "tail".equals(position)) {
resultPosition = position;
} else {
throw new IllegalArgumentException("result position needs to be 'head' or 'tail', received '" + position + "'");
}
return this;
}

public int size() {
return this.size;
}
Expand Down Expand Up @@ -211,6 +227,7 @@ public boolean equals(Object o) {
EqlSearchRequest that = (EqlSearchRequest) o;
return size == that.size &&
fetchSize == that.fetchSize &&
resultPosition == that.resultPosition &&
Arrays.equals(indices, that.indices) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(filter, that.filter) &&
Expand All @@ -237,6 +254,7 @@ public int hashCode() {
tiebreakerField,
eventCategoryField,
query,
resultPosition,
waitForCompletionTimeout,
keepAlive,
keepOnCompletion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.test.eql;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.elasticsearch.client.EqlClient;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
Expand Down Expand Up @@ -118,11 +119,19 @@ protected EqlSearchResponse runQuery(String index, String query) throws Exceptio
// some queries return more than 10 results
request.size(50);
request.fetchSize(randomIntBetween(2, 50));
request.resultPosition(randomBoolean() ? "head" : "tail");
return runRequest(eqlClient(), request);
}

protected EqlSearchResponse runRequest(EqlClient eqlClient, EqlSearchRequest request) throws IOException {
return eqlClient.search(request, RequestOptions.DEFAULT);
int timeout = Math.toIntExact(timeout().millis());

RequestConfig config = RequestConfig.copy(RequestConfig.DEFAULT)
.setConnectionRequestTimeout(timeout)
.setConnectTimeout(timeout)
.setSocketTimeout(timeout)
.build();
return eqlClient.search(request, RequestOptions.DEFAULT.toBuilder().setRequestConfig(config).build());
Comment on lines 126 to +134
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a timeout setting (which is not straight-forward) to the base test.

}

protected EqlClient eqlClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ sequence by unique_pid
[any where true]
[any where serial_event_id < 72]
'''
expected_event_ids = [54, 55, 59,
expected_event_ids = [
54, 55, 59,
55, 59, 61,
59, 61, 65,
16, 60, 66,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
private int size = RequestDefaults.SIZE;
private int fetchSize = RequestDefaults.FETCH_SIZE;
private String query;
private String resultPosition = "head";

// Async settings
private TimeValue waitForCompletionTimeout = null;
Expand All @@ -65,6 +66,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
static final String KEY_KEEP_ALIVE = "keep_alive";
static final String KEY_KEEP_ON_COMPLETION = "keep_on_completion";
static final String KEY_RESULT_POSITION = "result_position";

static final ParseField FILTER = new ParseField(KEY_FILTER);
static final ParseField TIMESTAMP_FIELD = new ParseField(KEY_TIMESTAMP_FIELD);
Expand All @@ -76,6 +78,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField(KEY_WAIT_FOR_COMPLETION_TIMEOUT);
static final ParseField KEEP_ALIVE = new ParseField(KEY_KEEP_ALIVE);
static final ParseField KEEP_ON_COMPLETION = new ParseField(KEY_KEEP_ON_COMPLETION);
static final ParseField RESULT_POSITION = new ParseField(KEY_RESULT_POSITION);

private static final ObjectParser<EqlSearchRequest, Void> PARSER = objectParser(EqlSearchRequest::new);

Expand All @@ -99,6 +102,9 @@ public EqlSearchRequest(StreamInput in) throws IOException {
this.keepAlive = in.readOptionalTimeValue();
this.keepOnCompletion = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
resultPosition = in.readString();
}
}

@Override
Expand Down Expand Up @@ -168,6 +174,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(KEY_KEEP_ALIVE, keepAlive);
}
builder.field(KEY_KEEP_ON_COMPLETION, keepOnCompletion);
builder.field(KEY_RESULT_POSITION, resultPosition);

return builder;
}
Expand All @@ -192,6 +199,7 @@ protected static <R extends EqlSearchRequest> ObjectParser<R, Void> objectParser
parser.declareField(EqlSearchRequest::keepAlive,
(p, c) -> TimeValue.parseTimeValue(p.text(), KEY_KEEP_ALIVE), KEEP_ALIVE, ObjectParser.ValueType.VALUE);
parser.declareBoolean(EqlSearchRequest::keepOnCompletion, KEEP_ON_COMPLETION);
parser.declareString(EqlSearchRequest::resultPosition, RESULT_POSITION);
return parser;
}

Expand Down Expand Up @@ -281,6 +289,19 @@ public EqlSearchRequest keepOnCompletion(boolean keepOnCompletion) {
return this;
}

public String resultPosition() {
return resultPosition;
}

public EqlSearchRequest resultPosition(String position) {
if ("head".equals(position) || "tail".equals(position)) {
resultPosition = position;
} else {
throw new IllegalArgumentException("result position needs to be 'head' or 'tail', received '" + position + "'");
}
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -298,6 +319,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalTimeValue(keepAlive);
out.writeBoolean(keepOnCompletion);
}

if (out.getVersion().onOrAfter(Version.V_7_10_0)) { // TODO: Remove after backport
out.writeString(resultPosition);
}
}

@Override
Expand All @@ -321,7 +346,8 @@ public boolean equals(Object o) {
Objects.equals(eventCategoryField, that.eventCategoryField) &&
Objects.equals(query, that.query) &&
Objects.equals(waitForCompletionTimeout, that.waitForCompletionTimeout) &&
Objects.equals(keepAlive, that.keepAlive);
Objects.equals(keepAlive, that.keepAlive) &&
Objects.equals(resultPosition, that.resultPosition);
}


Expand All @@ -338,7 +364,8 @@ public int hashCode() {
eventCategoryField,
query,
waitForCompletionTimeout,
keepAlive);
keepAlive,
resultPosition);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
public class BoxedQueryRequest implements QueryRequest {

private final RangeQueryBuilder timestampRange;

private final SearchSourceBuilder searchSource;

private Ordinal from, to;
Expand Down Expand Up @@ -61,6 +60,16 @@ public BoxedQueryRequest from(Ordinal begin) {
return this;
}

/**
* Sets the upper boundary for the query (inclusive).
* Can be removed through null.
*/
public BoxedQueryRequest to(Ordinal end) {
to = end;
timestampRange.lte(end != null ? end.timestamp() : null);
return this;
}

public Ordinal after() {
return after;
}
Expand All @@ -69,13 +78,8 @@ public Ordinal from() {
return from;
}

/**
* Sets the upper boundary for the query (inclusive).
*/
public BoxedQueryRequest to(Ordinal end) {
to = end;
timestampRange.lte(end != null ? end.timestamp() : null);
return this;
public Ordinal to() {
return to;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,39 @@ public class Criterion<Q extends QueryRequest> {
private final HitExtractor timestamp;
private final HitExtractor tiebreaker;

private final boolean reverse;
private final boolean descending;

Criterion(int stage,
Q queryRequest,
List<HitExtractor> keys,
HitExtractor timestamp,
HitExtractor tiebreaker,
boolean reverse) {
boolean descending) {
this.stage = stage;
this.queryRequest = queryRequest;
this.keys = keys;
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;

this.reverse = reverse;
this.descending = descending;
}

public int stage() {
return stage;
}

public boolean reverse() {
return reverse;
public boolean descending() {
return descending;
}

public Q queryRequest() {
return queryRequest;
}

public int keySize() {
return keys.size();
}

public SequenceKey key(SearchHit hit) {
SequenceKey key;
if (keys.isEmpty()) {
Expand Down Expand Up @@ -89,6 +93,6 @@ public Ordinal ordinal(SearchHit hit) {

@Override
public String toString() {
return "[" + stage + "][" + reverse + "]";
return "[" + stage + "][" + descending + "]";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
HitExtractor tbExtractor = Expressions.isPresent(tiebreaker) ? hitExtractor(tiebreaker, extractorRegistry) : null;
// NB: since there's no aliasing inside EQL, the attribute name is the same as the underlying field name
String timestampName = Expressions.name(timestamp);
String tiebreakerName = Expressions.isPresent(tiebreaker) ? Expressions.name(tiebreaker) : null;

// secondary criteria
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(plans.size() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
import java.util.Map;
import java.util.Map.Entry;

/** Dedicated collection for mapping a key to a list of sequences */
/** The list represents the sequence for each stage (based on its index) and is fixed in size */

/**
* Dedicated collection for mapping a key to a list of sequences
* The list represents the sequence for each stage (based on its index) and is fixed in size
*/
class KeyToSequences {

private final int listSize;
Expand Down Expand Up @@ -52,24 +53,6 @@ void add(int stage, Sequence sequence) {
groups[stage].add(sequence);
}

void resetGroupInsertPosition() {
for (SequenceGroup[] groups : keyToSequences.values()) {
for (SequenceGroup group : groups) {
if (group != null) {
group.resetInsertPosition();
}
}
}
}

void resetUntilInsertPosition() {
for (UntilGroup until : keyToUntil.values()) {
if (until != null) {
until.resetInsertPosition();
}
}
}

void until(Iterable<KeyAndOrdinal> until) {
for (KeyAndOrdinal keyAndOrdinal : until) {
// ignore unknown keys
Expand Down Expand Up @@ -116,17 +99,26 @@ void dropUntil() {
keyToUntil.clear();
}

/**
* Remove all matches expect the latest.
*/
void trimToTail() {
for (SequenceGroup[] groups : keyToSequences.values()) {
for (SequenceGroup group : groups) {
if (group != null) {
group.trimToLast();
}
}
}
}

public void clear() {
keyToSequences.clear();
keyToUntil.clear();
}

int numberOfKeys() {
return keyToSequences.size();
}

@Override
public String toString() {
return LoggerMessageFormat.format(null, "Keys=[{}], Until=[{}]", keyToSequences.size(), keyToUntil.size());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.Objects;

/**
* A match within a sequence, holding the result and occurrance time.
* A match within a sequence, holding the result and occurrence time.
*/
class Match {

Expand Down