From 74d904b463fd7ae6acc1998350de437ea2aa8a83 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Tue, 3 Jan 2017 13:52:52 -0500 Subject: [PATCH] AVRO-1976: Add Input/OutputFormat to read/write encoded objects --- .../avro/mapred/AvroContainerFileBlock.java | 196 +++++++++++++++++ .../avro/mapred/AvroContainerFileHeader.java | 196 +++++++++++++++++ .../mapreduce/AvroEncodedInputFormat.java | 134 ++++++++++++ .../mapreduce/AvroEncodedOutputFormat.java | 75 +++++++ .../mapreduce/AvroEncodedRecordReader.java | 95 ++++++++ .../mapreduce/AvroEncodedRecordWriter.java | 81 +++++++ .../TestAvroEncodedInputAndOutputFormats.java | 205 ++++++++++++++++++ .../TestAvroEncodedRecordReader.java | 173 +++++++++++++++ .../TestAvroEncodedRecordWriter.java | 154 +++++++++++++ 9 files changed, 1309 insertions(+) create mode 100644 lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroContainerFileBlock.java create mode 100644 lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroContainerFileHeader.java create mode 100644 lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedInputFormat.java create mode 100644 lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedOutputFormat.java create mode 100644 lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordReader.java create mode 100644 lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordWriter.java create mode 100644 lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedInputAndOutputFormats.java create mode 100644 lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordReader.java create mode 100644 lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordWriter.java diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroContainerFileBlock.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroContainerFileBlock.java new file mode 100644 index 00000000000..eb7c55dcec3 --- /dev/null +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroContainerFileBlock.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.mapred; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.io.BinaryData; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.InputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Values for hadoop files opened with AvroEncodedInputFormat. + * + *

The data wrapped in this object corresponds to a file data block in an Avro container + * file according to the + * spec.

+ */ +public class AvroContainerFileBlock implements WritableComparable { + + static private final int BLOCK_OVERHEAD_BYTES_MAX; + + static { + final int varLongBytesMax = 9; + final int varIntBytesMax = 5; + BLOCK_OVERHEAD_BYTES_MAX = varLongBytesMax + varIntBytesMax + + AvroContainerFileHeader.SYNC_MARKER.length; + } + + private BytesWritable block; + private int objectsOffset; + private int objectsLength; + private long objectCount; + + /** + * No-arg constructor for empty block. + */ + public AvroContainerFileBlock() { + this(0L, new byte[0], 0, 0); + } + + /** + * Wraps an uncompressed, binary-encoded sequence of objects. + */ + public AvroContainerFileBlock(long objectCount, byte[] encodedObjects) { + this(objectCount, encodedObjects, 0, encodedObjects.length); + } + + /** + * Wraps an uncompressed, binary-encoded sequence of objects, in range [pos, pos+len[. + */ + public AvroContainerFileBlock(long objectCount, byte[] encodedObjects, int pos, int len) { + block = new BytesWritable(new byte[DataFileConstants.DEFAULT_SYNC_INTERVAL]); + set(objectCount, encodedObjects, pos, len); + } + + /** + * Schema for Avro container file blocks, as defined in the spec. + */ + public static Schema fileBlockSchema() { + return SchemaBuilder + .record("org.apache.avro.file.Block") + .fields() + .name("objects") + .type(Schema.create(Schema.Type.LONG)) + .noDefault() + .name("bytes") + .type(Schema.create(Schema.Type.BYTES)) + .noDefault() + .name("sync") + .type(SchemaBuilder.fixed("Magic").size(DataFileConstants.SYNC_SIZE)) + .noDefault() + .endRecord(); + } + + /** + * Wraps an uncompressed, binary-encoded sequence of objects. + */ + public void set(long objectCount, byte[] encodedObjects) { + set(objectCount, encodedObjects, 0, encodedObjects.length); + } + + /** + * Wraps an uncompressed, binary-encoded sequence of objects, in range [pos, pos+len[. + */ + public void set(long objectCount, byte[] encodedObjects, int pos, int len) { + resetBuffer(objectCount, len); + System.arraycopy(encodedObjects, pos, block.getBytes(), objectsOffset, objectsLength); + finalizeBuffer(); + } + + /** + * Generates a ByteBuffer wrapping the encoded objects. + */ + public ByteBuffer getEncodedObjects() { + return ByteBuffer.wrap(block.getBytes(), objectsOffset, objectsLength); + } + + /** + * Generates an InputStream on the encoded objects. + */ + public InputStream getEncodedObjectStream() { + return new ByteArrayInputStream(block.getBytes(), objectsOffset, objectsLength); + } + + /** + * Returns the object count in this block. + */ + public long getObjectCount() { + return objectCount; + } + + /** + * Exposes the underlying BytesWritable instance containing the encoded file block object. + */ + public BytesWritable unwrap() { + return block; + } + + /** {@inheritDoc} */ + @Override + public int compareTo(AvroContainerFileBlock o) { + return block.compareTo(o.block); + } + + /** {@inheritDoc} */ + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(objectCount); + out.writeInt(objectsLength); + out.write(block.getBytes(), objectsOffset, objectsLength); + } + + /** {@inheritDoc} */ + @Override + public void readFields(DataInput in) throws IOException { + resetBuffer(in.readLong(), in.readInt()); + in.readFully(block.getBytes(), objectsOffset, objectsLength); + finalizeBuffer(); + } + + private void resetBuffer(long newBlockCount, int newBlockLength) { + objectCount = newBlockCount; + objectsLength = newBlockLength; + block.setSize(BLOCK_OVERHEAD_BYTES_MAX + objectsLength); + byte[] buffer = block.getBytes(); + objectsOffset = BinaryData.encodeLong(objectCount, buffer, 0); + objectsOffset += BinaryData.encodeInt(objectsLength, buffer, objectsOffset); + block.setSize(objectsOffset + objectsLength + AvroContainerFileHeader.SYNC_MARKER.length); + } + + private void finalizeBuffer() { + System.arraycopy( + AvroContainerFileHeader.SYNC_MARKER, 0, + block.getBytes(), objectsOffset + objectsLength, + AvroContainerFileHeader.SYNC_MARKER.length); + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || !(o instanceof AvroContainerFileBlock)) return false; + AvroContainerFileBlock that = (AvroContainerFileBlock) o; + return block.equals(that.block); + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return block.hashCode(); + } +} diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroContainerFileHeader.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroContainerFileHeader.java new file mode 100644 index 00000000000..b92cc007d60 --- /dev/null +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroContainerFileHeader.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.mapred; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.io.BinaryData; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.InputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +/** + * Keys for hadoop files opened with AvroEncodedInputFormat. + * + *

The data wrapped in this object corresponds to a file header in an Avro container + * file according to the + * spec.

+ */ +public class AvroContainerFileHeader implements WritableComparable { + + static final byte[] SYNC_MARKER = new byte[DataFileConstants.SYNC_SIZE]; + static private final int SCHEMA_LENGTH_VARINT_BYTES_MAX = 5; + static private final byte[] HEAD; + static private final byte[] TAIL; + + static { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null); + try { + encoder.writeFixed(DataFileConstants.MAGIC); + encoder.writeMapStart(); + encoder.setItemCount(1L); + encoder.startItem(); + encoder.writeString(DataFileConstants.SCHEMA); + HEAD = stream.toByteArray(); + stream.reset(); + encoder.writeMapEnd(); + encoder.writeFixed(SYNC_MARKER); + TAIL = stream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private BytesWritable header; + private int schemaOffset; + private int schemaLength; + + /** + * No-arg constructor. + * + * Instance remains invalid until writer schema is set using + * {@link AvroContainerFileHeader#set(Schema)} or until + * {@link AvroContainerFileHeader#readFields(DataInput)} is called. + */ + public AvroContainerFileHeader() { + header = new BytesWritable(new byte[DataFileConstants.DEFAULT_SYNC_INTERVAL]); + header.set(HEAD, 0, HEAD.length); + } + + /** + * Wrap a writer schema. + */ + public AvroContainerFileHeader(Schema writerSchema) { + this(); + set(writerSchema); + } + + /** + * Schema for Avro container file headers, as defined in the spec. + */ + public static Schema fileHeaderSchema() { + return SchemaBuilder + .record("org.apache.avro.file.Header") + .fields() + .name("magic") + .type(SchemaBuilder.fixed("Magic").size(DataFileConstants.MAGIC.length)) + .noDefault() + .name("meta") + .type(SchemaBuilder.map().values(Schema.create(Schema.Type.BYTES))) + .noDefault() + .name("sync") + .type(SchemaBuilder.fixed("Magic").size(DataFileConstants.SYNC_SIZE)) + .noDefault() + .endRecord(); + } + + /** + * Sets writer schema for this instance. + */ + public void set(Schema writerSchema) { + final byte[] json; + try { + json = writerSchema.toString().getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + resetBuffer(json.length); + System.arraycopy(json, 0, header.getBytes(), schemaOffset, schemaLength); + finalizeBuffer(); + } + + /** + * Returns copy of writer schema. + */ + public Schema getWriterSchema() { + InputStream stream = new ByteArrayInputStream(header.getBytes(), schemaOffset, schemaLength); + try { + return new Schema.Parser().parse(stream); + } catch (IOException e) { + throw new AvroRuntimeException(e); + } + } + + /** + * Exposes the underlying BytesWritable instance containing the encoded file header object. + */ + public BytesWritable unwrap() { + return header; + } + + /** {@inheritDoc} */ + @Override + public int compareTo(AvroContainerFileHeader o) { + return header.compareTo(o.header); + } + + /** {@inheritDoc} */ + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(schemaLength); + out.write(header.getBytes(), schemaOffset, schemaLength); + } + + /** {@inheritDoc} */ + @Override + public void readFields(DataInput in) throws IOException { + resetBuffer(in.readInt()); + in.readFully(header.getBytes(), schemaOffset, schemaLength); + finalizeBuffer(); + } + + private void resetBuffer(int newSchemaLength) { + schemaLength = newSchemaLength; + header.setSize(HEAD.length + SCHEMA_LENGTH_VARINT_BYTES_MAX + schemaLength + TAIL.length); + schemaOffset = HEAD.length; + schemaOffset += BinaryData.encodeInt(schemaLength, header.getBytes(), schemaOffset); + header.setSize(schemaOffset + schemaLength + TAIL.length); + } + + private void finalizeBuffer() { + System.arraycopy(TAIL, 0, header.getBytes(), schemaOffset + schemaLength, TAIL.length); + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || !(o instanceof AvroContainerFileHeader)) return false; + AvroContainerFileHeader that = (AvroContainerFileHeader) o; + return header.equals(that.header); + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return header.hashCode(); + } +} diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedInputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedInputFormat.java new file mode 100644 index 00000000000..2cb9b17f5fa --- /dev/null +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedInputFormat.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.mapreduce; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.AvroContainerFileBlock; +import org.apache.avro.mapred.AvroContainerFileHeader; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.Iterator; + +/** + * A MapReduce InputFormat that can handle Avro container files. + * + *

Keys are AvroContainerFileHeader objects which contain the Avro container file header. + * Values are AvroContainerFileBlock objects which contain decompressed Avro container file blocks, + * which consist of a number of binary-encoded objects.

+ */ +public class AvroEncodedInputFormat + extends FileInputFormat { + + /** + * Utility static method for Hadoop Map/Reduce jobs. + *

+ * Constructs a DataFileStream given an Avro container file header and one block. + */ + public static DataFileStream stream( + DatumReader reader, AvroContainerFileHeader key, AvroContainerFileBlock value) + throws IOException { + return new DataFileStream(new AvroBlockInputStream(key, value), reader); + } + + /** + * Utility static method for Hadoop Map/Reduce jobs. + *

+ * Constructs a DataFileStream given an Avro container file header and some blocks. + */ + public static DataFileStream stream( + DatumReader reader, AvroContainerFileHeader key, Iterable values) + throws IOException { + return new DataFileStream(new AvroBlockInputStream(key, values), reader); + } + + /** + * {@inheritDoc} + */ + @Override + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + return new AvroEncodedRecordReader(); + } + + static class AvroBlockInputStream extends InputStream { + + private InputStream stream; + private Iterator iterator; + + AvroBlockInputStream(AvroContainerFileHeader header, AvroContainerFileBlock value) { + this.stream = bytesWritableToStream(header.unwrap()); + this.iterator = Collections.singletonList(value).iterator(); + } + + AvroBlockInputStream(AvroContainerFileHeader header, Iterable values) { + this.stream = bytesWritableToStream(header.unwrap()); + this.iterator = values.iterator(); + } + + private void update() throws IOException { + if (stream.available() == 0 && iterator.hasNext()) { + stream = bytesWritableToStream(iterator.next().unwrap()); + } + } + + private InputStream bytesWritableToStream(BytesWritable bw) { + return new ByteArrayInputStream(bw.getBytes(), 0, bw.getLength()); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + update(); + return stream.read(b, off, len); + } + + @Override + public int read() throws IOException { + update(); + return stream.read(); + } + + @Override + public int available() throws IOException { + return stream.available(); + } + + @Override + public long skip(long n) throws IOException { + return stream.skip(n); + } + + @Override + public void close() throws IOException { + stream.close(); + stream = new ByteArrayInputStream(new byte[0]); + iterator = Collections.emptyList().iterator(); + super.close(); + } + } +} + diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedOutputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedOutputFormat.java new file mode 100644 index 00000000000..f02abbe0834 --- /dev/null +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedOutputFormat.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.mapreduce; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.hadoop.io.AvroSerialization; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * FileOutputFormat for writing Avro container files with already-encoded objects. + * + *

Keys are BytesWritable containing a sequence of binary-encoded objects. Values are + * LongWritable containing the object count in the key.

+ * + *

No validation is performed to verify that the schema used in the encoding matches the writer + * schema declared in the job configuration.

+ */ +public class AvroEncodedOutputFormat + extends AvroOutputFormatBase { + + /** + * {@inheritDoc} + */ + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + GenericData dataModel = AvroSerialization.createDataModel(conf); + + // Get the writer schema. + Schema writerSchema = AvroJob.getOutputKeySchema(conf); + boolean isMapOnly = context.getNumReduceTasks() == 0; + if (isMapOnly) { + Schema mapOutputSchema = AvroJob.getMapOutputKeySchema(conf); + if (mapOutputSchema != null) { + writerSchema = mapOutputSchema; + } + } + if (null == writerSchema) { + throw new IOException( + "AvroEncodedOutputFormat requires an output schema. " + + "Use AvroJob.setOutputKeySchema()."); + } + + return new AvroEncodedRecordWriter( + writerSchema, + dataModel, + getCompressionCodec(context), + getAvroFileOutputStream(context), + getSyncInterval(context)); + } +} diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordReader.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordReader.java new file mode 100644 index 00000000000..e851e4ac0de --- /dev/null +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordReader.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.mapreduce; + +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.AvroContainerFileBlock; +import org.apache.avro.mapred.AvroContainerFileHeader; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Reads file header and decompressed, binary-encoded object blocks from an input split + * representing a chunk of an Avro container file. + */ +public class AvroEncodedRecordReader + extends AvroRecordReaderBase { + + private BlockDataFileReader fileReader; + private AvroContainerFileHeader fileHeader; + + public AvroEncodedRecordReader() { + super(null); + } + + /** {@inheritDoc} */ + @Override + protected DataFileReader createAvroFileReader( + SeekableInput input, DatumReader datumReader) + throws IOException { + return fileReader = new BlockDataFileReader(input, datumReader); + } + + /** {@inheritDoc} */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + super.initialize(inputSplit, context); + fileHeader = new AvroContainerFileHeader(fileReader.getSchema()); + } + + /** {@inheritDoc} */ + @Override + public AvroContainerFileHeader getCurrentKey() { + return fileHeader; + } + + /** {@inheritDoc} */ + @Override + public AvroContainerFileBlock getCurrentValue() { + return fileReader.block; + } + + static class BlockDataFileReader extends DataFileReader { + + AvroContainerFileBlock block = new AvroContainerFileBlock(); + + BlockDataFileReader(SeekableInput sin, DatumReader reader) + throws IOException { + super(sin, reader); + } + + @Override + public IndexedRecord next(IndexedRecord indexedRecord) throws IOException { + ByteBuffer bb = nextBlock(); + final long count = getBlockCount(); + final int pos = bb.arrayOffset() + bb.position(); + final int len = bb.remaining(); + block.set(count, bb.array(), pos, len); + blockFinished(); + return null; + } + } +} diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordWriter.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordWriter.java new file mode 100644 index 00000000000..423e9f31080 --- /dev/null +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordWriter.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.mapreduce; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * Writes binary-encoded Avro records to an Avro container file output stream. + */ +public class AvroEncodedRecordWriter + extends RecordWriter + implements Syncable { + + private final DataFileWriter fileWriter; + + @SuppressWarnings("unchecked") + public AvroEncodedRecordWriter(Schema writerSchema, + GenericData dataModel, + CodecFactory compressionCodec, + OutputStream outputStream, + int syncInterval) + throws IOException { + fileWriter = new DataFileWriter(dataModel.createDatumWriter(writerSchema)); + fileWriter.setCodec(compressionCodec); + fileWriter.setSyncInterval(syncInterval); + fileWriter.create(writerSchema, outputStream); + } + + /** {@inheritDoc} */ + @Override + public long sync() throws IOException { + return fileWriter.sync(); + } + + /** {@inheritDoc} */ + @Override + public void write(BytesWritable encodedObjects, LongWritable objectsCount) throws IOException { + long count = objectsCount.get(); + if (count <= 0) { + throw new IOException("AvroEncodedRecordWriter requires non-negative object count."); + } + while (--count > 0) { + fileWriter.appendEncoded(ByteBuffer.wrap(new byte[0])); + } + fileWriter.appendEncoded( + ByteBuffer.wrap(encodedObjects.getBytes(), 0, encodedObjects.getLength())); + } + + /** {@inheritDoc} */ + @Override + public void close(TaskAttemptContext context) throws IOException { + fileWriter.close(); + } +} diff --git a/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedInputAndOutputFormats.java b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedInputAndOutputFormats.java new file mode 100644 index 00000000000..1548c66389d --- /dev/null +++ b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedInputAndOutputFormats.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.mapreduce; + +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.mapred.AvroContainerFileBlock; +import org.apache.avro.mapred.AvroContainerFileHeader; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapred.FsInput; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.assertNotNull; + +public class TestAvroEncodedInputAndOutputFormats { + + /** A temporary directory for test data. */ + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + /** + * Verifies that a non-null record reader can be created, and the key/value types are + * as expected. + */ + @Test + public void testCreateRecordReader() throws IOException, InterruptedException { + // Set up the job configuration. + Job job = new Job(); + Configuration conf = job.getConfiguration(); + + FileSplit inputSplit = createMock(FileSplit.class); + TaskAttemptContext context = createMock(TaskAttemptContext.class); + expect(context.getConfiguration()).andReturn(conf).anyTimes(); + + replay(inputSplit); + replay(context); + + AvroEncodedInputFormat inputFormat = new AvroEncodedInputFormat(); + @SuppressWarnings("unchecked") + RecordReader recordReader = + inputFormat.createRecordReader(inputSplit, context); + assertNotNull(inputFormat); + recordReader.close(); + + verify(inputSplit); + verify(context); + } + + private static class TestMapper extends + Mapper, NullWritable> { + + private AvroKey mAvroKey = new AvroKey(); + + @Override + protected void map(AvroContainerFileHeader key, AvroContainerFileBlock value, Context context) + throws IOException, InterruptedException { + + assert TextStats.getClassSchema().equals(key.getWriterSchema()); + assert value.getObjectCount() > 0; + + DatumReader datumReader = new ReflectDatumReader(TextStats.class); + for (TextStats ts : AvroEncodedInputFormat.stream(datumReader, key, value)) { + mAvroKey.datum(ts); + context.write(mAvroKey, NullWritable.get()); + } + } + } + + private Job createTestJob(String outputLabel) throws Exception { + Job job = new Job(); + FileInputFormat.setInputPaths(job, new Path(getClass() + .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro") + .toURI().toString())); + job.setInputFormatClass(AvroEncodedInputFormat.class); + Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-" + outputLabel); + FileOutputFormat.setOutputPath(job, outputPath); + return job; + } + + /** Checks that the results from the MapReduce were as expected. */ + private void validateTestJobResults(Job job) throws Exception { + Path outputPath = FileOutputFormat.getOutputPath(job); + FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/part-*")); + Assert.assertEquals(1, outputFiles.length); + + DataFileReader reader = new DataFileReader( + new FsInput(outputFiles[0].getPath(), job.getConfiguration()), + new SpecificDatumReader()); + Map counts = new HashMap(); + for (TextStats record : reader) { + counts.put(record.name.toString(), record.count); + } + reader.close(); + + Assert.assertEquals(3, counts.get("apple").intValue()); + Assert.assertEquals(2, counts.get("banana").intValue()); + Assert.assertEquals(1, counts.get("carrot").intValue()); + } + + /** + * Verifies that the AvroEncodedInputFormat behaves as expected. + */ + @Test + public void testInputFormat() throws Exception { + Job job = createTestJob("test-avro-encoded-input-format"); + + job.setMapperClass(TestMapper.class); + AvroJob.setMapOutputKeySchema(job, TextStats.getClassSchema()); + job.setNumReduceTasks(0); + job.setOutputFormatClass(AvroKeyOutputFormat.class); + + Assert.assertTrue(job.waitForCompletion(true)); + + validateTestJobResults(job); + } + + private static class TestReducer + extends Reducer { + + @Override + protected void reduce( + AvroContainerFileHeader key, Iterable values, Context context) + throws IOException, InterruptedException { + + assert TextStats.getClassSchema().equals(key.getWriterSchema()); + + DatumReader datumReader = new ReflectDatumReader(TextStats.class); + DatumWriter datumWriter = + new GenericDatumWriter(TextStats.getClassSchema()); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + BinaryEncoder encoder = null; + for (TextStats ts : AvroEncodedInputFormat.stream(datumReader, key, values)) { + outputStream.reset(); + encoder = EncoderFactory.get().directBinaryEncoder(outputStream, encoder); + datumWriter.write(ts, encoder); + context.write(new BytesWritable(outputStream.toByteArray()), new LongWritable(1L)); + } + } + } + + /** + * Verifies that the AvroEncodedOutputFormat behaves as expected, assuming that + * AvroEncodedInputFormat behaves as expected. + */ + @Test + public void testOutputFormat() throws Exception { + Job job = createTestJob("test-avro-encoded-output-format"); + + job.setMapOutputKeyClass(AvroContainerFileHeader.class); + job.setMapOutputValueClass(AvroContainerFileBlock.class); + job.setReducerClass(TestReducer.class); + AvroJob.setOutputKeySchema(job, TextStats.getClassSchema()); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(LongWritable.class); + job.setOutputFormatClass(AvroEncodedOutputFormat.class); + + Assert.assertTrue(job.waitForCompletion(true)); + + validateTestJobResults(job); + } +} diff --git a/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordReader.java b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordReader.java new file mode 100644 index 00000000000..874f7cc6b86 --- /dev/null +++ b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordReader.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.mapreduce; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.SeekableFileInput; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.mapred.AvroContainerFileBlock; +import org.apache.avro.mapred.AvroContainerFileHeader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.*; + +public class TestAvroEncodedRecordReader { + + /** + * A temporary directory for test data. + */ + @Rule + public TemporaryFolder mTempDir = new TemporaryFolder(); + + /** + * Verifies that avro records can be read and progress is reported correctly. + */ + @Test + public void testReadRecords() throws IOException, InterruptedException { + + // Create the test avro file input with three records. + Schema schema = SchemaBuilder.builder().record("stats").fields() + .name("counts").type(Schema.create(Schema.Type.INT)).noDefault() + .name("name").type(Schema.create(Schema.Type.STRING)).noDefault() + .endRecord(); + + GenericRecord firstInputRecord = new GenericRecordBuilder(schema) + .set("counts", 3) + .set("name", "apple") + .build(); + + GenericRecord secondInputRecord = new GenericRecordBuilder(schema) + .set("counts", 2) + .set("name", "banana") + .build(); + + GenericRecord thirdInputRecord = new GenericRecordBuilder(schema) + .set("counts", 1) + .set("name", "carrot") + .build(); + + final SeekableInput avroFileInput = new SeekableFileInput( + AvroFiles.createFile( + new File(mTempDir.getRoot(), "myInputFile.avro"), + schema, + firstInputRecord, + secondInputRecord, + thirdInputRecord)); + + // Create the record reader over the avro input file. + RecordReader recordReader + = new AvroEncodedRecordReader() { + @Override + protected SeekableInput createSeekableInput(Configuration conf, Path path) + throws IOException { + return avroFileInput; + } + }; + + // Set up the job configuration. + Configuration conf = new Configuration(); + + // Create a mock input split for this record reader. + FileSplit inputSplit = createMock(FileSplit.class); + expect(inputSplit.getPath()).andReturn(new Path("/path/to/an/avro/file")).anyTimes(); + expect(inputSplit.getStart()).andReturn(0L).anyTimes(); + expect(inputSplit.getLength()).andReturn(avroFileInput.length()).anyTimes(); + + // Create a mock task attempt context for this record reader. + TaskAttemptContext context = createMock(TaskAttemptContext.class); + expect(context.getConfiguration()).andReturn(conf).anyTimes(); + + // Initialize the record reader. + replay(inputSplit); + replay(context); + recordReader.initialize(inputSplit, context); + + assertEquals("Progress should be zero before any records are read", + 0.0f, recordReader.getProgress(), 0.0f); + + // Some variables to hold the records. + AvroContainerFileHeader key; + AvroContainerFileBlock value; + + // Read the first record. + assertTrue("Expected at least one record", recordReader.nextKeyValue()); + key = recordReader.getCurrentKey(); + value = recordReader.getCurrentValue(); + + assertNotNull("First record had null key", key); + assertNotNull("First record had null value", value); + + assertEquals(schema, key.getWriterSchema()); + + assertEquals(3L, value.getObjectCount()); + + ByteBuffer encodedObjects = value.getEncodedObjects(); + assertEquals(23, encodedObjects.remaining()); + assertEquals(0x1422132F, encodedObjects.hashCode()); + + assertEquals("Progress should be complete (1 out of 1 records processed)", + 1.0f, recordReader.getProgress(), 0.0f); + + GenericDatumReader datumReader = new GenericDatumReader(schema); + InputStream encodedStream = value.getEncodedObjectStream(); + BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(encodedStream, null); + GenericRecord record = datumReader.read(null, decoder); + assertEquals(3, record.get("counts")); + assertEquals("apple", record.get("name").toString()); + + record = datumReader.read(record, decoder); + assertEquals(2, record.get("counts")); + assertEquals("banana", record.get("name").toString()); + + record = datumReader.read(record, decoder); + assertEquals(1, record.get("counts")); + assertEquals("carrot", record.get("name").toString()); + + assertEquals(0, encodedStream.available()); + + // There should be no more records. + assertFalse("Expected only 1 record", recordReader.nextKeyValue()); + + // Close the record reader. + recordReader.close(); + + // Verify the expected calls on the mocks. + verify(inputSplit); + verify(context); + } +} diff --git a/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordWriter.java b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordWriter.java new file mode 100644 index 00000000000..193d460ebed --- /dev/null +++ b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordWriter.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.mapreduce; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.IOException; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.*; + +public class TestAvroEncodedRecordWriter { + + /** + * A temporary directory for test data. + */ + @Rule + public TemporaryFolder mTempDir = new TemporaryFolder(); + + /** + * Verifies that AvroEncodedRecordWriter performs as expected. + */ + @Test + public void testWrite() throws IOException { + Schema writerSchema = Schema.create(Schema.Type.INT); + GenericData dataModel = new ReflectData(); + CodecFactory compressionCodec = CodecFactory.deflateCodec(6); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + TaskAttemptContext context = createMock(TaskAttemptContext.class); + + replay(context); + + // Write an avro container file with two records: 1 and 2. + AvroEncodedRecordWriter recordWriter = new AvroEncodedRecordWriter( + writerSchema, + dataModel, + compressionCodec, + outputStream, + DataFileConstants.DEFAULT_SYNC_INTERVAL); + + BytesWritable encodedObjects = new BytesWritable(new byte[2]); + encodedObjects.getBytes()[0] = 2; + encodedObjects.getBytes()[1] = 4; + LongWritable objectsCount = new LongWritable(2L); + recordWriter.write(encodedObjects, objectsCount); + recordWriter.close(context); + + verify(context); + + // Verify that the file was written as expected. + InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + Schema readerSchema = Schema.create(Schema.Type.INT); + DatumReader datumReader = new SpecificDatumReader(readerSchema); + DataFileStream dataFileReader = new DataFileStream(inputStream, datumReader); + + assertTrue(dataFileReader.hasNext()); // Record 1. + assertEquals(1, dataFileReader.next().intValue()); + assertTrue(dataFileReader.hasNext()); // Record 2. + assertEquals(2, dataFileReader.next().intValue()); + assertFalse(dataFileReader.hasNext()); // No more records. + + dataFileReader.close(); + } + + /** + * Verifies that AvroEncodedRecordWriter performs as expected with syncable outputs.. + */ + @Test + public void testSyncableWrite() throws IOException { + Schema writerSchema = Schema.create(Schema.Type.INT); + GenericData dataModel = new ReflectData(); + CodecFactory compressionCodec = CodecFactory.deflateCodec(6); + FileOutputStream outputStream = + new FileOutputStream(new File(mTempDir.getRoot(), "temp.avro")); + TaskAttemptContext context = createMock(TaskAttemptContext.class); + + replay(context); + + // Write an avro container file with two records: 1 and 2. + AvroEncodedRecordWriter recordWriter = new AvroEncodedRecordWriter( + writerSchema, + dataModel, + compressionCodec, + outputStream, + DataFileConstants.DEFAULT_SYNC_INTERVAL); + + BytesWritable encodedObjects = new BytesWritable(new byte[1]); + LongWritable objectsCount = new LongWritable(1L); + long positionOne = recordWriter.sync(); + encodedObjects.getBytes()[0] = 2; + recordWriter.write(encodedObjects, objectsCount); + long positionTwo = recordWriter.sync(); + encodedObjects.getBytes()[0] = 4; + recordWriter.write(encodedObjects, objectsCount); + recordWriter.close(context); + + verify(context); + + // Verify that the file was written as expected. + Configuration conf = new Configuration(); + conf.set("fs.default.name", "file:///"); + Path avroFile = new Path(new File(mTempDir.getRoot(), "temp.avro").toString()); + DataFileReader dataFileReader = + new DataFileReader(new FsInput(avroFile, conf), new SpecificDatumReader()); + + dataFileReader.seek(positionTwo); + assertTrue(dataFileReader.hasNext()); // Record 2. + assertEquals(2, (Object) dataFileReader.next()); + + dataFileReader.seek(positionOne); + assertTrue(dataFileReader.hasNext()); // Record 1. + assertEquals(1, (Object) dataFileReader.next()); + + dataFileReader.close(); + } +}