Skip to content

Commit

Permalink
EQL: Use the implicit tiebreaker sort value in the search queries (#7…
Browse files Browse the repository at this point in the history
  • Loading branch information
astefan committed Apr 27, 2021
1 parent 5a2d2b0 commit fa92fc4
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class Criterion<Q extends QueryRequest> {
private final List<HitExtractor> keys;
private final HitExtractor timestamp;
private final HitExtractor tiebreaker;
private final HitExtractor implicitTiebreaker;

private final boolean descending;
private final int keySize;
Expand All @@ -31,12 +32,14 @@ public Criterion(int stage,
List<HitExtractor> keys,
HitExtractor timestamp,
HitExtractor tiebreaker,
HitExtractor implicitTiebreaker,
boolean descending) {
this.stage = stage;
this.queryRequest = queryRequest;
this.keys = keys;
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
this.implicitTiebreaker = implicitTiebreaker;

this.descending = descending;

Expand Down Expand Up @@ -73,7 +76,6 @@ public Object[] key(SearchHit hit) {

@SuppressWarnings({ "unchecked" })
public Ordinal ordinal(SearchHit hit) {

Object ts = timestamp.extract(hit);
if (ts instanceof Number == false) {
throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts);
Expand All @@ -89,7 +91,13 @@ public Ordinal ordinal(SearchHit hit) {
}
tbreaker = (Comparable<Object>) tb;
}
return new Ordinal(timestamp, tbreaker);

Object implicitTbreaker = implicitTiebreaker.extract(hit);
if (implicitTbreaker instanceof Number == false) {
throw new EqlIllegalArgumentException("Expected _shard_doc/implicit tiebreaker as long but got [{}]", implicitTbreaker);
}
long implicitTiebreaker = ((Number) implicitTbreaker).longValue();
return new Ordinal(timestamp, tbreaker, implicitTiebreaker);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;
import org.elasticsearch.xpack.eql.execution.search.extractor.FieldHitExtractor;
import org.elasticsearch.xpack.eql.execution.search.extractor.ImplicitTiebreakerHitExtractor;
import org.elasticsearch.xpack.eql.execution.search.extractor.TimestampFieldHitExtractor;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
import org.elasticsearch.xpack.eql.execution.sequence.TumblingWindow;
Expand Down Expand Up @@ -59,6 +60,8 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
// fields
HitExtractor tsExtractor = timestampExtractor(hitExtractor(timestamp, extractorRegistry));
HitExtractor tbExtractor = Expressions.isPresent(tiebreaker) ? hitExtractor(tiebreaker, extractorRegistry) : null;
// implicit tiebreake, present only in the response and which doesn't have a corresponding field
HitExtractor itbExtractor = ImplicitTiebreakerHitExtractor.INSTANCE;
// NB: since there's no aliasing inside EQL, the attribute name is the same as the underlying field name
String timestampName = Expressions.name(timestamp);

Expand Down Expand Up @@ -93,7 +96,7 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
QueryRequest original = () -> source;
BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, keyFields);
Criterion<BoxedQueryRequest> criterion =
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i == 0 && descending);
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, itbExtractor, i == 0 && descending);
criteria.add(criterion);
} else {
// until
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ public class Ordinal implements Comparable<Ordinal> {

private final long timestamp;
private final Comparable<Object> tiebreaker;
private final long implicitTiebreaker; // _shard_doc tiebreaker automatically added by ES PIT

public Ordinal(long timestamp, Comparable<Object> tiebreaker) {
public Ordinal(long timestamp, Comparable<Object> tiebreaker, long implicitTiebreaker) {
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
this.implicitTiebreaker = implicitTiebreaker;
}

public long timestamp() {
Expand All @@ -27,9 +29,13 @@ public Comparable<Object> tiebreaker() {
return tiebreaker;
}

public long implicitTiebreaker() {
return implicitTiebreaker;
}

@Override
public int hashCode() {
return Objects.hash(timestamp, tiebreaker);
return Objects.hash(timestamp, tiebreaker, implicitTiebreaker);
}

@Override
Expand All @@ -44,12 +50,13 @@ public boolean equals(Object obj) {

Ordinal other = (Ordinal) obj;
return Objects.equals(timestamp, other.timestamp)
&& Objects.equals(tiebreaker, other.tiebreaker);
&& Objects.equals(tiebreaker, other.tiebreaker)
&& Objects.equals(implicitTiebreaker, other.implicitTiebreaker);
}

@Override
public String toString() {
return "[" + timestamp + "][" + (tiebreaker != null ? tiebreaker.toString() : "") + "]";
return "[" + timestamp + "][" + (tiebreaker != null ? tiebreaker.toString() : "") + "][" + implicitTiebreaker + "]";
}

@Override
Expand All @@ -59,16 +66,22 @@ public int compareTo(Ordinal o) {
}
if (timestamp == o.timestamp) {
if (tiebreaker != null) {
// if the other tiebreaker is null, it is higher (nulls are last)
return o.tiebreaker != null ? tiebreaker.compareTo(o.tiebreaker) : -1;
if (o.tiebreaker != null) {
if (tiebreaker.compareTo(o.tiebreaker) == 0) {
return Long.compare(implicitTiebreaker, o.implicitTiebreaker);
}
return tiebreaker.compareTo(o.tiebreaker);
} else {
return -1;
}
}
// this tiebreaker is null
else {
// nulls are last so unless both are null (equal)
// this ordinal is greater (after) then the other tiebreaker
// so fall through to 1
if (o.tiebreaker == null) {
return 0;
return Long.compare(implicitTiebreaker, o.implicitTiebreaker);
}
}
}
Expand Down Expand Up @@ -97,6 +110,8 @@ public boolean afterOrAt(Ordinal other) {
}

public Object[] toArray() {
return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp };
return tiebreaker != null ?
new Object[] { timestamp, tiebreaker, implicitTiebreaker }
: new Object[] { timestamp, implicitTiebreaker };
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.eql.execution.search.extractor;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;

import java.io.IOException;

/**
* Returns the implicit Elasticsearch tiebreaker value associated with a PIT request for every search hit against which it is run.
*/
public class ImplicitTiebreakerHitExtractor implements HitExtractor {

public static final HitExtractor INSTANCE = new ImplicitTiebreakerHitExtractor();
static final String NAME = "tb";

private ImplicitTiebreakerHitExtractor() {}

@Override
public void writeTo(StreamOutput out) throws IOException { }

@Override
public String getWriteableName() {
return NAME;
}

@Override
public Object extract(SearchHit hit) {
Object[] sortValues = hit.getRawSortValues();
if (sortValues.length == 0) {
throw new EqlIllegalArgumentException("Expected at least one sorting value in the search hit, but got none");
}
return sortValues[sortValues.length - 1];
}

@Override
public String hitName() {
return null;
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
return true;
}

@Override
public int hashCode() {
return ImplicitTiebreakerHitExtractor.class.hashCode();
}

@Override
public String toString() {
return "_shard_doc";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchSortValues;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.eql.action.EqlSearchAction;
Expand Down Expand Up @@ -74,4 +76,23 @@ public static IndicesOptions randomIndicesOptions() {
return IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(),
randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean());
}

public static SearchSortValues randomSearchSortValues(Object[] values) {
DocValueFormat[] sortValueFormats = new DocValueFormat[values.length];
for (int i = 0; i < values.length; i++) {
sortValueFormats[i] = DocValueFormat.RAW;
}
return new SearchSortValues(values, sortValueFormats);
}

public static SearchSortValues randomSearchLongSortValues() {
int size = randomIntBetween(1, 20);
Object[] values = new Object[size];
DocValueFormat[] sortValueFormats = new DocValueFormat[size];
for (int i = 0; i < size; i++) {
values[i] = randomLong();
sortValueFormats[i] = DocValueFormat.RAW;
}
return new SearchSortValues(values, sortValueFormats);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.eql.execution.assembler;

import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchSortValues;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.eql.execution.assembler.SequenceSpecTests.TimestampExtractor;
import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.QueryClient;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.search.extractor.ImplicitTiebreakerHitExtractor;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceMatcher;
import org.elasticsearch.xpack.eql.execution.sequence.TumblingWindow;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;

import java.util.ArrayList;
import java.util.List;

import static java.util.Collections.emptyList;
import static org.elasticsearch.action.ActionListener.wrap;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

public class ImplicitTiebreakerTests extends ESTestCase {

private final List<HitExtractor> keyExtractors = emptyList();
private final HitExtractor tsExtractor = TimestampExtractor.INSTANCE;
private final HitExtractor tbExtractor = null;
private final HitExtractor implicitTbExtractor = ImplicitTiebreakerHitExtractor.INSTANCE;
private final List<Long> implicitTiebreakerValues;
private final int stages = randomIntBetween(3, 10);

public ImplicitTiebreakerTests() {
this.implicitTiebreakerValues = new ArrayList<>(stages);
for (int i = 0; i < stages; i++) {
implicitTiebreakerValues.add(randomLong());
}
}

class TestQueryClient implements QueryClient {

@Override
public void query(QueryRequest r, ActionListener<SearchResponse> l) {
int ordinal = r.searchSource().terminateAfter();
if (ordinal > 0) {
int previous = ordinal - 1;
// except the first request, the rest should have the previous response's search_after _shard_doc value
assertArrayEquals("Elements at stage " + ordinal + " do not match",
r.searchSource().searchAfter(), new Object[] { (long) previous, implicitTiebreakerValues.get(previous) });
}

long sortValue = implicitTiebreakerValues.get(ordinal);
SearchHit searchHit = new SearchHit(ordinal, String.valueOf(ordinal), null, null);
searchHit.sortValues(new SearchSortValues(
new Long[] { (long) ordinal, sortValue },
new DocValueFormat[] { DocValueFormat.RAW, DocValueFormat.RAW }));
SearchHits searchHits = new SearchHits(new SearchHit[] { searchHit }, new TotalHits(1, Relation.EQUAL_TO), 0.0f);
SearchResponseSections internal = new SearchResponseSections(searchHits, null, null, false, false, null, 0);
SearchResponse s = new SearchResponse(internal, null, 0, 1, 0, 0, null, Clusters.EMPTY);
l.onResponse(s);
}

@Override
public void fetchHits(Iterable<List<HitReference>> refs, ActionListener<List<List<SearchHit>>> listener) {
List<List<SearchHit>> searchHits = new ArrayList<>();
for (List<HitReference> ref : refs) {
List<SearchHit> hits = new ArrayList<>(ref.size());
for (HitReference hitRef : ref) {
hits.add(new SearchHit(-1, hitRef.id(), null, null));
}
searchHits.add(hits);
}
listener.onResponse(searchHits);
}
}

public void testImplicitTiebreakerBeingSet() {
QueryClient client = new TestQueryClient();
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(stages);
boolean descending = randomBoolean();
boolean criteriaDescending = descending;

for (int i = 0; i < stages; i++) {
final int j = i;
criteria.add(new Criterion<BoxedQueryRequest>(i,
new BoxedQueryRequest(() -> SearchSourceBuilder.searchSource()
.size(10)
.query(matchAllQuery())
.terminateAfter(j), "@timestamp", emptyList()),
keyExtractors,
tsExtractor,
tbExtractor,
implicitTbExtractor,
criteriaDescending));
// for DESC (TAIL) sequences only the first criterion is descending the rest are ASC, so flip it after the first query
if (criteriaDescending && i == 0) {
criteriaDescending = false;
}
}

SequenceMatcher matcher = new SequenceMatcher(stages, descending, TimeValue.MINUS_ONE, null);
TumblingWindow window = new TumblingWindow(client, criteria, null, matcher);
window.execute(wrap(p -> {}, ex -> {
throw ExceptionsHelper.convertToRuntime(ex);
}));
}
}

0 comments on commit fa92fc4

Please sign in to comment.