Handle es filter in Lucene query node (#335)
This PR handles Elasticsearch queries in the Lucene query node. ES
queries are extracted from the filter parameter or translated from esql
filters.
dnhatn committed Nov 3, 2022
1 parent 63acda7 commit a84d7b5
Showing 9 changed files with 247 additions and 90 deletions.
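
For orientation, here is a minimal sketch of the capability this commit adds, as exercised by the integration test further down: an Elasticsearch QueryBuilder is attached to the ESQL request through the filter parameter and applied by the Lucene query node. The index name and field mirror the test's illustrative ones; this is a sketch of the builder calls used in the test, not a definitive API reference.

    // Sketch based on the EsqlQueryRequestBuilder calls in the test below.
    QueryBuilder filter = new RangeQueryBuilder("val").from(-1000, true).to(1000, true);
    EsqlQueryResponse results = new EsqlQueryRequestBuilder(client(), EsqlQueryAction.INSTANCE)
        .query("from test_filter | stats avg = avg(val)")
        .filter(filter) // ES query pushed down to the Lucene query node
        .get();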
@@ -218,13 +218,23 @@ public Page getOutput() {

// initializes currentLeafReaderContext, currentScorer, and currentScorerPos when we switch to a new leaf reader
if (currentLeafReaderContext == null) {
-            currentLeafReaderContext = leaves.get(currentLeaf);
-            try {
-                currentScorer = weight.bulkScorer(currentLeafReaderContext.leafReaderContext);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-            currentScorerPos = currentLeafReaderContext.minDoc;
+            assert currentScorer == null : "currentScorer wasn't reset";
+            do {
+                currentLeafReaderContext = leaves.get(currentLeaf);
+                currentScorerPos = currentLeafReaderContext.minDoc;
+                try {
+                    currentScorer = weight.bulkScorer(currentLeafReaderContext.leafReaderContext);
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+                if (currentScorer == null) {
+                    // doesn't match anything; move to the next leaf or abort if finished
+                    currentLeaf++;
+                    if (isFinished()) {
+                        return null;
+                    }
+                }
+            } while (currentScorer == null);
}

try {
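Background on the fix above (not part of the diff): Lucene's Weight#bulkScorer returns null when the query matches no documents in a leaf, so the operator must skip such leaves rather than assume a scorer exists. A minimal standalone sketch of that pattern, with hypothetical class and method names:

    import java.io.IOException;
    import java.util.List;
    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.search.BulkScorer;
    import org.apache.lucene.search.LeafCollector;
    import org.apache.lucene.search.Weight;

    class LeafSkippingSketch {
        // Score every leaf that has matches; bulkScorer(...) returns null for
        // leaves where the query matches nothing, so those are skipped.
        static void scoreAllLeaves(Weight weight, List<LeafReaderContext> leaves, LeafCollector collector) throws IOException {
            for (LeafReaderContext leaf : leaves) {
                BulkScorer scorer = weight.bulkScorer(leaf);
                if (scorer == null) {
                    continue; // no matching documents in this segment
                }
                scorer.score(collector, leaf.reader().getLiveDocs());
            }
        }
    }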
server/src/test/java/org/elasticsearch/compute/OperatorTests.java: 101 additions & 0 deletions
@@ -9,12 +9,25 @@
package org.elasticsearch.compute;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.aggregation.Aggregator;
import org.elasticsearch.compute.aggregation.AggregatorFunction;
import org.elasticsearch.compute.aggregation.AggregatorMode;
@@ -40,7 +53,10 @@
import org.elasticsearch.compute.operator.exchange.PassthroughExchanger;
import org.elasticsearch.compute.operator.exchange.RandomExchanger;
import org.elasticsearch.compute.operator.exchange.RandomUnionSourceOperator;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@@ -50,7 +66,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -63,6 +83,7 @@
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static org.hamcrest.Matchers.equalTo;

@Experimental
public class OperatorTests extends ESTestCase {
@@ -221,6 +242,58 @@ public void testOperatorsWithLuceneSlicing() throws IOException {
}
}

public void testQueryOperator() throws IOException {
Map<BytesRef, Long> docs = new HashMap<>();
CheckedConsumer<DirectoryReader, IOException> verifier = reader -> {
final long from = randomBoolean() ? Long.MIN_VALUE : randomLongBetween(0, 10000);
final long to = randomBoolean() ? Long.MAX_VALUE : randomLongBetween(from, from + 10000);
final Query query = LongPoint.newRangeQuery("pt", from, to);
final String partition = randomFrom("shard", "segment", "doc");
final List<LuceneSourceOperator> queryOperators = switch (partition) {
case "shard" -> List.of(new LuceneSourceOperator(reader, 0, query));
case "segment" -> new LuceneSourceOperator(reader, 0, query).segmentSlice();
case "doc" -> new LuceneSourceOperator(reader, 0, query).docSlice(randomIntBetween(1, 10));
default -> throw new AssertionError("unknown partition [" + partition + "]");
};
List<Driver> drivers = new ArrayList<>();
Set<Integer> actualDocIds = Collections.newSetFromMap(ConcurrentCollections.newConcurrentMap());
for (LuceneSourceOperator queryOperator : queryOperators) {
PageConsumerOperator docCollector = new PageConsumerOperator(page -> {
Block idBlock = page.getBlock(0);
Block segmentBlock = page.getBlock(1);
for (int i = 0; i < idBlock.getPositionCount(); i++) {
int docBase = reader.leaves().get(segmentBlock.getInt(i)).docBase;
int docId = docBase + idBlock.getInt(i);
assertTrue("duplicated docId=" + docId, actualDocIds.add(docId));
}
});
drivers.add(new Driver(List.of(queryOperator, docCollector), () -> {}));
}
Driver.runToCompletion(threadPool.executor(ThreadPool.Names.SEARCH), drivers);
Set<Integer> expectedDocIds = searchForDocIds(reader, query);
assertThat("query=" + query + ", partition=" + partition, actualDocIds, equalTo(expectedDocIds));
};

try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
int numDocs = randomIntBetween(0, 10_000);
for (int i = 0; i < numDocs; i++) {
Document d = new Document();
long point = randomLongBetween(0, 5000);
d.add(new LongPoint("pt", point));
BytesRef id = Uid.encodeId("id-" + randomIntBetween(0, 5000));
d.add(new Field("id", id, KeywordFieldMapper.Defaults.FIELD_TYPE));
if (docs.put(id, point) != null) {
w.updateDocument(new Term("id", id), d);
} else {
w.addDocument(d);
}
}
try (DirectoryReader reader = w.getReader()) {
verifier.accept(reader);
}
}
}

public void testOperatorsWithPassthroughExchange() {
ExchangeSource exchangeSource = new ExchangeSource();

@@ -916,4 +989,32 @@ public void addInput(Page page) {
throw new UnsupportedOperationException();
}
}

private static Set<Integer> searchForDocIds(IndexReader reader, Query query) throws IOException {
IndexSearcher searcher = new IndexSearcher(reader);
Set<Integer> docIds = new HashSet<>();
searcher.search(query, new Collector() {
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) {
return new LeafCollector() {
@Override
public void setScorer(Scorable scorer) {

}

@Override
public void collect(int doc) {
int docId = context.docBase + doc;
assertTrue(docIds.add(docId));
}
};
}

@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
});
return docIds;
}
}
@@ -9,13 +9,16 @@

import org.elasticsearch.Build;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.Experimental;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
@@ -27,9 +30,13 @@
import org.junit.Assert;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
@@ -305,8 +312,45 @@ public void testRefreshSearchIdleShards() throws Exception {
Assert.assertEquals(20, results.values().size());
}

public void testESFilter() throws Exception {
String indexName = "test_filter";
ElasticsearchAssertions.assertAcked(
client().admin()
.indices()
.prepareCreate(indexName)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.get()
);
ensureYellow(indexName);
int numDocs = randomIntBetween(1, 5000);
Map<String, Long> docs = new HashMap<>();
List<IndexRequestBuilder> indexRequests = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
String id = "id-" + i;
long value = randomLongBetween(-100_000, 100_000);
docs.put(id, value);
indexRequests.add(client().prepareIndex().setIndex(indexName).setId(id).setSource(Map.of("val", value)));
}
indexRandom(true, randomBoolean(), indexRequests);
String command = "from test_filter | stats avg = avg(val)";
long from = randomBoolean() ? Long.MIN_VALUE : randomLongBetween(-1000, 1000);
long to = randomBoolean() ? Long.MAX_VALUE : randomLongBetween(from, from + 1000);
QueryBuilder filter = new RangeQueryBuilder("val").from(from, true).to(to, true);
EsqlQueryResponse results = new EsqlQueryRequestBuilder(client(), EsqlQueryAction.INSTANCE).query(command)
.filter(filter)
.pragmas(randomPragmas())
.get();
logger.info(results);
OptionalDouble avg = docs.values().stream().filter(v -> from <= v && v <= to).mapToLong(n -> n).average();
if (avg.isPresent()) {
assertEquals(avg.getAsDouble(), (double) results.values().get(0).get(0), 0.01d);
} else {
assertEquals(Double.NaN, (double) results.values().get(0).get(0), 0.01d);
}
}

private EsqlQueryResponse run(String esqlCommands) {
-        return new EsqlQueryRequestBuilder(client(), EsqlQueryAction.INSTANCE).query(esqlCommands).get();
+        return new EsqlQueryRequestBuilder(client(), EsqlQueryAction.INSTANCE).query(esqlCommands).pragmas(randomPragmas()).get();
}

private EsqlQueryResponse run(String esqlCommands, Settings pragmas) {
@@ -317,4 +361,24 @@ private EsqlQueryResponse run(String esqlCommands, Settings pragmas) {
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(EsqlPlugin.class);
}

private static Settings randomPragmas() {
Settings.Builder settings = Settings.builder();
// pragmas are only enabled on snapshot builds
if (Build.CURRENT.isSnapshot()) {
if (randomBoolean()) {
settings.put("add_task_parallelism_above_query", randomBoolean());
}
if (randomBoolean()) {
settings.put("task_concurrency", randomLongBetween(1, 10));
}
if (randomBoolean()) {
settings.put("buffer_max_pages", randomLongBetween(32, 2048));
}
if (randomBoolean()) {
settings.put("data_partitioning", randomFrom("shard", "segment", "doc"));
}
}
return settings.build();
}
}
@@ -7,7 +7,9 @@

package org.elasticsearch.xpack.esql.plan.physical;

import org.elasticsearch.common.Strings;
import org.elasticsearch.compute.Experimental;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
import org.elasticsearch.xpack.ql.index.EsIndex;
@@ -35,11 +37,13 @@ static boolean isSourceAttribute(Attribute attr) {
}

private final EsIndex index;
private final QueryBuilder query;
private final List<Attribute> attrs;

-    public EsQueryExec(Source source, EsIndex index) {
+    public EsQueryExec(Source source, EsIndex index, QueryBuilder query) {
super(source);
this.index = index;
this.query = query;
this.attrs = List.of(
new FieldAttribute(source, DOC_ID_FIELD.getName(), DOC_ID_FIELD),
new FieldAttribute(source, SEGMENT_ID_FIELD.getName(), SEGMENT_ID_FIELD),
@@ -49,21 +53,25 @@ public EsQueryExec(Source source, EsIndex index) {

@Override
protected NodeInfo<EsQueryExec> info() {
-        return NodeInfo.create(this, EsQueryExec::new, index);
+        return NodeInfo.create(this, EsQueryExec::new, index, query);
}

public EsIndex index() {
return index;
}

public QueryBuilder query() {
return query;
}

@Override
public List<Attribute> output() {
return attrs;
}

@Override
public int hashCode() {
-        return Objects.hash(index);
+        return Objects.hash(index, query);
}

@Override
@@ -77,7 +85,7 @@ public boolean equals(Object obj) {
}

EsQueryExec other = (EsQueryExec) obj;
-        return Objects.equals(index, other.index);
+        return Objects.equals(index, other.index) && Objects.equals(query, other.query);
}

@Override
@@ -87,6 +95,6 @@ public boolean singleNode() {

@Override
public String nodeString() {
-        return nodeName() + "[" + index + "]" + NodeUtils.limitedToString(attrs);
+        return nodeName() + "[" + index + "], query[" + Strings.toString(query, false, true) + "]" + NodeUtils.limitedToString(attrs);
}
}
