Skip to content

Commit

Permalink
EQL: Avoid filtering on tiebreakers (#63415)
Browse files Browse the repository at this point in the history
Do not filter by tiebreaker while searching sequence matches as
it's not monotonic and thus can filter out valid data.
Add handling for data 'near' the boundary that has the same timestamp
but different tie-breaker and thus can be just outside the window.

Fix #62781
Relates #63215

(cherry picked from commit 36f8346)
(cherry picked from commit 72a2ce8)
  • Loading branch information
costin committed Oct 8, 2020
1 parent 1f7b107 commit 2ab5f22
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ query = '''
[ ERROR where true ]
[ STAT where true ]
'''
expected_event_ids = [1,2,3]
expected_event_ids = [1,2,3,1,2,3]

[[queries]]
name = "basicWithFilter"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
package org.elasticsearch.xpack.eql;

import org.elasticsearch.test.eql.EqlExtraSpecTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;

@TestLogging(value = "org.elasticsearch.xpack.eql:TRACE", reason = "results logging")
public class EqlExtraIT extends EqlExtraSpecTestCase {

public EqlExtraIT(String query, String name, long[] eventIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@

package org.elasticsearch.xpack.eql.execution.assembler;

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;

import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;

/**
Expand All @@ -21,36 +20,23 @@
*
* Note that the range is not set at once on purpose since each query tends to have
* its own number of results separate from the others.
* As such, each query starts where it lefts to reach the current in-progress window
* As such, each query starts from where it left off to reach the current in-progress window
* as opposed to always operating with the exact same window.
*/
public class BoxedQueryRequest implements QueryRequest {

private final RangeQueryBuilder timestampRange;
private final RangeQueryBuilder tiebreakerRange;

private final SearchSourceBuilder searchSource;

private Ordinal from, to;
private Ordinal after;

public BoxedQueryRequest(QueryRequest original, String timestamp, String tiebreaker) {
public BoxedQueryRequest(QueryRequest original, String timestamp) {
searchSource = original.searchSource();
// setup range queries and preserve their reference to simplify the update
timestampRange = rangeQuery(timestamp).timeZone("UTC").format("epoch_millis");
BoolQueryBuilder filter = boolQuery().filter(timestampRange);
if (tiebreaker != null) {
tiebreakerRange = rangeQuery(tiebreaker);
filter.filter(tiebreakerRange);
} else {
tiebreakerRange = null;
}

searchSource = original.searchSource();
// combine with existing query (if it exists)
if (searchSource.query() != null) {
filter = filter.must(searchSource.query());
}
searchSource.query(filter);
RuntimeUtils.addFilter(timestampRange, searchSource);
}

@Override
Expand All @@ -72,9 +58,6 @@ public void nextAfter(Ordinal ordinal) {
public BoxedQueryRequest from(Ordinal begin) {
from = begin;
timestampRange.gte(begin != null ? begin.timestamp() : null);
if (tiebreakerRange != null) {
tiebreakerRange.gte(begin != null ? begin.tiebreaker() : null);
}
return this;
}

Expand All @@ -88,14 +71,10 @@ public Ordinal from() {

/**
* Sets the upper boundary for the query (inclusive).
* Can be removed (when the query is unbounded) by passing null.
*/
public BoxedQueryRequest to(Ordinal end) {
to = end;
timestampRange.lte(end != null ? end.timestamp() : null);
if (tiebreakerRange != null) {
tiebreakerRange.lte(end != null ? end.tiebreaker() : null);
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
if (query instanceof EsQueryExec) {
SearchSourceBuilder source = ((EsQueryExec) query).source(session);
QueryRequest original = () -> source;
BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName);
BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName);
Criterion<BoxedQueryRequest> criterion =
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i == 0 && descending);
criteria.add(criterion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

import static org.elasticsearch.index.query.QueryBuilders.idsQuery;
import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.prepareRequest;
Expand All @@ -37,7 +38,7 @@ public class BasicQueryClient implements QueryClient {

private static final Logger log = RuntimeUtils.QUERY_LOG;

private final EqlConfiguration cfg;
final EqlConfiguration cfg;
final Client client;
final String[] indices;

Expand All @@ -53,22 +54,37 @@ public void query(QueryRequest request, ActionListener<SearchResponse> listener)
// set query timeout
searchSource.timeout(cfg.requestTimeout());

if (log.isTraceEnabled()) {
log.trace("About to execute query {} on {}", StringUtils.toString(searchSource), indices);
}
if (cfg.isCancelled()) {
throw new TaskCancelledException("cancelled");
}

SearchRequest search = prepareRequest(client, searchSource, false, indices);
search(search, new BasicListener(listener));
}

/**
 * Dispatches a single search request through the client.
 * If the configuration reports cancellation, the listener is failed with a
 * {@link TaskCancelledException} and nothing is sent.
 *
 * @param search   the request to execute
 * @param listener receives either the search response or the failure
 */
protected void search(SearchRequest search, ActionListener<SearchResponse> listener) {
    final boolean cancelled = cfg.isCancelled();
    if (cancelled == false) {
        // rendering the source is only worth doing when trace logging is on
        if (log.isTraceEnabled()) {
            String rendered = StringUtils.toString(search.source());
            log.trace("About to execute query {} on {}", rendered, indices);
        }
        client.search(search, listener);
    } else {
        listener.onFailure(new TaskCancelledException("cancelled"));
    }
}

/**
 * Dispatches a multi-search request through the client.
 * If the configuration reports cancellation, the listener is failed with a
 * {@link TaskCancelledException} and nothing is sent.
 *
 * @param search   the multi-search request to execute
 * @param listener receives either the multi-search response or the failure
 */
protected void search(MultiSearchRequest search, ActionListener<MultiSearchResponse> listener) {
    if (cfg.isCancelled() == false) {
        if (log.isTraceEnabled()) {
            // one rendered source per line
            StringJoiner rendered = new StringJoiner("\n");
            for (SearchRequest each : search.requests()) {
                String source = StringUtils.toString(each.source());
                rendered.add(source);
            }
            log.trace("About to execute multi-queries {} on {}", rendered, indices);
        }
        client.multiSearch(search, listener);
        return;
    }

    listener.onFailure(new TaskCancelledException("cancelled"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public Ordinal(long timestamp, Comparable<Object> tiebreaker) {
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
}

/**
 * Returns the timestamp component of this ordinal.
 */
public long timestamp() {
    return this.timestamp;
}
Expand All @@ -36,11 +36,11 @@ public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || getClass() != obj.getClass()) {
return false;
}

Ordinal other = (Ordinal) obj;
return Objects.equals(timestamp, other.timestamp)
&& Objects.equals(tiebreaker, other.tiebreaker);
Expand Down Expand Up @@ -81,7 +81,23 @@ public boolean between(Ordinal left, Ordinal right) {
return (compareTo(left) <= 0 && compareTo(right) >= 0) || (compareTo(right) <= 0 && compareTo(left) >= 0);
}

/**
 * Checks whether this ordinal is strictly smaller than the given one,
 * that is whether {@code compareTo(other)} is negative.
 */
public boolean before(Ordinal other) {
    final int comparison = compareTo(other);
    return comparison < 0;
}

/**
 * Checks whether this ordinal is smaller than or equal to the given one,
 * that is whether {@code compareTo(other)} is zero or negative.
 */
public boolean beforeOrAt(Ordinal other) {
    final int comparison = compareTo(other);
    return comparison <= 0;
}

/**
 * Checks whether this ordinal is strictly greater than the given one,
 * that is whether {@code compareTo(other)} is positive.
 */
public boolean after(Ordinal other) {
    final int comparison = compareTo(other);
    return comparison > 0;
}

/**
 * Checks whether this ordinal is greater than or equal to the given one,
 * that is whether {@code compareTo(other)} is zero or positive.
 */
public boolean afterOrAt(Ordinal other) {
    final int comparison = compareTo(other);
    return comparison >= 0;
}

/**
 * Exposes this ordinal as an array: the timestamp alone when there is no
 * tiebreaker, otherwise the timestamp followed by the tiebreaker.
 */
public Object[] toArray() {
    if (tiebreaker == null) {
        return new Object[] { timestamp };
    }
    return new Object[] { timestamp, tiebreaker };
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
Expand All @@ -30,7 +28,6 @@
import java.util.function.Function;

import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
import static org.elasticsearch.xpack.ql.util.ActionListeners.map;

Expand Down Expand Up @@ -105,12 +102,7 @@ private void makeRequestPITCompatible(SearchRequest request) {
String[] indices = request.indices();
if (CollectionUtils.isEmpty(indices) == false) {
request.indices(Strings.EMPTY_ARRAY);
BoolQueryBuilder indexFilter = boolQuery().filter(termsQuery(GetResult._INDEX, indices));
QueryBuilder query = source.query();
if (query != null) {
indexFilter.must(query);
}
source.query(indexFilter);
RuntimeUtils.addFilter(termsQuery(GetResult._INDEX, indices), source);
}
}

Expand All @@ -123,11 +115,11 @@ private <Response> ActionListener<Response> pitListener(Function<Response, Strin
},
// always close PIT in case of exceptions
e -> {
if (pitId != null) {
close(wrap(b -> {
}, listener::onFailure));
}
listener.onFailure(e);
if (pitId != null && cfg.isCancelled() == false) {
// ignore any success/failure to avoid obfuscating the response
close(wrap(b -> {}, ex -> {}));
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand All @@ -34,6 +36,8 @@
import java.util.List;
import java.util.Set;

import static org.elasticsearch.index.query.QueryBuilders.boolQuery;

public final class RuntimeUtils {

static final Logger QUERY_LOG = LogManager.getLogger(QueryClient.class);
Expand All @@ -50,10 +54,10 @@ static void logSearchResponse(SearchResponse response, Logger logger) {
aggsNames.append(aggs.get(i).getName() + (i + 1 == aggs.size() ? "" : ", "));
}

logger.trace("Got search response [hits {} {}, {} aggregations: [{}], {} failed shards, {} skipped shards, "
+ "{} successful shards, {} total shards, took {}, timed out [{}]]", response.getHits().getTotalHits().relation.toString(),
response.getHits().getTotalHits().value, aggs.size(), aggsNames, response.getFailedShards(), response.getSkippedShards(),
response.getSuccessfulShards(), response.getTotalShards(), response.getTook(), response.isTimedOut());
logger.trace("Got search response [hits {}, {} aggregations: [{}], {} failed shards, {} skipped shards, "
+ "{} successful shards, {} total shards, took {}, timed out [{}]]", response.getHits().getTotalHits(), aggs.size(),
aggsNames, response.getFailedShards(), response.getSkippedShards(), response.getSuccessfulShards(),
response.getTotalShards(), response.getTook(), response.isTimedOut());
}

public static List<HitExtractor> createExtractor(List<FieldExtraction> fields, EqlConfiguration cfg) {
Expand Down Expand Up @@ -111,4 +115,30 @@ public static SearchRequest prepareRequest(Client client,
public static List<SearchHit> searchHits(SearchResponse response) {
return Arrays.asList(response.getHits().getHits());
}

/**
 * Attaches the given filter to the query of the given source, avoiding an extra
 * level of wrapping when the source already holds a bool query.
 * <ul>
 * <li>existing bool query: the filter is appended to it in place, unless it is
 * {@code null} or already among its filter clauses;</li>
 * <li>anything else (including no query at all): a new bool query is created,
 * the existing query (if any) and the filter (if non-null) are added to it as
 * filter clauses, and it is set on the source.</li>
 * </ul>
 *
 * @param filter the filter to add; {@code null} is tolerated and adds nothing
 * @param source the search source whose query gets decorated
 * @return the same {@code source} instance that was passed in
 */
public static SearchSourceBuilder addFilter(QueryBuilder filter, SearchSourceBuilder source) {
    final QueryBuilder existing = source.query();

    // an existing bool query is decorated in place - no need to touch the source
    if (existing instanceof BoolQueryBuilder) {
        BoolQueryBuilder current = (BoolQueryBuilder) existing;
        boolean missing = filter != null && current.filter().contains(filter) == false;
        if (missing) {
            current.filter(filter);
        }
        return source;
    }

    // otherwise wrap whatever is there (possibly nothing) inside a fresh bool query
    BoolQueryBuilder wrapper = boolQuery();
    if (existing != null) {
        wrapper.filter(existing);
    }
    if (filter != null) {
        wrapper.filter(filter);
    }
    source.query(wrapper);

    return source;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public static SearchSourceBuilder sourceBuilder(QueryContainer container, QueryB
}
}

optimize(container, source);

return source;
}

Expand All @@ -94,7 +96,7 @@ private static void sorting(QueryContainer container, SearchSourceBuilder source
sortBuilder = fieldSort(fa.name())
.missing(as.missing().position())
.unmappedType(fa.dataType().esType());

if (fa.isNested()) {
FieldSortBuilder fieldSort = fieldSort(fa.name())
.missing(as.missing().position())
Expand Down Expand Up @@ -134,8 +136,6 @@ private static void sorting(QueryContainer container, SearchSourceBuilder source
}

private static void optimize(QueryContainer query, SearchSourceBuilder builder) {
if (query.shouldTrackHits()) {
builder.trackTotalHits(true);
}
builder.trackTotalHits(query.shouldTrackHits());
}
}
}

0 comments on commit 2ab5f22

Please sign in to comment.