Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up ByteBlockPool #12506

Merged
merged 11 commits into from
Nov 6, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.ByteSlicePool;

/* IndexInput that knows how to read the byte slices written
* by Posting and PostingVector. We read the bytes in
Expand Down Expand Up @@ -50,10 +51,10 @@ public void init(ByteBlockPool pool, int startIndex, int endIndex) {
level = 0;
bufferUpto = startIndex / ByteBlockPool.BYTE_BLOCK_SIZE;
bufferOffset = bufferUpto * ByteBlockPool.BYTE_BLOCK_SIZE;
buffer = pool.buffers[bufferUpto];
buffer = pool.getBuffer(bufferUpto);
upto = startIndex & ByteBlockPool.BYTE_BLOCK_MASK;

final int firstSize = TermsHashPerField.LEVEL_SIZE_ARRAY[0];
final int firstSize = ByteSlicePool.LEVEL_SIZE_ARRAY[0];

if (startIndex + firstSize >= endIndex) {
// There is only this one slice to read
Expand Down Expand Up @@ -97,13 +98,13 @@ public void nextSlice() {
// Skip to our next slice
final int nextIndex = (int) BitUtil.VH_LE_INT.get(buffer, limit);

level = TermsHashPerField.NEXT_LEVEL_ARRAY[level];
final int newSize = TermsHashPerField.LEVEL_SIZE_ARRAY[level];
level = ByteSlicePool.NEXT_LEVEL_ARRAY[level];
final int newSize = ByteSlicePool.LEVEL_SIZE_ARRAY[level];

bufferUpto = nextIndex / ByteBlockPool.BYTE_BLOCK_SIZE;
bufferOffset = bufferUpto * ByteBlockPool.BYTE_BLOCK_SIZE;

buffer = pool.buffers[bufferUpto];
buffer = pool.getBuffer(bufferUpto);
upto = nextIndex & ByteBlockPool.BYTE_BLOCK_MASK;

if (nextIndex + newSize >= endIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.ByteSlicePool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.BytesRefHash.BytesStartArray;
Expand All @@ -39,6 +40,7 @@ abstract class TermsHashPerField implements Comparable<TermsHashPerField> {
private final TermsHashPerField nextPerField;
private final IntBlockPool intPool;
final ByteBlockPool bytePool;
private final ByteSlicePool slicePool;
// for each term we store an integer per stream that points into the bytePool above
// the address is updated once data is written to the stream to point to the next free offset
// in the terms stream. The start address for the stream is stored in
Expand Down Expand Up @@ -73,6 +75,7 @@ abstract class TermsHashPerField implements Comparable<TermsHashPerField> {
IndexOptions indexOptions) {
this.intPool = intPool;
this.bytePool = bytePool;
this.slicePool = new ByteSlicePool(bytePool);
this.streamCount = streamCount;
this.fieldName = fieldName;
this.nextPerField = nextPerField;
Expand All @@ -98,7 +101,7 @@ final void initReader(ByteSliceReader reader, int termID, int stream) {
final int offsetInAddressBuffer = streamStartOffset & IntBlockPool.INT_BLOCK_MASK;
reader.init(
bytePool,
postingsArray.byteStarts[termID] + stream * FIRST_LEVEL_SIZE,
postingsArray.byteStarts[termID] + stream * ByteSlicePool.FIRST_LEVEL_SIZE,
streamAddressBuffer[offsetInAddressBuffer + stream]);
}

Expand Down Expand Up @@ -143,8 +146,8 @@ private void add(int textStart, final int docID) throws IOException {

/**
* Called when we first encounter a new term. We must allocate slices to store the postings (vInt
* compressed doc/freq/prox), and also the int pointers to where (in our ByteBlockPool storage)
* the postings for this term begin.
* compressed doc/freq/prox), and also the int pointers to where (in our {@link ByteBlockPool}
* storage) the postings for this term begin.
*/
private void initStreamSlices(int termID, int docID) throws IOException {
// Init stream slices
Expand All @@ -154,7 +157,8 @@ private void initStreamSlices(int termID, int docID) throws IOException {
intPool.nextBuffer();
}

if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < (2 * streamCount) * FIRST_LEVEL_SIZE) {
if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto
< (2 * streamCount) * ByteSlicePool.FIRST_LEVEL_SIZE) {
// can we fit at least one byte per stream in the current buffer, if not allocate a new one
bytePool.nextBuffer();
}
Expand All @@ -168,7 +172,7 @@ private void initStreamSlices(int termID, int docID) throws IOException {
for (int i = 0; i < streamCount; i++) {
// initialize each stream with a slice we start with ByteBlockPool.FIRST_LEVEL_SIZE)
// and grow as we need more space. see ByteBlockPool.LEVEL_SIZE_ARRAY
final int upto = newSlice(bytePool, FIRST_LEVEL_SIZE, 0);
final int upto = slicePool.newSlice(ByteSlicePool.FIRST_LEVEL_SIZE);
termStreamAddressBuffer[streamAddressOffset + i] = upto + bytePool.byteOffset;
}
postingsArray.byteStarts[termID] = termStreamAddressBuffer[streamAddressOffset];
Expand Down Expand Up @@ -216,12 +220,12 @@ private int positionStreamSlice(int termID, final int docID) throws IOException
final void writeByte(int stream, byte b) {
int streamAddress = streamAddressOffset + stream;
int upto = termStreamAddressBuffer[streamAddress];
byte[] bytes = bytePool.buffers[upto >> ByteBlockPool.BYTE_BLOCK_SHIFT];
byte[] bytes = bytePool.getBuffer(upto >> ByteBlockPool.BYTE_BLOCK_SHIFT);
assert bytes != null;
int offset = upto & ByteBlockPool.BYTE_BLOCK_MASK;
if (bytes[offset] != 0) {
// End of slice; allocate a new one
offset = allocSlice(bytePool, bytes, offset);
offset = slicePool.allocSlice(bytes, offset);
bytes = bytePool.buffer;
termStreamAddressBuffer[streamAddress] = offset + bytePool.byteOffset;
}
Expand All @@ -233,7 +237,7 @@ final void writeBytes(int stream, byte[] b, int offset, int len) {
final int end = offset + len;
int streamAddress = streamAddressOffset + stream;
int upto = termStreamAddressBuffer[streamAddress];
byte[] slice = bytePool.buffers[upto >> ByteBlockPool.BYTE_BLOCK_SHIFT];
byte[] slice = bytePool.getBuffer(upto >> ByteBlockPool.BYTE_BLOCK_SHIFT);
assert slice != null;
int sliceOffset = upto & ByteBlockPool.BYTE_BLOCK_MASK;

Expand All @@ -243,7 +247,7 @@ final void writeBytes(int stream, byte[] b, int offset, int len) {
}

while (offset < end) {
int offsetAndLength = allocKnownSizeSlice(bytePool, slice, sliceOffset);
int offsetAndLength = slicePool.allocKnownSizeSlice(slice, sliceOffset);
sliceOffset = offsetAndLength >> 8;
int sliceLength = offsetAndLength & 0xff;
slice = bytePool.buffer;
Expand Down
100 changes: 61 additions & 39 deletions lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,37 @@
import java.util.List;

/**
* Represents a logical byte[] as a series of blocks. You can write into it by using append and read
* using the offset position (random access). The buffers can be reset to reuse the allocated
* buffers.
* This class enables the allocation of fixed-size buffers and their management as part of a buffer
* array. Allocation is done through the use of an {@link Allocator} which can be customized, e.g.
to allow recycling old buffers. There are methods for writing (e.g. {@link #append(BytesRef)})
and reading from the buffers (e.g. {@link #readBytes(int, byte[], int, int)}), which handle
read/write operations across buffer boundaries.
*
* @lucene.internal
*/
public final class ByteBlockPool implements Accountable {
private static final long BASE_RAM_BYTES =
RamUsageEstimator.shallowSizeOfInstance(ByteBlockPool.class);

/**
* Use this to find the index of the buffer containing a byte, given an offset to that byte.
*
* <p>bufferUpto = globalOffset >> BYTE_BLOCK_SHIFT
*
* <p>bufferUpto = globalOffset / BYTE_BLOCK_SIZE
*/
public static final int BYTE_BLOCK_SHIFT = 15;

/** The size of each buffer in the pool. */
public static final int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;

/**
* Use this to find the position of a global offset in a particular buffer.
*
* <p>positionInCurrentBuffer = globalOffset &amp; BYTE_BLOCK_MASK
*
* <p>positionInCurrentBuffer = globalOffset % BYTE_BLOCK_SIZE
*/
public static final int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;

/** Abstract class for allocating and freeing byte blocks. */
Expand All @@ -46,6 +65,7 @@ protected Allocator(int blockSize) {

public abstract void recycleByteBlocks(byte[][] blocks, int start, int end);

// TODO: This is not used. Can we remove?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to remove now! This is an internal API -- we are free to suddenly change it.

public void recycleByteBlocks(List<byte[]> blocks) {
final byte[][] b = blocks.toArray(new byte[blocks.size()][]);
recycleByteBlocks(b, 0, b.length);
Expand Down Expand Up @@ -90,24 +110,24 @@ public void recycleByteBlocks(byte[][] blocks, int start, int end) {
}
}
}
;

/**
* array of buffers currently used in the pool. Buffers are allocated if needed don't modify this
* outside of this class.
*/
public byte[][] buffers = new byte[10][];
/** Array of buffers currently used in the pool. Buffers are allocated if needed. */
private byte[][] buffers = new byte[10][];

/** index into the buffers array pointing to the current buffer used as the head */
private int bufferUpto = -1; // Which buffer we are upto

/** Where we are in head buffer */
/** Where we are in the head buffer. */
public int byteUpto = BYTE_BLOCK_SIZE;

/** Current head buffer */
/** Current head buffer. */
public byte[] buffer;

/** Current head offset */
/**
* Offset from the start of the first buffer to the start of the current buffer, which is
* bufferUpto * BYTE_BLOCK_SIZE. The buffer pool maintains this offset because it is the first to
* overflow if there are too many allocated blocks.
*/
public int byteOffset = -BYTE_BLOCK_SIZE;

private final Allocator allocator;
Expand All @@ -117,21 +137,22 @@ public ByteBlockPool(Allocator allocator) {
}

/**
* Resets the pool to its initial state reusing the first buffer and fills all buffers with <code>
* 0</code> bytes before they reused or passed to {@link Allocator#recycleByteBlocks(byte[][],
* int, int)}. Calling {@link ByteBlockPool#nextBuffer()} is not needed after reset.
* Resets the pool to its initial state, reusing the first buffer and filling all buffers with
* {@code 0} bytes before they are reused or passed to {@link
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we really need to zero out the buffers that will be recycled. Maybe it makes more sense to have the recycling method do it if necessary?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to make this cleaner (who zeros). Why does byte slicing even require pre-zero'd buffers?

Maybe open a follow-on issue for this? This change is already great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, there's enough going on in this PR already. I opened an issue to look into this separately: #12734

* Allocator#recycleByteBlocks(byte[][], int, int)}. Calling {@link ByteBlockPool#nextBuffer()} is
* not needed after reset.
*/
/** Resets the pool, zero-filling buffers and reusing the first one (see {@link #reset(boolean, boolean)}). */
public void reset() {
  final boolean zeroFillBuffers = true;
  final boolean reuseFirst = true;
  reset(zeroFillBuffers, reuseFirst);
}

/**
* Expert: Resets the pool to its initial state reusing the first buffer. Calling {@link
* Expert: Resets the pool to its initial state, while reusing the first buffer. Calling {@link
* ByteBlockPool#nextBuffer()} is not needed after reset.
*
* @param zeroFillBuffers if <code>true</code> the buffers are filled with <code>0</code>. This
* should be set to <code>true</code> if this pool is used with slices.
* @param reuseFirst if <code>true</code> the first buffer will be reused and calling {@link
* @param zeroFillBuffers if {@code true} the buffers are filled with {@code 0}. This should be
* set to {@code true} if this pool is used with slices.
* @param reuseFirst if {@code true} the first buffer will be reused and calling {@link
* ByteBlockPool#nextBuffer()} is not needed after reset iff the block pool was used before
* ie. {@link ByteBlockPool#nextBuffer()} was called before.
*/
Expand Down Expand Up @@ -170,42 +191,42 @@ public void reset(boolean zeroFillBuffers, boolean reuseFirst) {
}

/**
* Advances the pool to its next buffer. This method should be called once after the constructor
* to initialize the pool. In contrast to the constructor a {@link ByteBlockPool#reset()} call
* will advance the pool to its first buffer immediately.
* Allocates a new buffer and advances the pool to it. This method should be called once after the
* constructor to initialize the pool. In contrast to the constructor, a {@link
* ByteBlockPool#reset()} call will advance the pool to its first buffer immediately.
*/
/**
 * Allocates a new buffer and advances the pool to it. Call once after construction to initialize
 * the pool; {@link ByteBlockPool#reset()} advances to the first buffer on its own.
 */
public void nextBuffer() {
  final int nextIndex = bufferUpto + 1;
  if (nextIndex == buffers.length) {
    // Buffer array is full: grow it before installing the new block.
    byte[][] grown = new byte[ArrayUtil.oversize(buffers.length + 1, NUM_BYTES_OBJECT_REF)][];
    System.arraycopy(buffers, 0, grown, 0, buffers.length);
    buffers = grown;
  }
  // Allocate the new head buffer and advance the pool to it.
  buffer = buffers[nextIndex] = allocator.getByteBlock();
  bufferUpto = nextIndex;

  byteUpto = 0;
  // addExact: byteOffset is the first counter to overflow when too many blocks are allocated.
  byteOffset = Math.addExact(byteOffset, BYTE_BLOCK_SIZE);
}

/**
* Fill the provided {@link BytesRef} with the bytes at the specified offset/length slice. This
* will avoid copying the bytes, if the slice fits into a single block; otherwise, it uses the
* provided {@link BytesRefBuilder} to copy bytes over.
* Fill the provided {@link BytesRef} with the bytes at the specified offset and length. This will
* avoid copying the bytes if the slice fits into a single block; otherwise, it uses the provided
* {@link BytesRefBuilder} to copy bytes over.
*/
void setBytesRef(BytesRefBuilder builder, BytesRef result, long offset, int length) {
void setBytesRef(BytesRefBuilder builder, BytesRef result, int offset, int length) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm why the change from long -> int? Previously we were able to address 32 + BYTE_BLOCK_SHIFT bits of address space using long? Or did that fail to work somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed that the purpose of the long was to increase the address space. I'll change this back.

result.length = length;

int bufferIndex = (int) (offset >> BYTE_BLOCK_SHIFT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we stick with long we should change this to Math.toIntExact to catch overflow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

int bufferIndex = offset >> BYTE_BLOCK_SHIFT;
byte[] buffer = buffers[bufferIndex];
int pos = (int) (offset & BYTE_BLOCK_MASK);
int pos = offset & BYTE_BLOCK_MASK;
if (pos + length <= BYTE_BLOCK_SIZE) {
// common case where the slice lives in a single block: just reference the buffer directly
// without copying
// Common case: The slice lives in a single block. Reference the buffer directly.
result.bytes = buffer;
result.offset = pos;
} else {
// uncommon case: the slice spans at least 2 blocks, so we must copy the bytes:
// Uncommon case: The slice spans at least 2 blocks, so we must copy the bytes.
builder.grow(length);
result.bytes = builder.get().bytes;
result.offset = 0;
Expand Down Expand Up @@ -242,10 +263,10 @@ public void append(final BytesRef bytes) {
*
* <p>Note: this method allows to copy across block boundaries.
*/
public void readBytes(final long offset, final byte[] bytes, int bytesOffset, int bytesLength) {
public void readBytes(final int offset, final byte[] bytes, int bytesOffset, int bytesLength) {
int bytesLeft = bytesLength;
int bufferIndex = (int) (offset >> BYTE_BLOCK_SHIFT);
int pos = (int) (offset & BYTE_BLOCK_MASK);
int bufferIndex = offset >> BYTE_BLOCK_SHIFT;
int pos = offset & BYTE_BLOCK_MASK;
while (bytesLeft > 0) {
byte[] buffer = buffers[bufferIndex++];
int chunk = Math.min(bytesLeft, BYTE_BLOCK_SIZE - pos);
Expand All @@ -259,14 +280,15 @@ public void readBytes(final long offset, final byte[] bytes, int bytesOffset, in
@Override
public long ramBytesUsed() {
long size = BASE_RAM_BYTES;
size += RamUsageEstimator.sizeOfObject(buffer);
size += RamUsageEstimator.shallowSizeOf(buffers);
for (byte[] buf : buffers) {
if (buf == buffer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did the previous code special case buffer? Isn't buffer a full sized block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't figured out why buffer was a special case. Even if it were not a full block, would that make a difference if we're calling RamUsageEstimator#sizeOfObject on it in both cases?

continue;
}
size += RamUsageEstimator.sizeOfObject(buf);
}
return size;
}

/**
 * Retrieve the buffer at the specified index from the buffer pool.
 *
 * @param bufferIndex index of the buffer, e.g. {@code globalOffset >> BYTE_BLOCK_SHIFT}; no bounds
 *     checking is performed, so an invalid index throws {@link ArrayIndexOutOfBoundsException}
 * @return the buffer at that index; may be {@code null} if it has not been allocated yet
 */
public byte[] getBuffer(int bufferIndex) {
return buffers[bufferIndex];
}
}
Loading
Loading