Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore streamInput() performance over PagedBytesReference. #5589

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -381,12 +381,14 @@ private int countRequiredBuffers(int initialCount, int numBytes) {
private static class PagedBytesReferenceStreamInput extends StreamInput {

private final ByteArray bytearray;
private final BytesRef ref;
private final int offset;
private final int length;
private int pos;

public PagedBytesReferenceStreamInput(ByteArray bytearray, int offset, int length) {
this.bytearray = bytearray;
this.ref = new BytesRef();
this.offset = offset;
this.length = length;
this.pos = 0;
Expand Down Expand Up @@ -420,7 +422,7 @@ public int read() throws IOException {
}

@Override
public int read(byte[] b, int bOffset, int len) throws IOException {
public int read(final byte[] b, final int bOffset, final int len) throws IOException {
if (len == 0) {
return 0;
}
Expand All @@ -430,17 +432,25 @@ public int read(byte[] b, int bOffset, int len) throws IOException {
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as a side note, I think the arguments should be final to make sure we don't reassign them :)

// we need to stop at the end
len = Math.min(length, len);
int todo = Math.min(len, length);

// ByteArray.get(BytesRef) does not work since it flips the
// ref's byte[] pointer, so for now we copy byte-by-byte
// current offset into the underlying ByteArray
long bytearrayOffset = offset + pos;

// bytes already copied
int written = 0;
while (written < len) {
b[bOffset + written] = bytearray.get(offset + written);
written++;

while (written < todo) {
long pagefragment = PAGE_SIZE - (bytearrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
int bulksize = (int)Math.min(pagefragment, todo - written); // we cannot copy more than a page fragment
boolean copied = bytearray.get(bytearrayOffset, bulksize, ref); // get the fragment
assert (copied == false); // we should never ever get back a materialized byte[]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still confuses me: why do we return that boolean if it is always expected to be false?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your reply got busted... let me reread.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assert was just for testing. If you find it less confusing, I can remove the return value and the assert.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am just wondering if we can make sure we never get a class that does that, but I guess the assert is OK.

System.arraycopy(ref.bytes, ref.offset, b, bOffset + written, bulksize); // copy fragment contents
written += bulksize; // count how much we copied
bytearrayOffset += bulksize; // advance ByteArray index
}

pos += written;
pos += written; // finally advance our stream position
return written;
}

Expand Down
Expand Up @@ -98,7 +98,7 @@ public void testSlice() {
}

public void testStreamInput() throws IOException {
int length = randomIntBetween(10, PAGE_SIZE * 3);
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
BytesReference pbr = getRandomizedPagedBytesReference(length);
StreamInput si = pbr.streamInput();
assertNotNull(si);
Expand All @@ -107,6 +107,8 @@ public void testStreamInput() throws IOException {
assertEquals(pbr.get(0), si.readByte());
assertEquals(pbr.get(1), si.readByte());
assertEquals(pbr.get(2), si.readByte());

// reset the stream for bulk reading
si.reset();

// buffer for bulk reads
Expand Down Expand Up @@ -151,10 +153,34 @@ public void testStreamInput() throws IOException {
}
}

public void testSliceStreamInput() throws IOException {
int length = randomIntBetween(10, PAGE_SIZE * 3);
/**
 * Checks that a bulk read on the stream continues from the position left by
 * earlier single-byte reads, instead of starting over at the beginning.
 *
 * The test consumes a random number of leading bytes one at a time, then
 * bulk-reads everything that is left and compares it with the matching tail
 * of the reference's full byte content.
 *
 * @throws IOException if reading from the stream fails
 */
public void testStreamInputBulkReadWithOffset() throws IOException {
// length is at least 10; its upper bound is itself randomized between 2 and 20 times PAGE_SIZE
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
BytesReference pbr = getRandomizedPagedBytesReference(length);
StreamInput si = pbr.streamInput();
assertNotNull(si);

// read a bunch of single bytes one by one
// (offset is between 1 and length / 2, so some bytes always remain for the bulk read)
int offset = randomIntBetween(1, length / 2);
for (int i = 0; i < offset ; i++) {
assertEquals(pbr.get(i), si.readByte());
}

// now do NOT reset the stream - keep the stream's offset!

// buffer to compare remaining bytes against bulk read
// (expected content: bytes [offset, length) of the reference)
byte[] pbrBytesWithOffset = Arrays.copyOfRange(pbr.toBytes(), offset, length);
// randomized target buffer to ensure no stale slots
// (pre-filling with random bytes means a slot the read never wrote would most likely not match the expected data)
byte[] targetBytes = new byte[pbrBytesWithOffset.length];
getRandom().nextBytes(targetBytes);

// bulk-read all
// (targetBytes is sized to exactly the remaining length - offset bytes)
si.readFully(targetBytes);
assertArrayEquals(pbrBytesWithOffset, targetBytes);
}

public void testSliceStreamInput() throws IOException {
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
BytesReference pbr = getRandomizedPagedBytesReference(length);

// test stream input over slice (upper half of original)
int sliceOffset = randomIntBetween(1, length / 2);
Expand All @@ -166,7 +192,9 @@ public void testSliceStreamInput() throws IOException {
assertEquals(slice.get(0), sliceInput.readByte());
assertEquals(slice.get(1), sliceInput.readByte());
assertEquals(slice.get(2), sliceInput.readByte());
si.reset();

// reset the slice stream for bulk reading
sliceInput.reset();

// bulk read
byte[] sliceBytes = new byte[sliceLength];
Expand Down