Skip to content

Commit

Permalink
ESQL: Add lookup flavor to MultivalueDedupe (#107624)
Browse files Browse the repository at this point in the history
The infrastruction that powers aggregation grouping uses a method called
`MultivalueDedupe#hash` which deduplicates all the values in a row, adds
them to the hash, and then returns all of the ordinals it added. This
renames that method to `hashAdd` and adds another method `hashLookup`
which performs the same deduplication and then looks the values up in
the hash instead of adding them. It produces the same values as
`hashAdd` would return, except when the hash doesn't contain the value -
in that case it just adds `null` to the block of ordinals.

Think of this as looking up what the aggregation ordinal would be,
rather than building ordinals. I have big plans for this.
  • Loading branch information
nik9000 committed Apr 18, 2024
1 parent 8594954 commit ead585f
Show file tree
Hide file tree
Showing 12 changed files with 1,213 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ public BytesRefBlock sortToBlock(BlockFactory blockFactory, boolean ascending) {
}

/**
* Dedupe values and build a {@link IntBlock} suitable for passing
* as the grouping block to a {@link GroupingAggregatorFunction}.
* Dedupe values, add them to the hash, and build an {@link IntBlock} of
* their hashes. This block is suitable for passing as the grouping block
* to a {@link GroupingAggregatorFunction}.
*/
public MultivalueDedupe.HashResult hash(BlockFactory blockFactory, BytesRefHash hash) {
public MultivalueDedupe.HashResult hashAdd(BlockFactory blockFactory, BytesRefHash hash) {
try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(block.getPositionCount())) {
boolean sawNull = false;
for (int p = 0; p < block.getPositionCount(); p++) {
Expand All @@ -184,15 +185,15 @@ public MultivalueDedupe.HashResult hash(BlockFactory blockFactory, BytesRefHash
}
case 1 -> {
BytesRef v = block.getBytesRef(first, work[0]);
hash(builder, hash, v);
hashAdd(builder, hash, v);
}
default -> {
if (count < ALWAYS_COPY_MISSING) {
copyMissing(first, count);
hashUniquedWork(hash, builder);
hashAddUniquedWork(hash, builder);
} else {
copyAndSort(first, count);
hashSortedWork(hash, builder);
hashAddSortedWork(hash, builder);
}
}
}
Expand All @@ -201,6 +202,36 @@ public MultivalueDedupe.HashResult hash(BlockFactory blockFactory, BytesRefHash
}
}

/**
* Dedupe values and build an {@link IntBlock} of their hashes. This block is
* suitable for passing as the grouping block to a {@link GroupingAggregatorFunction}.
*/
public IntBlock hashLookup(BlockFactory blockFactory, BytesRefHash hash) {
try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(block.getPositionCount())) {
for (int p = 0; p < block.getPositionCount(); p++) {
int count = block.getValueCount(p);
int first = block.getFirstValueIndex(p);
switch (count) {
case 0 -> builder.appendInt(0);
case 1 -> {
BytesRef v = block.getBytesRef(first, work[0]);
hashLookupSingle(builder, hash, v);
}
default -> {
if (count < ALWAYS_COPY_MISSING) {
copyMissing(first, count);
hashLookupUniquedWork(hash, builder);
} else {
copyAndSort(first, count);
hashLookupSortedWork(hash, builder);
}
}
}
}
return builder.build();
}
}

/**
* Build a {@link BatchEncoder} which deduplicates values at each position
* and then encodes the results into a {@link byte[]} which can be used for
Expand Down Expand Up @@ -352,38 +383,170 @@ private void writeSortedWork(BytesRefBlock.Builder builder, boolean ascending) {
/**
* Writes an already deduplicated {@link #work} to a hash.
*/
private void hashUniquedWork(BytesRefHash hash, IntBlock.Builder builder) {
private void hashAddUniquedWork(BytesRefHash hash, IntBlock.Builder builder) {
if (w == 1) {
hash(builder, hash, work[0]);
hashAdd(builder, hash, work[0]);
return;
}
builder.beginPositionEntry();
for (int i = 0; i < w; i++) {
hash(builder, hash, work[i]);
hashAdd(builder, hash, work[i]);
}
builder.endPositionEntry();
}

/**
* Writes a sorted {@link #work} to a hash, skipping duplicates.
*/
private void hashSortedWork(BytesRefHash hash, IntBlock.Builder builder) {
private void hashAddSortedWork(BytesRefHash hash, IntBlock.Builder builder) {
if (w == 1) {
hash(builder, hash, work[0]);
hashAdd(builder, hash, work[0]);
return;
}
builder.beginPositionEntry();
BytesRef prev = work[0];
hash(builder, hash, prev);
hashAdd(builder, hash, prev);
for (int i = 1; i < w; i++) {
if (false == prev.equals(work[i])) {
if (false == valuesEqual(prev, work[i])) {
prev = work[i];
hash(builder, hash, prev);
hashAdd(builder, hash, prev);
}
}
builder.endPositionEntry();
}

/**
* Looks up an already deduplicated {@link #work} to a hash.
*/
private void hashLookupUniquedWork(BytesRefHash hash, IntBlock.Builder builder) {
if (w == 1) {
hashLookupSingle(builder, hash, work[0]);
return;
}

int i = 1;
long firstLookup = hashLookup(hash, work[0]);
while (firstLookup < 0) {
if (i >= w) {
// Didn't find any values
builder.appendNull();
return;
}
firstLookup = hashLookup(hash, work[i]);
i++;
}

/*
* Step 2 - find the next unique value in the hash
*/
boolean foundSecond = false;
while (i < w) {
long nextLookup = hashLookup(hash, work[i]);
if (nextLookup >= 0) {
builder.beginPositionEntry();
appendFound(builder, firstLookup);
appendFound(builder, nextLookup);
i++;
foundSecond = true;
break;
}
i++;
}

/*
* Step 3a - we didn't find a second value, just emit the first one
*/
if (false == foundSecond) {
appendFound(builder, firstLookup);
return;
}

/*
* Step 3b - we found a second value, search for more
*/
while (i < w) {
long nextLookup = hashLookup(hash, work[i]);
if (nextLookup >= 0) {
appendFound(builder, nextLookup);
}
i++;
}
builder.endPositionEntry();
}

/**
* Looks up a sorted {@link #work} to a hash, skipping duplicates.
*/
private void hashLookupSortedWork(BytesRefHash hash, IntBlock.Builder builder) {
if (w == 1) {
hashLookupSingle(builder, hash, work[0]);
return;
}

/*
* Step 1 - find the first unique value in the hash
* i will contain the next value to probe
* prev will contain the first value in the array contained in the hash
* firstLookup will contain the first value in the hash
*/
int i = 1;
BytesRef prev = work[0];
long firstLookup = hashLookup(hash, prev);
while (firstLookup < 0) {
if (i >= w) {
// Didn't find any values
builder.appendNull();
return;
}
prev = work[i];
firstLookup = hashLookup(hash, prev);
i++;
}

/*
* Step 2 - find the next unique value in the hash
*/
boolean foundSecond = false;
while (i < w) {
if (false == valuesEqual(prev, work[i])) {
long nextLookup = hashLookup(hash, work[i]);
if (nextLookup >= 0) {
prev = work[i];
builder.beginPositionEntry();
appendFound(builder, firstLookup);
appendFound(builder, nextLookup);
i++;
foundSecond = true;
break;
}
}
i++;
}

/*
* Step 3a - we didn't find a second value, just emit the first one
*/
if (false == foundSecond) {
appendFound(builder, firstLookup);
return;
}

/*
* Step 3b - we found a second value, search for more
*/
while (i < w) {
if (false == valuesEqual(prev, work[i])) {
long nextLookup = hashLookup(hash, work[i]);
if (nextLookup >= 0) {
prev = work[i];
appendFound(builder, nextLookup);
}
}
i++;
}
builder.endPositionEntry();
}

/**
* Writes a deduplicated {@link #work} to a {@link BatchEncoder.BytesRefs}.
*/
Expand All @@ -401,7 +564,7 @@ private void convertSortedWorkToUnique() {
int end = w;
w = 1;
for (int i = 1; i < end; i++) {
if (false == prev.equals(work[i])) {
if (false == valuesEqual(prev, work[i])) {
prev = work[i];
work[w].bytes = prev.bytes;
work[w].offset = prev.offset;
Expand All @@ -423,7 +586,28 @@ private void fillWork(int from, int to) {
}
}

private void hash(IntBlock.Builder builder, BytesRefHash hash, BytesRef v) {
builder.appendInt(Math.toIntExact(BlockHash.hashOrdToGroupNullReserved(hash.add(v))));
private void hashAdd(IntBlock.Builder builder, BytesRefHash hash, BytesRef v) {
appendFound(builder, hash.add(v));
}

private long hashLookup(BytesRefHash hash, BytesRef v) {
return hash.find(v);
}

private void hashLookupSingle(IntBlock.Builder builder, BytesRefHash hash, BytesRef v) {
long found = hashLookup(hash, v);
if (found >= 0) {
appendFound(builder, found);
} else {
builder.appendNull();
}
}

private void appendFound(IntBlock.Builder builder, long found) {
builder.appendInt(Math.toIntExact(BlockHash.hashOrdToGroupNullReserved(found)));
}

private static boolean valuesEqual(BytesRef lhs, BytesRef rhs) {
return lhs.equals(rhs);
}
}

0 comments on commit ead585f

Please sign in to comment.