Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/138631.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 138631
summary: Improved bulk loading for binary doc values
area: Codec
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -364,13 +364,13 @@ private BinaryDocValues getCompressedBinary(BinaryEntry entry) throws IOExceptio
final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength);
final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData);

final RandomAccessInput docRangeData = this.data.randomAccessSlice(entry.docRangeOffset, entry.docRangeLength);
final DirectMonotonicReader docRanges = DirectMonotonicReader.getInstance(entry.docRangeMeta, docRangeData);
final RandomAccessInput docOffsetsData = this.data.randomAccessSlice(entry.docOffsetsOffset, entry.docOffsetLength);
final DirectMonotonicReader docOffsets = DirectMonotonicReader.getInstance(entry.docOffsetMeta, docOffsetsData);
return new DenseBinaryDocValues(maxDoc) {
final BinaryDecoder decoder = new BinaryDecoder(
entry.compression.compressionMode().newDecompressor(),
addresses,
docRanges,
docOffsets,
data.clone(),
entry.maxUncompressedChunkSize,
entry.maxNumDocsInAnyBlock
Expand All @@ -391,13 +391,22 @@ public BlockLoader.Block tryRead(
boolean toInt
) throws IOException {
int count = docs.count() - offset;
try (var builder = factory.bytesRefs(count)) {
for (int i = offset; i < docs.count(); i++) {
doc = docs.get(i);
var bytes = decoder.decode(doc, entry.numCompressedBlocks);
builder.appendBytesRef(bytes);
int firstDocId = docs.get(offset);
int lastDocId = docs.get(count - 1);
doc = lastDocId;

if (isDense(firstDocId, lastDocId, count)) {
try (var builder = factory.singletonBytesRefs(count)) {
decoder.decodeBulk(entry.numCompressedBlocks, firstDocId, count, builder);
return builder.build();
}
} else {
try (var builder = factory.bytesRefs(count)) {
for (int i = offset; i < docs.count(); i++) {
builder.appendBytesRef(decoder.decode(docs.get(i), entry.numCompressedBlocks));
}
return builder.build();
}
return builder.build();
}
}
};
Expand All @@ -414,13 +423,13 @@ public BlockLoader.Block tryRead(
final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength);
final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData);

final RandomAccessInput docRangeData = this.data.randomAccessSlice(entry.docRangeOffset, entry.docRangeLength);
final DirectMonotonicReader docRanges = DirectMonotonicReader.getInstance(entry.docRangeMeta, docRangeData);
final RandomAccessInput docOffsetsData = this.data.randomAccessSlice(entry.docOffsetsOffset, entry.docOffsetLength);
final DirectMonotonicReader docOffsets = DirectMonotonicReader.getInstance(entry.docOffsetMeta, docOffsetsData);
return new SparseBinaryDocValues(disi) {
final BinaryDecoder decoder = new BinaryDecoder(
entry.compression.compressionMode().newDecompressor(),
addresses,
docRanges,
docOffsets,
data.clone(),
entry.maxUncompressedChunkSize,
entry.maxNumDocsInAnyBlock
Expand All @@ -439,7 +448,7 @@ public BytesRef binaryValue() throws IOException {
static final class BinaryDecoder {

private final LongValues addresses;
private final DirectMonotonicReader docRanges;
private final DirectMonotonicReader docOffsets;
private final IndexInput compressedData;
// Cache of last uncompressed block
private long lastBlockId = -1;
Expand All @@ -453,14 +462,14 @@ static final class BinaryDecoder {
BinaryDecoder(
Decompressor decompressor,
LongValues addresses,
DirectMonotonicReader docRanges,
DirectMonotonicReader docOffsets,
IndexInput compressedData,
int biggestUncompressedBlockSize,
int maxNumDocsInAnyBlock
) {
this.decompressor = decompressor;
this.addresses = addresses;
this.docRanges = docRanges;
this.docOffsets = docOffsets;
this.compressedData = compressedData;
// pre-allocate a byte array large enough for the biggest uncompressed block needed.
this.uncompressedBlock = new byte[biggestUncompressedBlockSize];
Expand Down Expand Up @@ -509,8 +518,12 @@ void deltaDecode(int[] arr, int length) {
}
}

long findAndUpdateBlock(DirectMonotonicReader docRanges, long lastBlockId, int docNumber, int numBlocks) {
long index = docRanges.binarySearch(lastBlockId + 1, numBlocks, docNumber);
long findAndUpdateBlock(int docNumber, int numBlocks) {
if (docNumber < limitDocNumForBlock && lastBlockId >= 0) {
return lastBlockId;
}

long index = docOffsets.binarySearch(lastBlockId + 1, numBlocks, docNumber);
// If index is found, index is inclusive lower bound of docNum range, so docNum is in blockId == index
if (index < 0) {
// If index was not found, insertion point (-index - 1) will be upper bound of docNum range.
Expand All @@ -519,14 +532,25 @@ long findAndUpdateBlock(DirectMonotonicReader docRanges, long lastBlockId, int d
}
assert index < numBlocks : "invalid range " + index + " for doc " + docNumber + " in numBlocks " + numBlocks;

startDocNumForBlock = docRanges.get(index);
limitDocNumForBlock = docRanges.get(index + 1);
startDocNumForBlock = docOffsets.get(index);
limitDocNumForBlock = docOffsets.get(index + 1);
return index;
}

/**
 * Positions the decoder on the block containing {@code docNumber}, assuming callers visit doc
 * numbers in non-decreasing, adjacent order. When the currently cached block still covers the
 * doc, it is reused; otherwise the decoder steps forward exactly one block — no binary search
 * is needed for dense scans. Updates {@code startDocNumForBlock}/{@code limitDocNumForBlock}
 * as a side effect; the caller is responsible for updating {@code lastBlockId}.
 */
long findAndUpdateBlockByScanning(int docNumber) {
    final boolean cachedBlockCoversDoc = lastBlockId >= 0 && docNumber < limitDocNumForBlock;
    if (cachedBlockCoversDoc) {
        return lastBlockId;
    }
    final long nextBlockId = lastBlockId + 1;
    startDocNumForBlock = docOffsets.get(nextBlockId);
    limitDocNumForBlock = docOffsets.get(nextBlockId + 1);
    return nextBlockId;
}

BytesRef decode(int docNumber, int numBlocks) throws IOException {
// docNumber, rather than docId, because these are dense and could be indices from a DISI
long blockId = docNumber < limitDocNumForBlock ? lastBlockId : findAndUpdateBlock(docRanges, lastBlockId, docNumber, numBlocks);
long blockId = findAndUpdateBlock(docNumber, numBlocks);

int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock);
int idxInBlock = (int) (docNumber - startDocNumForBlock);
Expand All @@ -544,6 +568,80 @@ BytesRef decode(int docNumber, int numBlocks) throws IOException {
uncompressedBytesRef.length = end - start;
return uncompressedBytesRef;
}

/**
 * Computes an upper bound on the number of uncompressed bytes needed to hold the values of
 * {@code count} consecutive doc numbers starting at {@code firstDoc}, by summing the full
 * uncompressed length of every block the range touches, beginning at {@code firstBlockId}.
 * The bound is conservative: the first and last blocks are counted in full even when the
 * requested range covers only part of them.
 *
 * Uses a clone of {@code compressedData} so the decoder's own stream position is untouched.
 */
int computeMultipleBlockBufferSize(int count, int firstDoc, long firstBlockId, long numBlocks) throws IOException {
    IndexInput readAhead = compressedData.clone();
    int lastDoc = firstDoc + count - 1;
    int requiredBufferSize = 0;

    for (long blockId = firstBlockId; blockId < numBlocks; blockId++) {
        // addresses maps blockId -> start offset of the compressed block in the data file.
        long blockStartOffset = addresses.get(blockId);
        readAhead.seek(blockStartOffset);
        readAhead.readByte(); // skip BlockHeader
        // NOTE(review): assumes the on-disk block layout is [header byte][vInt uncompressed
        // length][payload] — inferred from this read sequence; confirm against the writer side.
        int uncompressedBlockLength = readAhead.readVInt();
        requiredBufferSize += uncompressedBlockLength;

        // docOffsets.get(blockId + 1) is the exclusive doc-number limit of this block; stop
        // once the requested range ends inside the current block.
        long blockLimit = docOffsets.get(blockId + 1);
        if (lastDoc < blockLimit) {
            break;
        }
    }
    return requiredBufferSize;
}

/**
 * Bulk-decodes the values of {@code count} consecutive doc numbers starting at
 * {@code firstDoc} and hands them to {@code builder} as one contiguous byte buffer plus an
 * offsets array, where value i spans bytes offsets[i] (inclusive) to offsets[i + 1]
 * (exclusive). offsets[0] is left at its array-default of 0 on purpose.
 *
 * Mutates the decoder's block cache ({@code lastBlockId}, {@code startDocNumForBlock},
 * {@code limitDocNumForBlock}), so interleaved single-doc {@code decode} calls still benefit
 * from it. Doc numbers must be dense (consecutive): after the initial binary search, blocks
 * are advanced by scanning only.
 */
void decodeBulk(int numBlocks, int firstDoc, int count, BlockLoader.SingletonBytesRefBuilder builder) throws IOException {
    int remainingCount = count;
    int nextDoc = firstDoc;
    int blockDocOffset = 0;   // how many values have been emitted into offsets[] so far
    int blockByteOffset = 0;  // how many bytes have been copied into bytes[] so far

    // Need to binary search forward for first blockId, but since query is dense, can scan from then on.
    // This block contains at least one value for range.
    long firstBlockId = findAndUpdateBlock(nextDoc, numBlocks);
    long[] offsets = new long[count + 1];
    // Conservative upper bound: whole uncompressed size of every block the range touches.
    int bufferSize = computeMultipleBlockBufferSize(count, firstDoc, firstBlockId, numBlocks);
    byte[] bytes = new byte[bufferSize];

    while (remainingCount > 0) {
        // Either reuses the cached block or steps forward exactly one block (dense range).
        long blockId = findAndUpdateBlockByScanning(nextDoc);
        int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock);
        int idxFirstDocInBlock = (int) (nextDoc - startDocNumForBlock);
        int countInBlock = Math.min(numDocsInBlock - idxFirstDocInBlock, remainingCount);

        assert idxFirstDocInBlock < numDocsInBlock;
        assert countInBlock <= numDocsInBlock;

        if (blockId != lastBlockId) {
            // NOTE(review): decompressBlock is not visible here — presumed to fill
            // uncompressedBlock and the per-doc start offsets in uncompressedDocStarts.
            decompressBlock((int) blockId, numDocsInBlock);
            // uncompressedBytesRef and uncompressedDocStarts now populated
            lastBlockId = blockId;
        }

        // Copy offsets for block into combined offset array
        int startOffset = uncompressedDocStarts[idxFirstDocInBlock];
        int endOffset = uncompressedDocStarts[idxFirstDocInBlock + countInBlock];
        int lenValuesInBlock = endOffset - startOffset;
        for (int i = 0; i < countInBlock; i++) {
            // Rebase this block's doc-start offsets so they are relative to the combined buffer.
            int byteOffsetInBlock = uncompressedDocStarts[idxFirstDocInBlock + i + 1] - startOffset;
            offsets[blockDocOffset + i + 1] = byteOffsetInBlock + blockByteOffset;
        }

        // Copy uncompressedBlock bytes into buffer for multiple blocks
        System.arraycopy(uncompressedBlock, startOffset, bytes, blockByteOffset, lenValuesInBlock);

        nextDoc += countInBlock;
        remainingCount -= countInBlock;
        blockDocOffset += countInBlock;
        blockByteOffset += lenValuesInBlock;
    }

    int totalLen = Math.toIntExact(offsets[count]);
    if (totalLen == 0) {
        // All requested values are empty — NOTE(review): presumably an overload taking a
        // fixed per-value length of 0 instead of an offsets array; confirm builder contract.
        builder.appendBytesRefs(new byte[0], 0);
    } else {
        builder.appendBytesRefs(bytes, offsets);
    }
}
}

abstract static class DenseBinaryDocValues extends BinaryDocValues implements BlockLoader.OptionalColumnAtATimeReader {
Expand Down Expand Up @@ -1533,9 +1631,9 @@ private BinaryEntry readBinary(IndexInput meta, int version) throws IOException
entry.addressesMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
entry.addressesLength = meta.readLong();

entry.docRangeOffset = meta.readLong();
entry.docRangeMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
entry.docRangeLength = meta.readLong();
entry.docOffsetsOffset = meta.readLong();
entry.docOffsetMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
entry.docOffsetLength = meta.readLong();

entry.numCompressedBlocks = numCompressedChunks;
}
Expand Down Expand Up @@ -2230,14 +2328,14 @@ static class BinaryEntry {
int maxLength;
long addressesOffset;
long addressesLength;
long docRangeOffset;
long docRangeLength;
long docOffsetsOffset;
long docOffsetLength;
// compression mode
int maxUncompressedChunkSize;
int maxNumDocsInAnyBlock;
int numCompressedBlocks;
DirectMonotonicReader.Meta addressesMeta;
DirectMonotonicReader.Meta docRangeMeta;
DirectMonotonicReader.Meta docOffsetMeta;

BinaryEntry(BinaryDVCompressionMode compression) {
this.compression = compression;
Expand Down