Skip to content

Commit

Permalink
EQL: Introduce sequencing fetch size (#59063)
Browse files Browse the repository at this point in the history
The current internal sequence algorithm relies on fetching multiple results and then paginating through the dataset. Depending on the dataset and memory, setting a larger page size can yield better performance at the expense of memory.
This PR makes this behavior explicit by decoupling the fetch size from size, the maximum number of results desired.
As such, use in testing a minimum fetch size which exposed a number of bugs:

Jumping across data across queries causing valid data to be seen as a gap.
Incorrectly resuming searching across pages (again causing data to be discarded).
which have been addressed.

(cherry picked from commit 2f389a7)
  • Loading branch information
costin committed Jul 6, 2020
1 parent b2e9c6f commit f9c15d0
Show file tree
Hide file tree
Showing 23 changed files with 277 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
private String implicitJoinKeyField = "agent.id";
private boolean isCaseSensitive = true;

private int fetchSize = 50;
private int size = 10;
private int fetchSize = 1000;
private SearchAfterBuilder searchAfterBuilder;
private String query;
private String tiebreakerField;
Expand All @@ -60,6 +61,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
static final String KEY_CASE_SENSITIVE = "case_sensitive";
static final String KEY_SIZE = "size";
static final String KEY_FETCH_SIZE = "fetch_size";
static final String KEY_SEARCH_AFTER = "search_after";
static final String KEY_QUERY = "query";
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
Expand All @@ -85,7 +87,8 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
if (implicitJoinKeyField != null) {
builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField());
}
builder.field(KEY_SIZE, fetchSize());
builder.field(KEY_SIZE, size());
builder.field(KEY_FETCH_SIZE, fetchSize());

if (searchAfterBuilder != null) {
builder.array(KEY_SEARCH_AFTER, searchAfterBuilder.getSortValues());
Expand Down Expand Up @@ -172,14 +175,26 @@ public EqlSearchRequest implicitJoinKeyField(String implicitJoinKeyField) {
return this;
}

public int size() {
return this.size;
}

public EqlSearchRequest size(int size) {
this.size = size;
if (fetchSize <= 0) {
throw new IllegalArgumentException("size must be greater than 0");
}
return this;
}

public int fetchSize() {
return this.fetchSize;
}

public EqlSearchRequest fetchSize(int size) {
this.fetchSize = size;
if (fetchSize <= 0) {
throw new IllegalArgumentException("size must be greater than 0");
if (fetchSize < 2) {
throw new IllegalArgumentException("fetch size must be greater than 1");
}
return this;
}
Expand Down Expand Up @@ -246,7 +261,8 @@ public boolean equals(Object o) {
return false;
}
EqlSearchRequest that = (EqlSearchRequest) o;
return fetchSize == that.fetchSize &&
return size == that.size &&
fetchSize == that.fetchSize &&
Arrays.equals(indices, that.indices) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(filter, that.filter) &&
Expand All @@ -268,6 +284,7 @@ public int hashCode() {
Arrays.hashCode(indices),
indicesOptions,
filter,
size,
fetchSize,
timestampField,
tiebreakerField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void assertResponse(EqlSearchResponse response, int count) {

public void testBasicSearch() throws Exception {
EqlClient eql = highLevelClient().eql();
EqlSearchRequest request = new EqlSearchRequest("index", "process where true");
EqlSearchRequest request = new EqlSearchRequest("index", "process where true").size(RECORD_COUNT);
assertResponse(execute(request, eql::search, eql::searchAsync), RECORD_COUNT);
}

Expand All @@ -115,7 +115,7 @@ public void testSimpleConditionSearch() throws Exception {
EqlSearchRequest request = new EqlSearchRequest("index", "foo where pid > 0");

// test with non-default event.category mapping
request.eventCategoryField("event_type");
request.eventCategoryField("event_type").size(RECORD_COUNT);

EqlSearchResponse response = execute(request, eql::search, eql::searchAsync);
assertResponse(response, RECORD_COUNT / DIVIDER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ protected EqlSearchResponse runQuery(String index, String query, boolean isCaseS
EqlSearchRequest request = new EqlSearchRequest(testIndexName, query);
request.isCaseSensitive(isCaseSensitive);
request.tiebreakerField("event.sequence");
// some queries return more than 10 results
request.size(50);
request.fetchSize(2);
return eqlClient().search(request, RequestOptions.DEFAULT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.function.Supplier;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY;
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP;
Expand All @@ -51,7 +50,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
private String tiebreakerField = null;
private String eventCategoryField = FIELD_EVENT_CATEGORY;
private String implicitJoinKeyField = FIELD_IMPLICIT_JOIN_KEY;
private int fetchSize = FETCH_SIZE;
private int size = RequestDefaults.SIZE;
private int fetchSize = RequestDefaults.FETCH_SIZE;
private SearchAfterBuilder searchAfterBuilder;
private String query;
private boolean isCaseSensitive = false;
Expand All @@ -67,6 +67,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
static final String KEY_SIZE = "size";
static final String KEY_FETCH_SIZE = "fetch_size";
static final String KEY_SEARCH_AFTER = "search_after";
static final String KEY_QUERY = "query";
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
Expand All @@ -80,6 +81,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
static final ParseField EVENT_CATEGORY_FIELD = new ParseField(KEY_EVENT_CATEGORY_FIELD);
static final ParseField IMPLICIT_JOIN_KEY_FIELD = new ParseField(KEY_IMPLICIT_JOIN_KEY_FIELD);
static final ParseField SIZE = new ParseField(KEY_SIZE);
static final ParseField FETCH_SIZE = new ParseField(KEY_FETCH_SIZE);
static final ParseField SEARCH_AFTER = new ParseField(KEY_SEARCH_AFTER);
static final ParseField QUERY = new ParseField(KEY_QUERY);
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField(KEY_WAIT_FOR_COMPLETION_TIMEOUT);
Expand All @@ -102,6 +104,7 @@ public EqlSearchRequest(StreamInput in) throws IOException {
tiebreakerField = in.readOptionalString();
eventCategoryField = in.readString();
implicitJoinKeyField = in.readString();
size = in.readVInt();
fetchSize = in.readVInt();
searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new);
query = in.readString();
Expand Down Expand Up @@ -148,10 +151,14 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("implicit join key field is null or empty", validationException);
}

if (fetchSize <= 0) {
if (size <= 0) {
validationException = addValidationError("size must be greater than 0", validationException);
}

if (fetchSize < 2) {
validationException = addValidationError("fetch size must be greater than 1", validationException);
}

if (keepAlive != null && keepAlive.getMillis() < MIN_KEEP_ALIVE) {
validationException =
addValidationError("[keep_alive] must be greater than 1 minute, got:" + keepAlive.toString(), validationException);
Expand All @@ -173,7 +180,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (implicitJoinKeyField != null) {
builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField());
}
builder.field(KEY_SIZE, fetchSize());
builder.field(KEY_SIZE, size());
builder.field(KEY_FETCH_SIZE, fetchSize());

if (searchAfterBuilder != null) {
builder.array(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues());
Expand Down Expand Up @@ -204,7 +212,8 @@ protected static <R extends EqlSearchRequest> ObjectParser<R, Void> objectParser
parser.declareString(EqlSearchRequest::tiebreakerField, TIEBREAKER_FIELD);
parser.declareString(EqlSearchRequest::eventCategoryField, EVENT_CATEGORY_FIELD);
parser.declareString(EqlSearchRequest::implicitJoinKeyField, IMPLICIT_JOIN_KEY_FIELD);
parser.declareInt(EqlSearchRequest::fetchSize, SIZE);
parser.declareInt(EqlSearchRequest::size, SIZE);
parser.declareInt(EqlSearchRequest::fetchSize, FETCH_SIZE);
parser.declareField(EqlSearchRequest::setSearchAfter, SearchAfterBuilder::fromXContent, SEARCH_AFTER,
ObjectParser.ValueType.OBJECT_ARRAY);
parser.declareString(EqlSearchRequest::query, QUERY);
Expand Down Expand Up @@ -259,10 +268,21 @@ public EqlSearchRequest implicitJoinKeyField(String implicitJoinKeyField) {
return this;
}

public int fetchSize() { return this.fetchSize; }
public int size() {
return this.size;
}

public EqlSearchRequest size(int size) {
this.size = size;
return this;
}

public int fetchSize() {
return this.fetchSize;
}

public EqlSearchRequest fetchSize(int size) {
this.fetchSize = size;
public EqlSearchRequest fetchSize(int fetchSize) {
this.fetchSize = fetchSize;
return this;
}

Expand Down Expand Up @@ -334,6 +354,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(tiebreakerField);
out.writeString(eventCategoryField);
out.writeString(implicitJoinKeyField);
out.writeVInt(size);
out.writeVInt(fetchSize);
out.writeOptionalWriteable(searchAfterBuilder);
out.writeString(query);
Expand All @@ -354,7 +375,8 @@ public boolean equals(Object o) {
return false;
}
EqlSearchRequest that = (EqlSearchRequest) o;
return fetchSize == that.fetchSize &&
return size == that.size &&
fetchSize == that.fetchSize &&
Arrays.equals(indices, that.indices) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(filter, that.filter) &&
Expand All @@ -375,6 +397,7 @@ public int hashCode() {
Arrays.hashCode(indices),
indicesOptions,
filter,
size,
fetchSize,
timestampField,
tiebreakerField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ public EqlSearchRequestBuilder implicitJoinKeyField(String implicitJoinKeyField)
return this;
}

public EqlSearchRequestBuilder fetchSize(int size) {
request.fetchSize(size);
public EqlSearchRequestBuilder size(int size) {
request.size(size);
return this;
}

public EqlSearchRequestBuilder fetchSize(int fetchSize) {
request.fetchSize(fetchSize);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ private RequestDefaults() {}
public static final String FIELD_EVENT_CATEGORY = "event.category";
public static final String FIELD_IMPLICIT_JOIN_KEY = "agent.id";

public static int FETCH_SIZE = 10;
public static int SIZE = 10;
public static int FETCH_SIZE = 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,25 @@
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;

/**
* Ranged or boxed query. Provides a beginning or end to the current query.
* The query moves between them through search_after.
*
* Note that the range is not set at once on purpose since each query tends to have
* its own number of results separate from the others.
* As such, each query starts where it lefts to reach the current in-progress window
* as oppose to always operating with the exact same window.
*/
public class BoxedQueryRequest implements QueryRequest {

private final RangeQueryBuilder timestampRange;
private final RangeQueryBuilder tiebreakerRange;

private final SearchSourceBuilder searchSource;

private Ordinal from, to;
private Ordinal after;

public BoxedQueryRequest(QueryRequest original, String timestamp, String tiebreaker) {
searchSource = original.searchSource();

Expand All @@ -44,28 +56,50 @@ public SearchSourceBuilder searchSource() {
}

@Override
public void next(Ordinal ordinal) {
// reset existing constraints
timestampRange.gte(null).lte(null);
if (tiebreakerRange != null) {
tiebreakerRange.gte(null).lte(null);
}
public void nextAfter(Ordinal ordinal) {
after = ordinal;
// and leave only search_after
searchSource.searchAfter(ordinal.toArray());
}

public BoxedQueryRequest between(Ordinal begin, Ordinal end) {
timestampRange.gte(begin.timestamp()).lte(end.timestamp());

/**
* Sets the lower boundary for the query (non-inclusive).
* Can be removed (when the query in unbounded) through null.
*/
public BoxedQueryRequest from(Ordinal begin) {
from = begin;
if (tiebreakerRange != null) {
tiebreakerRange.gte(begin.tiebreaker()).lte(end.tiebreaker());
timestampRange.gte(begin != null ? begin.timestamp() : null);
tiebreakerRange.gt(begin != null ? begin.tiebreaker() : null);
} else {
timestampRange.gt(begin != null ? begin.timestamp() : null);
}
return this;
}

public Ordinal from() {
return from;
}

/**
* Sets the upper boundary for the query (inclusive).
* Can be removed (when the query in unbounded) through null.
*/
public BoxedQueryRequest to(Ordinal end) {
to = end;
timestampRange.lte(end != null ? end.timestamp() : null);
if (tiebreakerRange != null) {
tiebreakerRange.lte(end != null ? end.tiebreaker() : null);
}
return this;
}

@Override
public String toString() {
return searchSource.toString();
return "( " + string(from) + " >-" + string(after) + "-> " + string(to) + "]";
}

private static String string(Ordinal o) {
return o != null ? o.toString() : "<none>";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,9 @@ public Ordinal ordinal(SearchHit hit) {
}
return new Ordinal(timestamp, tbreaker);
}

@Override
public String toString() {
return "[" + stage + "][" + reverse + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
if (query instanceof EsQueryExec) {
QueryRequest original = ((EsQueryExec) query).queryRequest(session);

// increase the request size based on the fetch size (since size is applied already through limit)

BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName);
Criterion<BoxedQueryRequest> criterion =
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i> 0 && descending);
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i > 0 && descending);
criteria.add(criterion);
} else {
// until
Expand Down

0 comments on commit f9c15d0

Please sign in to comment.