From 389a21b177a045a36b67fa75d84629f675773d46 Mon Sep 17 00:00:00 2001 From: Nandor Kollar Date: Mon, 20 Feb 2017 16:21:39 +0100 Subject: [PATCH 1/2] AVRO-1857: GenericDatumWriter.write using BufferedBinaryEncoder leaves ByteBuffer in indeterminate state --- .../apache/avro/io/BufferedBinaryEncoder.java | 7 ++- .../java/org/apache/avro/io/TestEncoders.java | 60 +++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java index 82a36f9ebee..0a46ed7fa08 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java @@ -157,11 +157,12 @@ public void writeFixed(byte[] bytes, int start, int len) throws IOException { @Override public void writeFixed(ByteBuffer bytes) throws IOException { - if (!bytes.hasArray() && bytes.remaining() > bulkLimit) { + ByteBuffer readOnlyBytes = bytes.asReadOnlyBuffer(); + if (!readOnlyBytes.hasArray() && readOnlyBytes.remaining() > bulkLimit) { flushBuffer(); - sink.innerWrite(bytes); // bypass the buffer + sink.innerWrite(readOnlyBytes); // bypass the readOnlyBytes } else { - super.writeFixed(bytes); + super.writeFixed(readOnlyBytes); } } diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java index 4d16f163505..c54e2c6a3b3 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java @@ -20,6 +20,12 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; @@ -32,7 +38,15 @@ import org.junit.Assert; import org.junit.Test; +import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + public class TestEncoders { + private static final int ENCODER_BUFFER_SIZE = 32; + private static final int EXAMPLE_DATA_SIZE = 17; + private static EncoderFactory factory = EncoderFactory.get(); @Test @@ -193,4 +207,50 @@ public void testJsonRecordOrderingWithProjection2() throws IOException { Assert.assertEquals("{\"a\": {\"a1\": null, \"a2\": true}}", o.toString()); } + @Test + public void testArrayBackedByteBuffer() throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(someBytes(EXAMPLE_DATA_SIZE)); + + testWithBuffer(buffer); + } + + @Test + public void testMappedByteBuffer() throws IOException { + Path file = Files.createTempFile("test", "data"); + Files.write(file, someBytes(EXAMPLE_DATA_SIZE)); + MappedByteBuffer buffer = FileChannel.open(file, StandardOpenOption.READ).map(FileChannel.MapMode.READ_ONLY, 0, EXAMPLE_DATA_SIZE); + + testWithBuffer(buffer); + } + + private void testWithBuffer(ByteBuffer buffer) throws IOException { + assertThat(asList(buffer.position(), buffer.remaining()), is(asList(0, EXAMPLE_DATA_SIZE))); + + ByteArrayOutputStream output = new ByteArrayOutputStream(EXAMPLE_DATA_SIZE * 2); + EncoderFactory encoderFactory = new EncoderFactory(); + encoderFactory.configureBufferSize(ENCODER_BUFFER_SIZE); + + Encoder encoder = encoderFactory.binaryEncoder(output, null); + new GenericDatumWriter(Schema.create(Schema.Type.BYTES)).write(buffer, encoder); + encoder.flush(); + + assertThat(output.toByteArray(), equalTo(avroEncoded(someBytes(EXAMPLE_DATA_SIZE)))); + assertThat(asList(buffer.position(), buffer.remaining()), is(asList(0, EXAMPLE_DATA_SIZE))); // fails if buffer is not array-backed and buffer overflow occurs + } + + private byte[] someBytes(int size) { + byte[] result = new byte[size]; + for (int i = 0; i < size; i++) { + result[i] = (byte) i; + } + return result; + } + + private byte[] avroEncoded(byte[] bytes) { + assert bytes.length < 64; + byte[] result = new byte[1 + bytes.length]; + result[0] = (byte) (bytes.length * 2); // zig-zag encoding + System.arraycopy(bytes, 0, result, 1, bytes.length); + return result; + } } From fbcc7fc9e239b0b9619ca735d2d542a986218672 Mon Sep 17 00:00:00 2001 From: Nandor Kollar Date: Tue, 23 May 2017 16:25:58 +0200 Subject: [PATCH 2/2] AVRO-1857: GenericDatumWriter.write using BufferedBinaryEncoder leaves ByteBuffer in indeterminate state --- .../src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java index 0a46ed7fa08..e99f43f99f2 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java @@ -158,7 +158,7 @@ public void writeFixed(byte[] bytes, int start, int len) throws IOException { @Override public void writeFixed(ByteBuffer bytes) throws IOException { ByteBuffer readOnlyBytes = bytes.asReadOnlyBuffer(); - if (!readOnlyBytes.hasArray() && readOnlyBytes.remaining() > bulkLimit) { + if (!bytes.hasArray() && bytes.remaining() > bulkLimit) { flushBuffer(); sink.innerWrite(readOnlyBytes); // bypass the readOnlyBytes } else {