diff --git a/docs/changelog/138631.yaml b/docs/changelog/138631.yaml
new file mode 100644
index 0000000000000..9e6d254879d45
--- /dev/null
+++ b/docs/changelog/138631.yaml
@@ -0,0 +1,5 @@
+pr: 138631
+summary: Improved bulk loading for binary doc values
+area: Codec
+type: enhancement
+issues: []
diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java
index 8100685cb4df3..74b98f6cdfca8 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java
@@ -364,13 +364,13 @@ private BinaryDocValues getCompressedBinary(BinaryEntry entry) throws IOExceptio
         final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength);
         final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData);
-        final RandomAccessInput docRangeData = this.data.randomAccessSlice(entry.docRangeOffset, entry.docRangeLength);
-        final DirectMonotonicReader docRanges = DirectMonotonicReader.getInstance(entry.docRangeMeta, docRangeData);
+        final RandomAccessInput docOffsetsData = this.data.randomAccessSlice(entry.docOffsetsOffset, entry.docOffsetLength);
+        final DirectMonotonicReader docOffsets = DirectMonotonicReader.getInstance(entry.docOffsetMeta, docOffsetsData);

         return new DenseBinaryDocValues(maxDoc) {
             final BinaryDecoder decoder = new BinaryDecoder(
                 entry.compression.compressionMode().newDecompressor(),
                 addresses,
-                docRanges,
+                docOffsets,
                 data.clone(),
                 entry.maxUncompressedChunkSize,
                 entry.maxNumDocsInAnyBlock
@@ -391,13 +391,22 @@ public BlockLoader.Block tryRead(
                 boolean toInt
             ) throws IOException {
                 int count = docs.count() - offset;
-                try (var builder = factory.bytesRefs(count)) {
-                    for (int i = offset; i < docs.count(); i++) {
-                        doc = docs.get(i);
-                        var bytes = decoder.decode(doc, entry.numCompressedBlocks);
-                        builder.appendBytesRef(bytes);
+                int firstDocId = docs.get(offset);
+                int lastDocId = docs.get(docs.count() - 1);
+                doc = lastDocId;
+
+                if (isDense(firstDocId, lastDocId, count)) {
+                    try (var builder = factory.singletonBytesRefs(count)) {
+                        decoder.decodeBulk(entry.numCompressedBlocks, firstDocId, count, builder);
+                        return builder.build();
+                    }
+                } else {
+                    try (var builder = factory.bytesRefs(count)) {
+                        for (int i = offset; i < docs.count(); i++) {
+                            builder.appendBytesRef(decoder.decode(docs.get(i), entry.numCompressedBlocks));
+                        }
+                        return builder.build();
                     }
-                    return builder.build();
                 }
             }
         };
@@ -414,13 +423,13 @@ public BlockLoader.Block tryRead(
         final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength);
         final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData);
-        final RandomAccessInput docRangeData = this.data.randomAccessSlice(entry.docRangeOffset, entry.docRangeLength);
-        final DirectMonotonicReader docRanges = DirectMonotonicReader.getInstance(entry.docRangeMeta, docRangeData);
+        final RandomAccessInput docOffsetsData = this.data.randomAccessSlice(entry.docOffsetsOffset, entry.docOffsetLength);
+        final DirectMonotonicReader docOffsets = DirectMonotonicReader.getInstance(entry.docOffsetMeta, docOffsetsData);

         return new SparseBinaryDocValues(disi) {
             final BinaryDecoder decoder = new BinaryDecoder(
                 entry.compression.compressionMode().newDecompressor(),
                 addresses,
-                docRanges,
+                docOffsets,
                 data.clone(),
                 entry.maxUncompressedChunkSize,
                 entry.maxNumDocsInAnyBlock
@@ -439,7 +448,7 @@ public BytesRef binaryValue() throws IOException {

     static final class BinaryDecoder {
         private final LongValues addresses;
-        private final DirectMonotonicReader docRanges;
+        private final DirectMonotonicReader docOffsets;
         private final IndexInput compressedData;
         // Cache of last uncompressed block
         private long lastBlockId = -1;
@@ -453,14 +462,14 @@ static final class BinaryDecoder {
         BinaryDecoder(
             Decompressor decompressor,
             LongValues addresses,
-            DirectMonotonicReader docRanges,
+            DirectMonotonicReader docOffsets,
             IndexInput compressedData,
             int biggestUncompressedBlockSize,
             int maxNumDocsInAnyBlock
         ) {
             this.decompressor = decompressor;
             this.addresses = addresses;
-            this.docRanges = docRanges;
+            this.docOffsets = docOffsets;
             this.compressedData = compressedData;
             // pre-allocate a byte array large enough for the biggest uncompressed block needed.
             this.uncompressedBlock = new byte[biggestUncompressedBlockSize];
@@ -509,8 +518,12 @@ void deltaDecode(int[] arr, int length) {
             }
         }

-        long findAndUpdateBlock(DirectMonotonicReader docRanges, long lastBlockId, int docNumber, int numBlocks) {
-            long index = docRanges.binarySearch(lastBlockId + 1, numBlocks, docNumber);
+        long findAndUpdateBlock(int docNumber, int numBlocks) {
+            if (docNumber < limitDocNumForBlock && lastBlockId >= 0) {
+                return lastBlockId;
+            }
+
+            long index = docOffsets.binarySearch(lastBlockId + 1, numBlocks, docNumber);
             // If index is found, index is inclusive lower bound of docNum range, so docNum is in blockId == index
             if (index < 0) {
                 // If index was not found, insertion point (-index - 1) will be upper bound of docNum range.
@@ -519,14 +532,25 @@ long findAndUpdateBlock(DirectMonotonicReader docRanges, long lastBlockId, int d
             }
             assert index < numBlocks : "invalid range " + index + " for doc " + docNumber + " in numBlocks " + numBlocks;

-            startDocNumForBlock = docRanges.get(index);
-            limitDocNumForBlock = docRanges.get(index + 1);
+            startDocNumForBlock = docOffsets.get(index);
+            limitDocNumForBlock = docOffsets.get(index + 1);

             return index;
         }

+        // If the query is over adjacent values, we can scan forward through blocks rather than binary searching for each block.
+        long findAndUpdateBlockByScanning(int docNumber) {
+            if (docNumber < limitDocNumForBlock && lastBlockId >= 0) {
+                return lastBlockId;
+            }
+            long blockId = lastBlockId + 1;
+            startDocNumForBlock = docOffsets.get(blockId);
+            limitDocNumForBlock = docOffsets.get(blockId + 1);
+            return blockId;
+        }
+
         BytesRef decode(int docNumber, int numBlocks) throws IOException {
             // docNumber, rather than docId, because these are dense and could be indices from a DISI
-            long blockId = docNumber < limitDocNumForBlock ? lastBlockId : findAndUpdateBlock(docRanges, lastBlockId, docNumber, numBlocks);
+            long blockId = findAndUpdateBlock(docNumber, numBlocks);
             int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock);
             int idxInBlock = (int) (docNumber - startDocNumForBlock);
@@ -544,6 +568,80 @@ BytesRef decode(int docNumber, int numBlocks) throws IOException {
             uncompressedBytesRef.length = end - start;
             return uncompressedBytesRef;
         }
+
+        int computeMultipleBlockBufferSize(int count, int firstDoc, long firstBlockId, long numBlocks) throws IOException {
+            IndexInput readAhead = compressedData.clone();
+            int lastDoc = firstDoc + count - 1;
+            int requiredBufferSize = 0;
+
+            for (long blockId = firstBlockId; blockId < numBlocks; blockId++) {
+                long blockStartOffset = addresses.get(blockId);
+                readAhead.seek(blockStartOffset);
+                readAhead.readByte(); // skip BlockHeader
+                int uncompressedBlockLength = readAhead.readVInt();
+                requiredBufferSize += uncompressedBlockLength;
+
+                long blockLimit = docOffsets.get(blockId + 1);
+                if (lastDoc < blockLimit) {
+                    break;
+                }
+            }
+            return requiredBufferSize;
+        }
+
+        void decodeBulk(int numBlocks, int firstDoc, int count, BlockLoader.SingletonBytesRefBuilder builder) throws IOException {
+            int remainingCount = count;
+            int nextDoc = firstDoc;
+            int blockDocOffset = 0;
+            int blockByteOffset = 0;
+
+            // Binary search forward for the first blockId (which holds at least one value for the range); since the query is dense,
+            // later blocks can be found by scanning. The first loop iteration must use firstBlockId: lastBlockId may still be stale.
+            long firstBlockId = findAndUpdateBlock(nextDoc, numBlocks);
+            long[] offsets = new long[count + 1];
+            int bufferSize = computeMultipleBlockBufferSize(count, firstDoc, firstBlockId, numBlocks);
+            byte[] bytes = new byte[bufferSize];
+
+            while (remainingCount > 0) {
+                long blockId = nextDoc == firstDoc ? firstBlockId : findAndUpdateBlockByScanning(nextDoc);
+                int numDocsInBlock = (int) (limitDocNumForBlock - startDocNumForBlock);
+                int idxFirstDocInBlock = (int) (nextDoc - startDocNumForBlock);
+                int countInBlock = Math.min(numDocsInBlock - idxFirstDocInBlock, remainingCount);
+
+                assert idxFirstDocInBlock < numDocsInBlock;
+                assert countInBlock <= numDocsInBlock;
+
+                if (blockId != lastBlockId) {
+                    decompressBlock((int) blockId, numDocsInBlock);
+                    // uncompressedBytesRef and uncompressedDocStarts now populated
+                    lastBlockId = blockId;
+                }
+
+                // Copy offsets for block into combined offset array
+                int startOffset = uncompressedDocStarts[idxFirstDocInBlock];
+                int endOffset = uncompressedDocStarts[idxFirstDocInBlock + countInBlock];
+                int lenValuesInBlock = endOffset - startOffset;
+                for (int i = 0; i < countInBlock; i++) {
+                    int byteOffsetInBlock = uncompressedDocStarts[idxFirstDocInBlock + i + 1] - startOffset;
+                    offsets[blockDocOffset + i + 1] = byteOffsetInBlock + blockByteOffset;
+                }
+
+                // Copy uncompressedBlock bytes into buffer for multiple blocks
+                System.arraycopy(uncompressedBlock, startOffset, bytes, blockByteOffset, lenValuesInBlock);
+
+                nextDoc += countInBlock;
+                remainingCount -= countInBlock;
+                blockDocOffset += countInBlock;
+                blockByteOffset += lenValuesInBlock;
+            }
+
+            int totalLen = Math.toIntExact(offsets[count]);
+            if (totalLen == 0) {
+                builder.appendBytesRefs(new byte[0], 0);
+            } else {
+                builder.appendBytesRefs(bytes, offsets);
+            }
+        }
     }

     abstract static class DenseBinaryDocValues extends BinaryDocValues implements BlockLoader.OptionalColumnAtATimeReader {
@@ -1533,9 +1631,9 @@ private BinaryEntry readBinary(IndexInput meta, int version) throws IOException
             entry.addressesMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
             entry.addressesLength = meta.readLong();

-            entry.docRangeOffset = meta.readLong();
-            entry.docRangeMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
-            entry.docRangeLength = meta.readLong();
+            entry.docOffsetsOffset = meta.readLong();
+            entry.docOffsetMeta = DirectMonotonicReader.loadMeta(meta, numCompressedChunks + 1, blockShift);
+            entry.docOffsetLength = meta.readLong();
             entry.numCompressedBlocks = numCompressedChunks;
         }

@@ -2230,14 +2328,14 @@ static class BinaryEntry {
         int maxLength;
         long addressesOffset;
         long addressesLength;
-        long docRangeOffset;
-        long docRangeLength;
+        long docOffsetsOffset;
+        long docOffsetLength;
         // compression mode
         int maxUncompressedChunkSize;
         int maxNumDocsInAnyBlock;
         int numCompressedBlocks;
         DirectMonotonicReader.Meta addressesMeta;
-        DirectMonotonicReader.Meta docRangeMeta;
+        DirectMonotonicReader.Meta docOffsetMeta;

         BinaryEntry(BinaryDVCompressionMode compression) {
             this.compression = compression;
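
A note on the dense fast path in tryRead: the bulk branch only fires when the requested doc IDs form one contiguous run, so positions inside the decompressed blocks map 1:1 onto the requested documents. The isDense helper is not shown in this diff; the following is a minimal standalone sketch of the check it is assumed to perform (the class name and main method are illustrative, not part of the patch):

// Hypothetical standalone version of the density check assumed by the bulk path;
// the real isDense helper lives elsewhere in ES819TSDBDocValuesProducer.
public class DenseCheckDemo {
    // A run of doc IDs is dense when it covers every ID between first and last,
    // so no gaps exist and per-block positions line up with requested docs.
    static boolean isDense(int firstDocId, int lastDocId, int count) {
        return lastDocId - firstDocId == count - 1;
    }

    public static void main(String[] args) {
        System.out.println(isDense(10, 13, 4)); // true: 10, 11, 12, 13
        System.out.println(isDense(10, 14, 4)); // false: a gap, so the sparse path is used
    }
}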
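
A note on block lookup: docOffsets is a monotonic array where block i holds doc numbers [docOffsets[i], docOffsets[i + 1]), so findAndUpdateBlock can binary search it, treating a miss's insertion point as the exclusive upper bound of the containing block. A self-contained sketch of that arithmetic, using java.util.Arrays.binarySearch as a stand-in for DirectMonotonicReader.binarySearch (names here are illustrative):

import java.util.Arrays;

// Sketch of mapping a doc number to its compressed block via binary search
// over the monotonic block start offsets.
public class BlockLookupDemo {
    public static void main(String[] args) {
        long[] docOffsets = { 0, 4, 9, 15 }; // 3 blocks: docs 0-3, 4-8, 9-14
        for (int doc : new int[] { 0, 3, 4, 14 }) {
            System.out.println("doc " + doc + " -> block " + findBlock(docOffsets, doc));
        }
    }

    static long findBlock(long[] docOffsets, int docNumber) {
        int index = Arrays.binarySearch(docOffsets, docNumber);
        if (index < 0) {
            // Insertion point (-index - 1) is the exclusive upper bound of the
            // containing block's doc range, so the doc is in the block before it.
            index = -index - 2;
        }
        return index; // prints blocks 0, 0, 1, 2 for the docs above
    }
}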
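
A note on decodeBulk's offset stitching: each decompressed block exposes per-document value start positions (uncompressedDocStarts), and the bulk path rebases those onto one cumulative offsets array over a single concatenated byte buffer, so the builder can slice every value without per-document appends. A self-contained sketch of that rebasing arithmetic follows; names such as docStarts and appendBlock are illustrative, not the Elasticsearch API:

import java.util.Arrays;

// Two "decompressed blocks" are stitched into one buffer plus a cumulative
// offsets array, mirroring the inner loop of decodeBulk (here each block is
// copied whole; the real code may copy only a sub-range of the first and
// last blocks).
public class BulkOffsetStitchDemo {
    public static void main(String[] args) {
        byte[] block0 = "abc".getBytes();        // values "a", "bc"
        int[] docStarts0 = { 0, 1, 3 };
        byte[] block1 = "defgh".getBytes();      // values "def", "", "gh"
        int[] docStarts1 = { 0, 3, 3, 5 };

        int count = 5;                           // total documents across both blocks
        long[] offsets = new long[count + 1];    // offsets[0] stays 0
        byte[] bytes = new byte[block0.length + block1.length];

        int blockDocOffset = 0;
        int blockByteOffset = 0;
        blockByteOffset = appendBlock(block0, docStarts0, bytes, offsets, blockDocOffset, blockByteOffset);
        blockDocOffset += docStarts0.length - 1;
        appendBlock(block1, docStarts1, bytes, offsets, blockDocOffset, blockByteOffset);

        System.out.println(new String(bytes));        // abcdefgh
        System.out.println(Arrays.toString(offsets)); // [0, 1, 3, 6, 6, 8]
        // Value i is bytes[offsets[i] .. offsets[i + 1]); note the empty value at i = 3.
    }

    // Rebase one block's per-document starts onto the combined buffer and
    // append its bytes; returns the new write position in the buffer.
    static int appendBlock(byte[] block, int[] docStarts, byte[] bytes, long[] offsets, int blockDocOffset, int blockByteOffset) {
        int numDocs = docStarts.length - 1;
        for (int i = 0; i < numDocs; i++) {
            offsets[blockDocOffset + i + 1] = blockByteOffset + docStarts[i + 1];
        }
        System.arraycopy(block, 0, bytes, blockByteOffset, block.length);
        return blockByteOffset + block.length;
    }
}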