Skip to content

Commit

Permalink
Leverage ordinals in enrich lookup (#107449)
Browse files Browse the repository at this point in the history
This change leverages ordinals in enrich lookup. Instead of looking up 
and extracting enrich fields for all input terms, this improvement only
looks up and extracts the dictionary, then applies the ordinals to the
enrich results.

```
| 50th percentile | esql_stats_enrich_rates_fares | 242.949 | 34.7007 | -208.248 | ms |  -85.72% |
| 90th percentile | esql_stats_enrich_rates_fares | 245.479 | 36.3419 | -209.137 | ms |  -85.20% |
|100th percentile | esql_stats_enrich_rates_fares | 252.877 | 49.0826 | -203.795 | ms |  -80.59% |
```
  • Loading branch information
dnhatn committed Apr 16, 2024
1 parent 1e4d4da commit c2a3ec4
Show file tree
Hide file tree
Showing 14 changed files with 619 additions and 184 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/107449.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 107449
summary: Leverage ordinals in enrich lookup
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ public boolean isDense() {
return ordinals.getTotalValueCount() * 2 / 3 >= bytes.getPositionCount();
}

public IntBlock getOrdinalsBlock() {
return ordinals;
}

public BytesRefVector getDictionaryVector() {
return bytes;
}

@Override
public BytesRef getBytesRef(int valueIndex, BytesRef dest) {
return bytes.getBytesRef(ordinals.getInt(valueIndex), dest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,39 @@ public void testForPushDownEnrichRule() {
}
}

/**
* To enable enrich lookup using ordinals
*/
public void testManyDocuments() {
int numDocs = between(200, 2000);
var artists = Map.of("s1", "Eagles", "s2", "Linkin Park", "s3", "Linkin Park", "s4", "Disturbed");
client().admin()
.indices()
.prepareCreate("many_docs")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))
.setMapping("song_id", "type=keyword")
.get();
Map<String, Long> songs = new HashMap<>();
for (int i = 0; i < numDocs; i++) {
String song = randomFrom(artists.keySet());
client().prepareIndex("many_docs").setSource("song_id", song).get();
songs.merge(song, 1L, Long::sum);
}
client().admin().indices().prepareRefresh("many_docs").get();
try (EsqlQueryResponse resp = run("FROM many_docs | ENRICH songs | STATS count(*) BY artist")) {
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
Map<String, Long> actual = new HashMap<>();
for (List<Object> value : values) {
actual.merge((String) value.get(1), (Long) value.get(0), Long::sum);
}
Map<String, Long> expected = new HashMap<>();
for (Map.Entry<String, Long> e : songs.entrySet()) {
expected.merge(artists.get(e.getKey()), e.getValue(), Long::sum);
}
assertThat(actual, equalTo(expected));
}
}

public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin {

public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
Expand All @@ -25,9 +26,9 @@
final class EnrichResultBuilderForBoolean extends EnrichResultBuilder {
private ObjectArray<boolean[]> cells;

EnrichResultBuilderForBoolean(BlockFactory blockFactory, int channel, int totalPositions) {
super(blockFactory, channel, totalPositions);
this.cells = blockFactory.bigArrays().newObjectArray(totalPositions);
EnrichResultBuilderForBoolean(BlockFactory blockFactory, int channel) {
super(blockFactory, channel);
this.cells = blockFactory.bigArrays().newObjectArray(1);
}

@Override
Expand All @@ -39,6 +40,7 @@ void addInputPage(IntVector positions, Page page) {
continue;
}
int cellPosition = positions.getInt(i);
cells = blockFactory.bigArrays().grow(cells, cellPosition + 1);
final var oldCell = cells.get(cellPosition);
final var newCell = extendCell(oldCell, valueCount);
cells.set(cellPosition, newCell);
Expand All @@ -59,30 +61,82 @@ private boolean[] extendCell(boolean[] oldCell, int newValueCount) {
}
}

@Override
Block build() {
try (BooleanBlock.Builder builder = blockFactory.newBooleanBlockBuilder(totalPositions)) {
for (int i = 0; i < totalPositions; i++) {
final var cell = cells.get(i);
if (cell == null) {
builder.appendNull();
continue;
}
if (cell.length > 1) {
builder.beginPositionEntry();
}
// TODO: sort and dedup
for (var v : cell) {
builder.appendBoolean(v);
}
if (cell.length > 1) {
builder.endPositionEntry();
private boolean[] combineCell(boolean[] first, boolean[] second) {
if (first == null) {
return second;
}
if (second == null) {
return first;
}
var result = new boolean[first.length + second.length];
System.arraycopy(first, 0, result, 0, first.length);
System.arraycopy(second, 0, result, first.length, second.length);
return result;
}

private void appendGroupToBlockBuilder(BooleanBlock.Builder builder, boolean[] group) {
if (group == null) {
builder.appendNull();
} else if (group.length == 1) {
builder.appendBoolean(group[0]);
} else {
builder.beginPositionEntry();
// TODO: sort and dedup and set MvOrdering
for (var v : group) {
builder.appendBoolean(v);
}
builder.endPositionEntry();
}
}

private boolean[] getCellOrNull(int position) {
return position < cells.size() ? cells.get(position) : null;
}

private Block buildWithSelected(IntBlock selected) {
try (BooleanBlock.Builder builder = blockFactory.newBooleanBlockBuilder(selected.getPositionCount())) {
for (int i = 0; i < selected.getPositionCount(); i++) {
int selectedCount = selected.getValueCount(i);
switch (selectedCount) {
case 0 -> builder.appendNull();
case 1 -> {
int groupId = selected.getInt(selected.getFirstValueIndex(i));
appendGroupToBlockBuilder(builder, getCellOrNull(groupId));
}
default -> {
int firstValueIndex = selected.getFirstValueIndex(i);
var cell = getCellOrNull(selected.getInt(firstValueIndex));
for (int p = 1; p < selectedCount; p++) {
int groupId = selected.getInt(firstValueIndex + p);
cell = combineCell(cell, getCellOrNull(groupId));
}
appendGroupToBlockBuilder(builder, cell);
}
}
}
return builder.build();
}
}

private Block buildWithSelected(IntVector selected) {
try (BooleanBlock.Builder builder = blockFactory.newBooleanBlockBuilder(selected.getPositionCount())) {
for (int i = 0; i < selected.getPositionCount(); i++) {
appendGroupToBlockBuilder(builder, getCellOrNull(selected.getInt(i)));
}
return builder.build();
}
}

@Override
Block build(IntBlock selected) {
var vector = selected.asVector();
if (vector != null) {
return buildWithSelected(vector);
} else {
return buildWithSelected(selected);
}
}

@Override
public void close() {
Releasables.close(cells, super::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
Expand All @@ -26,14 +27,15 @@
*/
final class EnrichResultBuilderForBytesRef extends EnrichResultBuilder {
private final BytesRefArray bytes; // shared between all cells
private BytesRef scratch = new BytesRef();
private ObjectArray<int[]> cells;

EnrichResultBuilderForBytesRef(BlockFactory blockFactory, int channel, int totalPositions) {
super(blockFactory, channel, totalPositions);
this.cells = blockFactory.bigArrays().newObjectArray(totalPositions);
EnrichResultBuilderForBytesRef(BlockFactory blockFactory, int channel) {
super(blockFactory, channel);
this.cells = blockFactory.bigArrays().newObjectArray(1);
BytesRefArray bytes = null;
try {
bytes = new BytesRefArray(totalPositions * 3L, blockFactory.bigArrays());
bytes = new BytesRefArray(1L, blockFactory.bigArrays());
this.bytes = bytes;
} finally {
if (bytes == null) {
Expand All @@ -52,6 +54,7 @@ void addInputPage(IntVector positions, Page page) {
continue;
}
int cellPosition = positions.getInt(i);
cells = blockFactory.bigArrays().grow(cells, cellPosition + 1);
final var oldCell = cells.get(cellPosition);
final var newCell = extendCell(oldCell, valueCount);
cells.set(cellPosition, newCell);
Expand All @@ -75,31 +78,82 @@ private int[] extendCell(int[] oldCell, int newValueCount) {
}
}

@Override
Block build() {
try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(totalPositions)) {
BytesRef scratch = new BytesRef();
for (int i = 0; i < totalPositions; i++) {
final var cell = cells.get(i);
if (cell == null) {
builder.appendNull();
continue;
}
if (cell.length > 1) {
builder.beginPositionEntry();
}
// TODO: sort and dedup
for (var v : cell) {
builder.appendBytesRef(bytes.get(v, scratch));
}
if (cell.length > 1) {
builder.endPositionEntry();
private int[] combineCell(int[] first, int[] second) {
if (first == null) {
return second;
}
if (second == null) {
return first;
}
var result = new int[first.length + second.length];
System.arraycopy(first, 0, result, 0, first.length);
System.arraycopy(second, 0, result, first.length, second.length);
return result;
}

private void appendGroupToBlockBuilder(BytesRefBlock.Builder builder, int[] group) {
if (group == null) {
builder.appendNull();
} else if (group.length == 1) {
builder.appendBytesRef(bytes.get(group[0], scratch));
} else {
builder.beginPositionEntry();
// TODO: sort and dedup and set MvOrdering
for (var v : group) {
builder.appendBytesRef(bytes.get(v, scratch));
}
builder.endPositionEntry();
}
}

private int[] getCellOrNull(int position) {
return position < cells.size() ? cells.get(position) : null;
}

private Block buildWithSelected(IntBlock selected) {
try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) {
for (int i = 0; i < selected.getPositionCount(); i++) {
int selectedCount = selected.getValueCount(i);
switch (selectedCount) {
case 0 -> builder.appendNull();
case 1 -> {
int groupId = selected.getInt(selected.getFirstValueIndex(i));
appendGroupToBlockBuilder(builder, getCellOrNull(groupId));
}
default -> {
int firstValueIndex = selected.getFirstValueIndex(i);
var cell = getCellOrNull(selected.getInt(firstValueIndex));
for (int p = 1; p < selectedCount; p++) {
int groupId = selected.getInt(firstValueIndex + p);
cell = combineCell(cell, getCellOrNull(groupId));
}
appendGroupToBlockBuilder(builder, cell);
}
}
}
return builder.build();
}
}

private Block buildWithSelected(IntVector selected) {
try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) {
for (int i = 0; i < selected.getPositionCount(); i++) {
appendGroupToBlockBuilder(builder, getCellOrNull(selected.getInt(i)));
}
return builder.build();
}
}

@Override
Block build(IntBlock selected) {
var vector = selected.asVector();
if (vector != null) {
return buildWithSelected(vector);
} else {
return buildWithSelected(selected);
}
}

@Override
public void close() {
Releasables.close(bytes, cells, super::close);
Expand Down

0 comments on commit c2a3ec4

Please sign in to comment.