Add enrich operator and lookup service (ESQL-1198)
This PR introduces the EnrichLookupService and its client,
EnrichLookupOperator. The lookup service can operate on any single-shard
index; it is not limited to enrich indices. To minimize communication
overhead, the lookup runs on a per-page basis rather than per row and
uses the local shard copy of the lookup index. For now the lookup
service supports only the "match" type; other match types will be added
later. The lookup pipeline consists of three operators:

1. Finding the document IDs that match the input terms. This stage is
performed by the MatchQueryOperator (or one of its variants). The input
terms are sorted alphabetically to optimize I/O when positioning on
them, and the resulting document IDs are sorted in ascending order to
speed up field extraction in step 2.

2. Extracting field values for the matched document IDs. This is done by
the existing ValuesSourceReaderOperator for each enrich field.

3. Combining the extracted values by position, filling in nulls for
positions without matches. This is done by the MergePositionsOperator,
which supports only a single position for now; a simplified sketch of
the whole per-page flow is shown below.
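
The sketch below is an illustration only: plain Java standing in for the
real operator classes, with a Map as the lookup index. Every name in it
is hypothetical and none of it is code from this commit.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

class EnrichLookupSketch {
    // One lookup "page": a list of input terms in, one enrich value per position out.
    static List<String> lookupPage(List<String> inputTerms, Map<String, String> lookupIndex) {
        // Stage 1: probe the index with the distinct terms in sorted order
        // (the real MatchQueryOperator sorts the input terms to optimize I/O).
        Map<String, String> matched = new HashMap<>();
        for (String term : new TreeSet<>(inputTerms)) {
            String value = lookupIndex.get(term); // stage 2: "extract" the field value
            if (value != null) {
                matched.put(term, value);
            }
        }
        // Stage 3: merge back by input position, filling nulls where nothing matched.
        List<String> output = new ArrayList<>();
        for (String term : inputTerms) {
            output.add(matched.get(term));
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, String> users = Map.of("j1", "John", "j2", "Jack", "m4", "Mike");
        System.out.println(lookupPage(List.of("j1", "zz", "j2", "j1"), users));
        // prints: [John, null, Jack, John]
    }
}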
dnhatn committed Jun 6, 2023
1 parent 4f2f22f commit 6527adc
Showing 11 changed files with 1,264 additions and 11 deletions.
@@ -141,6 +141,18 @@ public Page appendBlocks(Block[] toAdd) {
return new Page(false, positionCount, newBlocks);
}

/**
 * Creates a new page, appending the blocks of the given page to the existing blocks in this Page.
 *
 * @param toAdd the page to append
 * @return a new Page
 * @throws IllegalArgumentException if any block of the given page does not have the same number of
 *         positions as the blocks in this Page
 */
public Page appendPage(Page toAdd) {
return appendBlocks(toAdd.blocks);
}
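
// Illustrative use only (not part of this commit): inputBlock and lookupPage are
// assumed to be an existing Block and Page with the same position count.
//
//   Page input = new Page(inputBlock);            // the original columns
//   Page combined = input.appendPage(lookupPage); // original columns plus lookup columns
//   assert combined.getPositionCount() == input.getPositionCount();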

@Override
public int hashCode() {
int result = Objects.hash(positionCount);
@@ -153,7 +153,7 @@ public LongBlock readValues(IntVector docs) throws IOException {
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
- if (lastDoc >= doc) {
+ if (doc < lastDoc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (numericDocValues.advanceExact(doc)) {
@@ -207,7 +207,7 @@ public LongBlock readValues(IntVector docs) throws IOException {
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
- if (this.docID >= doc) {
+ if (doc < this.docID) {
throw new IllegalStateException("docs within same block must be in order");
}
read(doc, blockBuilder);
@@ -270,7 +270,7 @@ public IntBlock readValues(IntVector docs) throws IOException {
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
- if (lastDoc >= doc) {
+ if (doc < lastDoc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (numericDocValues.advanceExact(doc)) {
@@ -324,7 +324,7 @@ public IntBlock readValues(IntVector docs) throws IOException {
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
- if (this.docID >= doc) {
+ if (doc < this.docID) {
// TODO this may not be true after sorting many docs in a single segment.
throw new IllegalStateException("docs within same block must be in order");
}
@@ -389,7 +389,7 @@ public DoubleBlock readValues(IntVector docs) throws IOException {
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
- if (lastDoc >= doc) {
+ if (doc < lastDoc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (numericDocValues.advanceExact(doc)) {
@@ -445,7 +445,7 @@ public DoubleBlock readValues(IntVector docs) throws IOException {
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
- if (this.docID >= doc) {
+ if (doc < this.docID) {
throw new IllegalStateException("docs within same block must be in order");
}
read(doc, blockBuilder);
@@ -507,7 +507,7 @@ public BytesRefBlock readValues(IntVector docs) throws IOException {
for (int i = 0; i < docs.getPositionCount(); i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
- if (this.docID >= doc) {
+ if (doc < this.docID) {
throw new IllegalStateException("docs within same block must be in order");
}
read(doc, blockBuilder);
@@ -569,7 +569,7 @@ public BooleanBlock readValues(IntVector docs) throws IOException {
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
- if (lastDoc >= doc) {
+ if (doc < lastDoc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (numericDocValues.advanceExact(doc)) {
@@ -623,7 +623,7 @@ public BooleanBlock readValues(IntVector docs) throws IOException {
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
- if (this.docID >= doc) {
+ if (doc < this.docID) {
throw new IllegalStateException("docs within same block must be in order");
}
read(doc, blockBuilder);
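The recurring one-line change in the hunks above relaxes the
doc-ordering guard: the old check also rejected a repeated document ID,
while the new one rejects only IDs that go backwards, so the same
document can now be read for several consecutive positions, as the
enrich lookup may request. A sketch of the two guards, with hypothetical
helper names not taken from this commit:

class DocOrderGuards {
    static void checkOld(int lastDoc, int doc) {
        if (lastDoc >= doc) { // rejected repeated doc IDs as well as out-of-order ones
            throw new IllegalStateException("docs within same block must be in order");
        }
    }

    static void checkNew(int lastDoc, int doc) {
        if (doc < lastDoc) { // only out-of-order doc IDs are rejected; repeats are allowed
            throw new IllegalStateException("docs within same block must be in order");
        }
    }
}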
@@ -0,0 +1,199 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.lookup;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverRunner;
import org.elasticsearch.compute.operator.OutputOperator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupOperator;
import org.elasticsearch.xpack.esql.plugin.TransportEsqlQueryAction;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.type.EsField;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.hamcrest.Matchers.equalTo;

public class EnrichLookupIT extends AbstractEsqlIntegTestCase {

public void testSimple() {
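// The match field ("uid") needs only an inverted index (doc_values=false),
// while the enrich fields need only doc values (index=false): matching runs
// through MatchQueryOperator and extraction through ValuesSourceReaderOperator.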
ElasticsearchAssertions.assertAcked(
client().admin()
.indices()
.prepareCreate("users")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))
.setMapping(
"uid",
"type=keyword,doc_values=false",
"name",
"type=keyword,index=false",
"city",
"type=keyword,index=false",
"joined",
"type=date,index=false,format=yyyy-MM-dd"
)
);
List<Map<String, String>> users = List.of(
Map.of("uid", "j1", "name", "John", "city", "New York/NY", "joined", "2020-03-01"),
Map.of("uid", "m4", "name", "Mike", "city", "Boston/MA", "joined", "2010-06-20"),
Map.of("uid", "j2", "name", "Jack", "city", "Austin/TX", "joined", "1999-11-03")
);
for (Map<String, String> user : users) {
client().prepareIndex("users").setSource(user).get();
}
client().admin().indices().prepareForceMerge("users").setMaxNumSegments(1).get();
client().admin().indices().prepareRefresh("users").get();
List<Attribute> enrichAttributes = List.of(
new FieldAttribute(Source.EMPTY, "name", new EsField("name", DataTypes.KEYWORD, Map.of(), true)),
new FieldAttribute(Source.EMPTY, "city", new EsField("city", DataTypes.KEYWORD, Map.of(), true)),
new FieldAttribute(Source.EMPTY, "joined", new EsField("joined", DataTypes.DATETIME, Map.of(), true))
);

DiscoveryNode clientNode = randomFrom(clusterService().state().nodes().stream().toList());
var lookupService = internalCluster().getInstance(TransportEsqlQueryAction.class, clientNode.getName()).enrichLookupService();
TransportService transportService = internalCluster().getInstance(TransportService.class, clientNode.getName());

EsqlQueryRequest parentRequest = new EsqlQueryRequest();
parentRequest.query("FROM index");
CancellableTask parentTask = (CancellableTask) transportService.getTaskManager().register("test", "test-action", parentRequest);
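// Arguments: session id, parent task, max outstanding lookup requests,
// input channel holding the match values, lookup service, lookup index,
// match type, match field, and the fields to enrich with.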
EnrichLookupOperator enrichOperator = new EnrichLookupOperator(
"test-session",
parentTask,
randomIntBetween(1, 3),
0,
lookupService,
"users",
"match",
"uid",
enrichAttributes
);
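// Five input positions: "j1", null, "j2", "j1", and "m3"; the null and
// the unknown user "m3" should both enrich to nulls.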
BytesRefBlock userBlock = BytesRefBlock.newBlockBuilder(5)
.appendBytesRef(new BytesRef("j1"))
.appendNull()
.appendBytesRef(new BytesRef("j2"))
.appendBytesRef(new BytesRef("j1"))
.appendBytesRef(new BytesRef("m3"))
.build();
SourceOperator sourceOperator = sourceOperator(userBlock);

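// Collect every page the driver emits, concatenating blocks position-wise
// so the assertions below can run against one combined page.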
AtomicReference<Page> outputPage = new AtomicReference<>();
OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), page -> {
outputPage.getAndUpdate(current -> {
if (current == null) {
return page;
}
Block.Builder[] builders = new Block.Builder[current.getBlockCount()];
for (int i = 0; i < current.getBlockCount(); i++) {
builders[i] = current.getBlock(i).elementType().newBlockBuilder(1);
builders[i].copyFrom(current.getBlock(i), 0, current.getPositionCount());
builders[i].copyFrom(page.getBlock(i), 0, page.getPositionCount());
}
return new Page(Arrays.stream(builders).map(Block.Builder::build).toArray(Block[]::new));
});
});

DateFormatter dateFmt = DateFormatter.forPattern("yyyy-MM-dd");

ExecutorService executor = internalCluster().getInstance(TransportService.class).getThreadPool().executor(ThreadPool.Names.GENERIC);
DriverRunner.runToCompletion(executor, List.of(new Driver(sourceOperator, List.of(enrichOperator), outputOperator, () -> {})));
transportService.getTaskManager().unregister(parentTask);
Page output = outputPage.get();
assertThat(output.getBlockCount(), equalTo(4));
assertThat(output.getPositionCount(), equalTo(5));
BytesRef scratch = new BytesRef();
BytesRefBlock names = output.getBlock(1);
BytesRefBlock cities = output.getBlock(2);
LongBlock dates = output.getBlock(3);

assertThat(names.getBytesRef(0, scratch), equalTo(new BytesRef("John")));
assertThat(cities.getBytesRef(0, scratch), equalTo(new BytesRef("New York/NY")));
assertThat(dateFmt.formatMillis(dates.getLong(0)), equalTo("2020-03-01"));

assertTrue(names.isNull(1));
assertTrue(cities.isNull(1));
assertTrue(dates.isNull(1));

assertThat(names.getBytesRef(2, scratch), equalTo(new BytesRef("Jack")));
assertThat(cities.getBytesRef(2, scratch), equalTo(new BytesRef("Austin/TX")));
assertThat(dateFmt.formatMillis(dates.getLong(2)), equalTo("1999-11-03"));

assertThat(names.getBytesRef(3, scratch), equalTo(new BytesRef("John")));
assertThat(cities.getBytesRef(3, scratch), equalTo(new BytesRef("New York/NY")));
assertThat(dateFmt.formatMillis(dates.getLong(3)), equalTo("2020-03-01"));

assertTrue(names.isNull(4));
assertTrue(cities.isNull(4));
assertTrue(dates.isNull(4));
}

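// Test helper: emits the input block in random-sized pages so the lookup
// is exercised across page boundaries.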
private static SourceOperator sourceOperator(BytesRefBlock input) {
return new SourceOperator() {
int position = 0;

@Override
public void finish() {

}

@Override
public boolean isFinished() {
return position >= input.getPositionCount();
}

@Override
public Page getOutput() {
if (isFinished()) {
return null;
}
int remaining = input.getPositionCount() - position;
int size = between(1, remaining);
BytesRefBlock.Builder builder = BytesRefBlock.newBlockBuilder(size);
builder.copyFrom(input, position, position + size);
position += size;
return new Page(builder.build());
}

@Override
public void close() {

}
};
}

public void testRandom() {

}

public void testMultipleMatches() {

}
}
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.enrich;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.AsyncOperator;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.xpack.ql.expression.Attribute;

import java.util.List;

public final class EnrichLookupOperator extends AsyncOperator {
private final EnrichLookupService enrichLookupService;
private final String sessionId;
private final CancellableTask parentTask;
private final int inputChannel;
private final String enrichIndex;
private final String matchType;
private final String matchField;
private final List<Attribute> enrichFields;

public EnrichLookupOperator(
String sessionId,
CancellableTask parentTask,
int maxOutstandingRequests,
int inputChannel,
EnrichLookupService enrichLookupService,
String enrichIndex,
String matchType,
String matchField,
List<Attribute> enrichFields
) {
super(maxOutstandingRequests);
this.sessionId = sessionId;
this.parentTask = parentTask;
this.inputChannel = inputChannel;
this.enrichLookupService = enrichLookupService;
this.enrichIndex = enrichIndex;
this.matchType = matchType;
this.matchField = matchField;
this.enrichFields = enrichFields;
}

@Override
protected void performAsync(Page inputPage, ActionListener<Page> listener) {
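// Send only the column holding the match values to the lookup service;
// the listener then appends the returned enrich columns to the input page.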
final Block inputBlock = inputPage.getBlock(inputChannel);
enrichLookupService.lookupAsync(
sessionId,
parentTask,
enrichIndex,
matchType,
matchField,
enrichFields,
new Page(inputBlock),
listener.map(inputPage::appendPage)
);
}

@Override
public void close() {
// TODO: Maybe create a sub-task as the parent task of all the lookup tasks
// then cancel it when this operator terminates early (e.g., have enough result).
}
}
