Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take stream position into account when calculating remaining length #5677

Merged
merged 1 commit into from Apr 3, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -431,27 +431,26 @@ public int read(final byte[] b, final int bOffset, final int len) throws IOExcep
return -1;
}

// we need to stop at the end
int todo = Math.min(len, length);
final int numBytesToCopy = Math.min(len, length - pos); // copy the full lenth or the remaining part

// current offset into the underlying ByteArray
long bytearrayOffset = offset + pos;
long byteArrayOffset = offset + pos;

// bytes already copied
int written = 0;
int copiedBytes = 0;

while (written < todo) {
long pagefragment = PAGE_SIZE - (bytearrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
int bulksize = (int)Math.min(pagefragment, todo - written); // we cannot copy more than a page fragment
boolean copied = bytearray.get(bytearrayOffset, bulksize, ref); // get the fragment
while (copiedBytes < numBytesToCopy) {
long pageFragment = PAGE_SIZE - (byteArrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
int bulkSize = (int)Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
boolean copied = bytearray.get(byteArrayOffset, bulkSize, ref); // get the fragment
assert (copied == false); // we should never ever get back a materialized byte[]
System.arraycopy(ref.bytes, ref.offset, b, bOffset + written, bulksize); // copy fragment contents
written += bulksize; // count how much we copied
bytearrayOffset += bulksize; // advance ByteArray index
System.arraycopy(ref.bytes, ref.offset, b, bOffset + copiedBytes, bulkSize); // copy fragment contents
copiedBytes += bulkSize; // count how much we copied
byteArrayOffset += bulkSize; // advance ByteArray index
}

pos += written; // finally advance our stream position
return written;
pos += copiedBytes; // finally advance our stream position
return copiedBytes;
}

@Override
Expand Down
Expand Up @@ -178,6 +178,34 @@ public void testStreamInputBulkReadWithOffset() throws IOException {
assertArrayEquals(pbrBytesWithOffset, targetBytes);
}

public void testRandomReads() throws IOException {
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
BytesReference pbr = getRandomizedPagedBytesReference(length);
StreamInput streamInput = pbr.streamInput();
BytesRef target = new BytesRef();
while(target.length < pbr.length()) {
switch (randomIntBetween(0, 10)) {
case 6:
case 5:
target.append(new BytesRef(new byte[] {streamInput.readByte()}));
break;
case 4:
case 3:
BytesRef bytesRef = streamInput.readBytesRef(scaledRandomIntBetween(1, pbr.length() - target.length));
target.append(bytesRef);
break;
default:
byte[] buffer = new byte[scaledRandomIntBetween(1, pbr.length() - target.length)];
int offset = scaledRandomIntBetween(0, buffer.length - 1);
int read = streamInput.read(buffer, offset, buffer.length - offset);
target.append(new BytesRef(buffer, offset, read));
break;
}
}
assertEquals(pbr.length(), target.length);
assertArrayEquals(pbr.toBytes(), Arrays.copyOfRange(target.bytes, target.offset, target.length));
}

public void testSliceStreamInput() throws IOException {
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
BytesReference pbr = getRandomizedPagedBytesReference(length);
Expand Down Expand Up @@ -208,6 +236,13 @@ public void testSliceStreamInput() throws IOException {
byte[] sliceToBytes = slice.toBytes();
assertEquals(sliceBytes.length, sliceToBytes.length);
assertArrayEquals(sliceBytes, sliceToBytes);

sliceInput.reset();
byte[] buffer = new byte[sliceLength + scaledRandomIntBetween(1, 100)];
int offset = scaledRandomIntBetween(0, Math.max(1, buffer.length - sliceLength - 1));
int read = sliceInput.read(buffer, offset, sliceLength / 2);
sliceInput.read(buffer, offset + read, sliceLength);
assertArrayEquals(sliceBytes, Arrays.copyOfRange(buffer, offset, offset + sliceLength));
}

public void testWriteTo() throws IOException {
Expand Down
Expand Up @@ -23,10 +23,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.FastCharArrayWriter;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;

Expand Down Expand Up @@ -201,4 +198,59 @@ public void testDateTypesConversion() throws Exception {
builder.map(map);
assertThat(builder.string(), equalTo("{\"calendar\":\"" + expectedCalendar + "\"}"));
}

@Test
public void testCopyCurrentStructure() throws Exception {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.startObject()
.field("test", "test field")
.startObject("filter")
.startObject("terms");

// up to 20k random terms
int numTerms = randomInt(20000) + 1;
List<String> terms = new ArrayList<>(numTerms);
for (int i = 0; i < numTerms; i++) {
terms.add("test" + i);
}

builder.field("fakefield", terms).endObject().endObject().endObject();

XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(builder.bytes());

XContentBuilder filterBuilder = null;
XContentParser.Token token;
String currentFieldName = null;
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("test".equals(currentFieldName)) {
assertThat(parser.text(), equalTo("test field"));
}
} else if (token == XContentParser.Token.START_OBJECT) {
if ("filter".equals(currentFieldName)) {
filterBuilder = XContentFactory.contentBuilder(parser.contentType());
filterBuilder.copyCurrentStructure(parser);
}
}
}

assertNotNull(filterBuilder);
parser = XContentFactory.xContent(XContentType.JSON).createParser(filterBuilder.bytes());
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
assertThat(parser.nextToken(), equalTo(XContentParser.Token.FIELD_NAME));
assertThat(parser.currentName(), equalTo("terms"));
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
assertThat(parser.nextToken(), equalTo(XContentParser.Token.FIELD_NAME));
assertThat(parser.currentName(), equalTo("fakefield"));
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_ARRAY));
int i = 0;
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
assertThat(parser.text(), equalTo(terms.get(i++)));
}

assertThat(i, equalTo(terms.size()));
}
}