Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137995.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137995
summary: Improve bulk loading of binary doc values
area: Mapping
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,10 @@ public ByteArray newByteArray(long size, boolean clearOnResize) {
}
}

/**
 * Creates a {@link ByteArray} view over an existing {@code byte[]}.
 * <p>
 * The wrapper is constructed with this instance, the supplied array and the array's
 * full length as its size; the two trailing constructor arguments are passed as
 * {@code null} and {@code false} (their meaning is defined by {@code ByteArrayWrapper},
 * which is not visible here). The result is passed through {@code validate} before
 * being returned.
 *
 * @param bytes the backing array to wrap; must not be {@code null} because its length is read
 * @return the validated wrapper around {@code bytes}
 */
public ByteArray newByteArrayWrapper(byte[] bytes) {
    final ByteArrayWrapper wrapped = new ByteArrayWrapper(this, bytes, bytes.length, null, false);
    return validate(wrapped);
}

/**
* Allocate a new {@link ByteArray} initialized with zeros.
* @param size the initial length of the array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public BytesRefArray(StreamInput in, BigArrays bigArrays) throws IOException {
}
}

private BytesRefArray(LongArray startOffsets, ByteArray bytes, long size, BigArrays bigArrays) {
public BytesRefArray(LongArray startOffsets, ByteArray bytes, long size, BigArrays bigArrays) {
this.bytes = bytes;
this.startOffsets = startOffsets;
this.size = size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,27 @@ public BlockLoader.Block tryRead(
boolean toInt
) throws IOException {
int count = docs.count() - offset;
try (var builder = factory.bytesRefs(count)) {
for (int i = offset; i < docs.count(); i++) {
doc = docs.get(i);
bytesSlice.readBytes((long) doc * length, bytes.bytes, 0, length);
builder.appendBytesRef(bytes);
int firstDocId = docs.get(offset);
int lastDocId = docs.get(count - 1);
doc = lastDocId;

if (isDense(firstDocId, lastDocId, count)) {
try (var builder = factory.singletonBytesRefs(count)) {
int bulkLength = length * count;
byte[] bytes = new byte[bulkLength];
bytesSlice.readBytes((long) firstDocId * length, bytes, 0, bulkLength);
builder.appendBytesRefs(bytes, length);
return builder.build();
}
} else {
try (var builder = factory.bytesRefs(count)) {
for (int i = offset; i < docs.count(); i++) {
int docId = docs.get(i);
bytesSlice.readBytes((long) docId * length, bytes.bytes, 0, length);
builder.appendBytesRef(bytes);
}
return builder.build();
}
return builder.build();
}
}
};
Expand Down Expand Up @@ -255,15 +269,39 @@ public BlockLoader.Block tryRead(
boolean toInt
) throws IOException {
int count = docs.count() - offset;
try (var builder = factory.bytesRefs(count)) {
for (int i = offset; i < docs.count(); i++) {
doc = docs.get(i);
long startOffset = addresses.get(doc);
bytes.length = (int) (addresses.get(doc + 1L) - startOffset);
bytesSlice.readBytes(startOffset, bytes.bytes, 0, bytes.length);
builder.appendBytesRef(bytes);
int firstDocId = docs.get(offset);
int lastDocId = docs.get(count - 1);
doc = lastDocId;

if (isDense(firstDocId, lastDocId, count)) {
try (var builder = factory.singletonBytesRefs(count)) {
long[] offsets = new long[count + 1];

// Normalize offsets so that first offset is 0
long startOffset = addresses.get(firstDocId);
for (int i = offset, j = 1; i < docs.count(); i++, j++) {
int docId = docs.get(i);
long nextOffset = addresses.get(docId + 1) - startOffset;
offsets[j] = nextOffset;
}

int length = Math.toIntExact(addresses.get(lastDocId + 1L) - startOffset);
byte[] bytes = new byte[length];
bytesSlice.readBytes(startOffset, bytes, 0, length);
builder.appendBytesRefs(bytes, offsets);
return builder.build();
}
} else {
try (var builder = factory.bytesRefs(count)) {
for (int i = offset; i < docs.count(); i++) {
int docId = docs.get(i);
long startOffset = addresses.get(docId);
bytes.length = (int) (addresses.get(docId + 1L) - startOffset);
bytesSlice.readBytes(startOffset, bytes.bytes, 0, bytes.length);
builder.appendBytesRef(bytes);
}
return builder.build();
}
return builder.build();
}
}
};
Expand Down Expand Up @@ -1556,13 +1594,6 @@ long lookAheadValueAt(int targetDoc) throws IOException {
return lookaheadBlock[valueIndex];
}

/**
 * Tells whether a run of {@code length} doc ids spanning {@code firstDocId} to
 * {@code lastDocId} (inclusive) can be treated as one contiguous range.
 * <p>
 * Only the end points are inspected, so duplicate doc ids are not detected
 * (e.g. [1, 1, 2, 4] is reported as dense). Duplicates can occur with enrich or lookup,
 * but this codec isn't used for those; it is only used in the context of logsdb and tsdb,
 * so the shortcut is acceptable here.
 *
 * @param firstDocId first doc id of the run
 * @param lastDocId  last doc id of the run
 * @param length     number of doc ids in the run
 * @return {@code true} when the distance between the end points equals {@code length - 1}
 */
static boolean isDense(int firstDocId, int lastDocId, int length) {
    final int span = lastDocId - firstDocId;
    final int expectedSpan = length - 1;
    return span == expectedSpan;
}

@Override
SortedOrdinalReader sortedOrdinalReader() {
return null;
Expand Down Expand Up @@ -1681,6 +1712,13 @@ public BlockLoader.Block tryRead(
}
}

/**
 * Checks whether {@code length} doc ids starting at {@code firstDocId} and ending at
 * {@code lastDocId} form a gap-free range, judged from the two end points alone.
 * <p>
 * Duplicate doc ids are not detected (e.g. [1, 1, 2, 4] would be detected as dense).
 * That can happen with enrich or lookup, however this codec isn't used for enrich / lookup;
 * it is only used in the context of logsdb and tsdb, so this is fine here.
 *
 * @param firstDocId first doc id of the run
 * @param lastDocId  last doc id of the run
 * @param length     number of doc ids in the run
 * @return {@code true} if stepping {@code length - 1} ids forward from {@code firstDocId}
 *         lands exactly on {@code lastDocId}
 */
private static boolean isDense(int firstDocId, int lastDocId, int length) {
    // Same int (wrap-around) arithmetic as comparing the differences, just rearranged.
    return firstDocId + (length - 1) == lastDocId;
}

private NumericDocValues getRangeEncodedNumericDocValues(NumericEntry entry, long maxOrd) throws IOException {
final var ordinalsReader = new SortedOrdinalReader(
maxOrd,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,17 @@ interface BlockFactory {
*/
BytesRefBuilder bytesRefs(int expectedCount);

/**
* Build a specialized builder for singleton dense {@link BytesRef} fields with the following constraints:
* <ul>
* <li>Only one value per document can be collected</li>
* <li>No more than {@code expectedCount} values can be collected</li>
* </ul>
* Unlike {@link #bytesRefs(int)}, the returned builder accepts values in bulk
* (a single byte array holding several values) rather than one value at a time.
*
* @param expectedCount The maximum number of values to be collected.
* @return a {@link SingletonBytesRefBuilder} for at most {@code expectedCount} values
*/
SingletonBytesRefBuilder singletonBytesRefs(int expectedCount);

/**
* Build a builder to load doubles as loaded from doc values.
* Doc values load doubles in sorted order.
Expand Down Expand Up @@ -574,6 +585,22 @@ interface BytesRefBuilder extends Builder {
BytesRefBuilder appendBytesRef(BytesRef value);
}

/**
* Specialized builder for collecting dense arrays of BytesRef values.
* Values are handed over in bulk: one byte array carrying several values,
* plus a description of where each value lies within it.
*/
interface SingletonBytesRefBuilder extends Builder {
/**
* Append multiple BytesRefs of varying length that are stored in a single byte array.
* Offsets contains offsets of each BytesRef in the byte array.
* The length of the offsets array is one more than the number of BytesRefs.
*
* @param bytes   the byte array holding the values
* @param offsets offsets into {@code bytes}, one more entry than there are values
*                (NOTE(review): presumably value {@code i} occupies
*                {@code [offsets[i], offsets[i + 1])} with {@code offsets[0] == 0}; confirm against implementations)
* @return a {@link SingletonBytesRefBuilder} (presumably this builder, for chaining; confirm against implementations)
* @throws IOException declared by the signature; the conditions depend on the implementation
*/
SingletonBytesRefBuilder appendBytesRefs(byte[] bytes, long[] offsets) throws IOException;

/**
* Append multiple BytesRefs, all with the same length, that are stored in a single byte array.
*
* @param bytes           the byte array holding the values
* @param bytesRefLengths the length in bytes shared by every value in {@code bytes}
*                        (NOTE(review): presumably the values are laid out back to back; confirm against implementations)
* @return a {@link SingletonBytesRefBuilder} (presumably this builder, for chaining; confirm against implementations)
* @throws IOException declared by the signature; the conditions depend on the implementation
*/
SingletonBytesRefBuilder appendBytesRefs(byte[] bytes, long bytesRefLengths) throws IOException;
}

interface FloatBuilder extends Builder {
/**
* Appends a float to the current entry.
Expand Down
Loading