Skip to content

Commit

Permalink
AVRO-1081. Java: Fix to be able to write ByteBuffers that have no bac…
Browse files Browse the repository at this point in the history
…king array. Also fix reflection to correctly read ByteBuffer fields.

git-svn-id: https://svn.apache.org/repos/asf/avro/trunk@1339825 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
cutting committed May 17, 2012
1 parent 4a6cd27 commit 5be068e
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGES.txt
Expand Up @@ -58,6 +58,10 @@ Avro 1.7.0 (unreleased)
AVRO-1065. NodeRecord::isValid() treats records with no fields as
invalid. (thiru)

AVRO-1081. Java: Fix to be able to write ByteBuffers that have no
backing array. Also fix reflection to correctly read ByteBuffer
fields. (cutting)

Avro 1.6.3 (5 March 2012)

AVRO-1077. Missing 'inline' for union set function. (thiru)
Expand Down
Expand Up @@ -279,8 +279,7 @@ private void resetBufferTo(int size) throws IOException {
* Appending non-conforming data may result in an unreadable file. */
public void appendEncoded(ByteBuffer datum) throws IOException {
assertOpen();
int start = datum.position();
bufOut.writeFixed(datum.array(), start, datum.limit()-start);
bufOut.writeFixed(datum);
blockCount++;
writeIfBlockFull();
}
Expand Down
Expand Up @@ -148,7 +148,7 @@ protected Object read(Object old, Schema expected,
case UNION: return read(old, expected.getTypes().get(in.readIndex()), in);
case FIXED: return readFixed(old, expected, in);
case STRING: return readString(old, expected, in);
case BYTES: return readBytes(old, in);
case BYTES: return readBytes(old, expected, in);
case INT: return readInt(old, expected, in);
case LONG: return in.readLong();
case FLOAT: return in.readFloat();
Expand Down Expand Up @@ -341,6 +341,14 @@ protected Object readString(Object old, Decoder in) throws IOException {
* Utf8#Utf8(String)}.*/
protected Object createString(String value) { return new Utf8(value); }

/** Called to read byte arrays when the expected schema is at hand.
* Subclasses may override to pick a byte array representation based on the
* schema. By default, the schema is ignored and this delegates to {@link
* #readBytes(Object, Decoder)}.*/
protected Object readBytes(Object old, Schema s, Decoder in)
throws IOException {
return readBytes(old, in);
}

/** Called to read byte arrays. Subclasses may override to use a different
* byte array representation. By default, this calls {@link
* Decoder#readBytes(ByteBuffer)}.*/
Expand Down
Expand Up @@ -57,10 +57,13 @@ public void writeString(String string) throws IOException {

@Override
public void writeBytes(ByteBuffer bytes) throws IOException {
int pos = bytes.position();
int start = bytes.arrayOffset() + pos;
int len = bytes.limit() - pos;
writeBytes(bytes.array(), start, len);
int len = bytes.limit() - bytes.position();
if (0 == len) {
writeZero();
} else {
writeInt(len);
writeFixed(bytes);
}
}

@Override
Expand Down
Expand Up @@ -19,6 +19,9 @@

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

import org.apache.avro.AvroRuntimeException;

Expand Down Expand Up @@ -151,6 +154,15 @@ public void writeFixed(byte[] bytes, int start, int len) throws IOException {
System.arraycopy(bytes, start, buf, pos, len);
pos += len;
}

/**
 * Writes the bytes between the position and limit of {@code bytes}.
 * A buffer that has no backing array and holds more than
 * {@code bulkLimit} bytes is handed straight to the sink; everything else
 * goes through the inherited implementation.
 *
 * @param bytes the data to write
 * @throws IOException if the underlying sink fails
 */
@Override
public void writeFixed(ByteBuffer bytes) throws IOException {
  if (!bytes.hasArray() && bytes.remaining() > bulkLimit) {
    // Anything still sitting in the encoder's buffer (for example a length
    // prefix written just before this call) must reach the sink first,
    // otherwise the direct write below would overtake it.
    flushBuffer();
    sink.innerWrite(bytes);                     // bypass the buffer
  } else {
    super.writeFixed(bytes);
  }
}

@Override
protected void writeZero() throws IOException {
Expand Down Expand Up @@ -182,15 +194,20 @@ private abstract static class ByteSink {
protected ByteSink() {}
/** Write data from bytes, starting at off, for len bytes **/
protected abstract void innerWrite(byte[] bytes, int off, int len) throws IOException;

protected abstract void innerWrite(ByteBuffer buff) throws IOException;

/** Flush the underlying output, if supported **/
protected abstract void innerFlush() throws IOException;
}

static class OutputStreamSink extends ByteSink {
private final OutputStream out;
private final WritableByteChannel channel;
private OutputStreamSink(OutputStream out) {
super();
this.out = out;
channel = Channels.newChannel(out);
}
@Override
protected void innerWrite(byte[] bytes, int off, int len)
Expand All @@ -201,5 +218,9 @@ protected void innerWrite(byte[] bytes, int off, int len)
protected void innerFlush() throws IOException {
out.flush();
}
/** Write the bytes remaining in buff through the channel that was opened
 * on the wrapped output stream. **/
@Override
protected void innerWrite(ByteBuffer buff) throws IOException {
channel.write(buff);
}
}
}
13 changes: 13 additions & 0 deletions lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java
Expand Up @@ -166,6 +166,19 @@ public void writeFixed(byte[] bytes) throws IOException {
writeFixed(bytes, 0, bytes.length);
}

/**
 * A version of {@link #writeFixed(byte[], int, int)} for ByteBuffers.
 * Writes the bytes between the buffer's position and its limit. The
 * buffer's position and limit are left unchanged, whether or not the
 * buffer has an accessible backing array.
 *
 * @param bytes the data to write
 * @throws IOException if the underlying write fails
 */
public void writeFixed(ByteBuffer bytes) throws IOException {
  int pos = bytes.position();
  int len = bytes.limit() - pos;
  if (bytes.hasArray()) {
    writeFixed(bytes.array(), bytes.arrayOffset() + pos, len);
  } else {
    // No accessible backing array (direct, mapped or read-only buffer):
    // copy the content out. Read through a duplicate so the caller's
    // position is not advanced, matching the array-backed branch.
    byte[] b = new byte[len];
    bytes.duplicate().get(b, 0, len);
    writeFixed(b, 0, len);
  }
}

/**
* Writes an enumeration.
* @param e
Expand Down
Expand Up @@ -262,8 +262,11 @@ protected Schema createSchema(Type type, Map<String,Schema> names) {
return super.createSchema(type, names);
if (c.isArray()) { // array
Class component = c.getComponentType();
if (component == Byte.TYPE) // byte array
return Schema.create(Schema.Type.BYTES);
if (component == Byte.TYPE) { // byte array
Schema result = Schema.create(Schema.Type.BYTES);
result.addProp(CLASS_PROP, c.getName());
return result;
}
Schema result = Schema.createArray(createSchema(component, names));
setElement(result, component);
return result;
Expand Down
Expand Up @@ -121,11 +121,17 @@ protected Object readString(Object old, Decoder in) throws IOException {
protected Object createString(String value) { return value; }

@Override
protected Object readBytes(Object old, Decoder in) throws IOException {
protected Object readBytes(Object old, Schema s, Decoder in)
throws IOException {
ByteBuffer bytes = in.readBytes(null);
byte[] result = new byte[bytes.remaining()];
bytes.get(result);
return result;
Class c = ReflectData.getClassProp(s, ReflectData.CLASS_PROP);
if (c != null && c.isArray()) {
byte[] result = new byte[bytes.remaining()];
bytes.get(result);
return result;
} else {
return bytes;
}
}

@Override
Expand Down
Expand Up @@ -101,7 +101,8 @@ public class TestReflect {
}

@Test public void testBytes() {
check(new byte[0], "\"bytes\"");
check(ByteBuffer.allocate(0), "\"bytes\"");
check(new byte[0], "{\"type\":\"bytes\",\"java-class\":\"[B\"}");
}

@Test public void testUnionWithCollection() {
Expand Down
@@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.avro.reflect;

import static org.junit.Assert.*;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Writes a reflect-mapped record whose {@link ByteBuffer} field is a
 * read-only memory-mapped file into an Avro data file, reads it back and
 * checks that the MD5 of the bytes read equals the MD5 of the source file.
 */
public class ByteBufferTest {
  /** Record under test: a name plus a ByteBuffer payload. */
  static class X{
    String name = "";
    ByteBuffer content;
  }
  /** Scratch directory created for each test. */
  File tmpdir;
  /** File inside {@link #tmpdir} holding the payload bytes. */
  File content;

  /** Creates the scratch directory and fills the payload file. */
  @Before public void before() throws IOException{
    // createTempFile is only used to obtain a unique name; the file is
    // replaced by a directory of the same name.
    tmpdir = File.createTempFile("avro", "test");
    tmpdir.delete();
    tmpdir.mkdirs();
    content = new File(tmpdir,"test-content");
    // Encode once, with an explicit charset, instead of on every iteration
    // with the platform default.
    byte[] line = "hello world\n".getBytes("UTF-8");
    FileOutputStream out = new FileOutputStream(content);
    try{
      for(int i=0;i<100000;i++){
        out.write(line);
      }
    }finally{
      out.close();
    }
  }

  /** Best-effort removal of the files created by {@link #before()}. */
  @After public void after(){
    // delete() reports failure by returning false; a file that is still
    // mapped may not be removable on every platform, which is acceptable
    // for scratch data.
    if (content != null) {
      content.delete();
    }
    if (tmpdir != null) {
      tmpdir.delete();
    }
  }

  /** Round-trips one record and compares digests of input and output. */
  @Test public void test() throws Exception{
    Schema schema = ReflectData.get().getSchema(X.class);
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    writeOneXAsAvro(schema, bout);
    X record = readOneXFromAvro(schema, bout);

    String expected = getmd5(content);
    String actual = getmd5(record.content);
    assertEquals("md5 for result differed from input",expected,actual);
  }

  /**
   * Reads the data file held in {@code bout} and returns its single record,
   * asserting that there is exactly one.
   */
  private X readOneXFromAvro(Schema schema, ByteArrayOutputStream bout)
    throws IOException {
    SeekableByteArrayInput input = new SeekableByteArrayInput(bout.toByteArray());
    ReflectDatumReader<X> datumReader = new ReflectDatumReader<X>(schema);
    FileReader<X> reader = DataFileReader.openReader(input, datumReader);
    Iterator<X> it = reader.iterator();
    assertTrue("missing first record",it.hasNext());
    X record = it.next();
    assertFalse("should be no more records - only wrote one out",it.hasNext());
    return record;
  }

  /**
   * Writes one record to {@code bout} as an Avro data file. The record's
   * content is a read-only mapping of the payload file.
   */
  private void writeOneXAsAvro(Schema schema, ByteArrayOutputStream bout)
    throws IOException, FileNotFoundException {
    DatumWriter<X> datumWriter = new ReflectDatumWriter<X>(schema);
    DataFileWriter<X> writer = new DataFileWriter<X>(datumWriter);
    writer.create(schema, bout);
    X x = new X();
    x.name = "xxx";
    FileInputStream fis = new FileInputStream(content);
    try{
      FileChannel channel = fis.getChannel();
      try{
        long contentLength = content.length();
        //set the content to be a file channel.
        ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength);
        x.content = buffer;
        writer.append(x);
      }finally{
        channel.close();
      }
    }finally{
      fis.close();
    }
    writer.flush();
    writer.close();
  }

  /**
   * Returns the MD5 of the given file's content, computed over a read-only
   * mapping of the whole file.
   */
  private String getmd5(File file) throws Exception{
    // Use the argument rather than the content field, so the helper
    // digests whichever file the caller passes.
    FileInputStream fis = new FileInputStream(file);
    try{
      FileChannel channel = fis.getChannel();
      try{
        long fileLength = file.length();
        ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, fileLength);
        return getmd5(buffer);
      }finally{
        channel.close();
      }
    }finally{
      fis.close();
    }
  }

  /**
   * Returns the MD5 of the buffer's remaining bytes as a hexadecimal string.
   * The string comes from BigInteger, so leading zeros are not included.
   * The buffer is consumed by the digest.
   */
  String getmd5(ByteBuffer buffer) throws NoSuchAlgorithmException{
    MessageDigest mdEnc = MessageDigest.getInstance("MD5");
    mdEnc.reset();
    mdEnc.update(buffer);
    return new BigInteger(1, mdEnc.digest()).toString(16);
  }
}

0 comments on commit 5be068e

Please sign in to comment.