Skip to content

Commit

Permalink
Restore streamInput() performance over PagedBytesReference.
Browse files Browse the repository at this point in the history
Closes #5589
  • Loading branch information
Holger Hoffstätte committed Mar 28, 2014
1 parent 7e568b0 commit 089d0e5
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 12 deletions.
Expand Up @@ -381,12 +381,14 @@ private int countRequiredBuffers(int initialCount, int numBytes) {
private static class PagedBytesReferenceStreamInput extends StreamInput {

private final ByteArray bytearray;
private final BytesRef ref;
private final int offset;
private final int length;
private int pos;

public PagedBytesReferenceStreamInput(ByteArray bytearray, int offset, int length) {
this.bytearray = bytearray;
this.ref = new BytesRef();
this.offset = offset;
this.length = length;
this.pos = 0;
Expand Down Expand Up @@ -420,7 +422,7 @@ public int read() throws IOException {
}

@Override
public int read(byte[] b, int bOffset, int len) throws IOException {
public int read(final byte[] b, final int bOffset, final int len) throws IOException {
if (len == 0) {
return 0;
}
Expand All @@ -430,17 +432,25 @@ public int read(byte[] b, int bOffset, int len) throws IOException {
}

// we need to stop at the end
len = Math.min(length, len);
int todo = Math.min(len, length);

// ByteArray.get(BytesRef) does not work since it flips the
// ref's byte[] pointer, so for now we copy byte-by-byte
// current offset into the underlying ByteArray
long bytearrayOffset = offset + pos;

// bytes already copied
int written = 0;
while (written < len) {
b[bOffset + written] = bytearray.get(offset + written);
written++;

while (written < todo) {
long pagefragment = PAGE_SIZE - (bytearrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
int bulksize = (int)Math.min(pagefragment, todo - written); // we cannot copy more than a page fragment
boolean copied = bytearray.get(bytearrayOffset, bulksize, ref); // get the fragment
assert (copied == false); // we should never ever get back a materialized byte[]
System.arraycopy(ref.bytes, ref.offset, b, bOffset + written, bulksize); // copy fragment contents
written += bulksize; // count how much we copied
bytearrayOffset += bulksize; // advance ByteArray index
}

pos += written;
pos += written; // finally advance our stream position
return written;
}

Expand Down
Expand Up @@ -98,7 +98,7 @@ public void testSlice() {
}

public void testStreamInput() throws IOException {
int length = randomIntBetween(10, PAGE_SIZE * 3);
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
BytesReference pbr = getRandomizedPagedBytesReference(length);
StreamInput si = pbr.streamInput();
assertNotNull(si);
Expand All @@ -107,6 +107,8 @@ public void testStreamInput() throws IOException {
assertEquals(pbr.get(0), si.readByte());
assertEquals(pbr.get(1), si.readByte());
assertEquals(pbr.get(2), si.readByte());

// reset the stream for bulk reading
si.reset();

// buffer for bulk reads
Expand Down Expand Up @@ -151,10 +153,34 @@ public void testStreamInput() throws IOException {
}
}

public void testSliceStreamInput() throws IOException {
int length = randomIntBetween(10, PAGE_SIZE * 3);
public void testStreamInputBulkReadWithOffset() throws IOException {
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
BytesReference pbr = getRandomizedPagedBytesReference(length);
StreamInput si = pbr.streamInput();
assertNotNull(si);

// read a bunch of single bytes one by one
int offset = randomIntBetween(1, length / 2);
for (int i = 0; i < offset ; i++) {
assertEquals(pbr.get(i), si.readByte());
}

// now do NOT reset the stream - keep the stream's offset!

// buffer to compare remaining bytes against bulk read
byte[] pbrBytesWithOffset = Arrays.copyOfRange(pbr.toBytes(), offset, length);
// randomized target buffer to ensure no stale slots
byte[] targetBytes = new byte[pbrBytesWithOffset.length];
getRandom().nextBytes(targetBytes);

// bulk-read all
si.readFully(targetBytes);
assertArrayEquals(pbrBytesWithOffset, targetBytes);
}

public void testSliceStreamInput() throws IOException {
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
BytesReference pbr = getRandomizedPagedBytesReference(length);

// test stream input over slice (upper half of original)
int sliceOffset = randomIntBetween(1, length / 2);
Expand All @@ -166,7 +192,9 @@ public void testSliceStreamInput() throws IOException {
assertEquals(slice.get(0), sliceInput.readByte());
assertEquals(slice.get(1), sliceInput.readByte());
assertEquals(slice.get(2), sliceInput.readByte());
si.reset();

// reset the slice stream for bulk reading
sliceInput.reset();

// bulk read
byte[] sliceBytes = new byte[sliceLength];
Expand Down

0 comments on commit 089d0e5

Please sign in to comment.